Minor refactoring of the code

Summary: Some cleanup I made while reading the source code in `db`. Most changes are about style, naming, or new C++11 features (a brief sketch of the typical pattern follows the commit metadata below).

Test Plan: ran `make check`

Reviewers: haobo, dhruba, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15009
kailiu 2013-12-30 18:33:57 -08:00
parent f1cec73a76
commit 476416c27c
7 changed files with 189 additions and 173 deletions
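
Most of the diff below repeats one pattern: spelled-out iterator loops become range-based for loops, verbose iterator types become auto, and helper variables get clearer names. The following is a minimal, compilable sketch of that pattern; the container and its contents are invented for illustration and are not taken from the RocksDB sources.

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

int main() {
  // Hypothetical data, not from the RocksDB source: file number -> file name.
  std::map<uint64_t, std::string> files = {{1, "000001.sst"}, {2, "000002.log"}};

  // Pre-C++11 spelling, of the kind this commit removes:
  for (std::map<uint64_t, std::string>::const_iterator iter = files.begin();
       iter != files.end(); ++iter) {
    std::cout << iter->first << " " << iter->second << "\n";
  }

  // C++11 spelling, of the kind this commit introduces:
  for (const auto& file : files) {
    std::cout << file.first << " " << file.second << "\n";
  }
  return 0;
}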

View File

@ -20,8 +20,8 @@
#include <vector>
#include "db/builder.h"
#include "db/dbformat.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
@ -43,7 +43,6 @@
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "port/port.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
@ -59,7 +58,7 @@
namespace rocksdb {
void dumpLeveldbBuildVersion(Logger * log);
void DumpLeveldbBuildVersion(Logger * log);
// Information kept for every waiting writer
struct DBImpl::Writer {
@ -266,9 +265,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
storage_options_(options),
bg_work_gate_closed_(false),
refitting_level_(false) {
mem_->Ref();
env_->GetAbsolutePath(dbname, &db_absolute_path_);
stall_leveln_slowdown_.resize(options.num_levels);
@ -282,16 +279,15 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
const int table_cache_size = options_.max_open_files - 10;
table_cache_.reset(new TableCache(dbname_, &options_,
storage_options_, table_cache_size));
versions_.reset(new VersionSet(dbname_, &options_, storage_options_,
table_cache_.get(), &internal_comparator_));
dumpLeveldbBuildVersion(options_.info_log.get());
DumpLeveldbBuildVersion(options_.info_log.get());
options_.Dump(options_.info_log.get());
char name[100];
Status st = env_->GetHostName(name, 100L);
if (st.ok()) {
Status s = env_->GetHostName(name, 100L);
if (s.ok()) {
host_name_ = name;
} else {
Log(options_.info_log, "Can't get hostname, use localhost as host name.");
@ -502,7 +498,7 @@ void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm,
}
// Returns the list of live files in 'sst_live' and the list
// of all files in the filesystem in 'all_files'.
// of all files in the filesystem in 'candidate_files'.
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
// options_.delete_obsolete_files_period_micros
@ -554,15 +550,18 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
versions_->AddLiveFiles(&deletion_state.sst_live);
if (doing_the_full_scan) {
// set of all files in the directory
env_->GetChildren(dbname_, &deletion_state.all_files); // Ignore errors
// set of all files in the directory. We'll exclude files that are still
// alive in subsequent processing.
env_->GetChildren(
dbname_, &deletion_state.candidate_files
); // Ignore errors
// Add log files in wal_dir
if (options_.wal_dir != dbname_) {
std::vector<std::string> log_files;
env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
deletion_state.all_files.insert(
deletion_state.all_files.end(),
deletion_state.candidate_files.insert(
deletion_state.candidate_files.end(),
log_files.begin(),
log_files.end()
);
@ -575,11 +574,10 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
// check if there is anything to do
if (!state.all_files.size() &&
!state.sst_delete_files.size() &&
!state.log_delete_files.size()) {
if (state.candidate_files.empty() &&
state.sst_delete_files.empty() &&
state.log_delete_files.empty()) {
return;
}
@ -589,38 +587,52 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
if (state.manifest_file_number == 0) {
return;
}
uint64_t number;
FileType type;
std::vector<std::string> old_log_files;
// Now, convert live list to an unordered set, WITHOUT mutex held;
// set is slow.
std::unordered_set<uint64_t> live_set(state.sst_live.begin(),
state.sst_live.end());
std::unordered_set<uint64_t> sst_live(
state.sst_live.begin(), state.sst_live.end()
);
state.all_files.reserve(state.all_files.size() +
state.sst_delete_files.size());
auto& candidate_files = state.candidate_files;
candidate_files.reserve(
candidate_files.size() +
state.sst_delete_files.size() +
state.log_delete_files.size());
// We may ignore the dbname when generating the file names.
const char* kDumbDbName = "";
for (auto file : state.sst_delete_files) {
state.all_files.push_back(TableFileName("", file->number).substr(1));
candidate_files.push_back(
TableFileName(kDumbDbName, file->number).substr(1)
);
delete file;
}
state.all_files.reserve(state.all_files.size() +
state.log_delete_files.size());
for (auto filenum : state.log_delete_files) {
if (filenum > 0) {
state.all_files.push_back(LogFileName("", filenum).substr(1));
for (auto file_num : state.log_delete_files) {
if (file_num > 0) {
candidate_files.push_back(
LogFileName(kDumbDbName, file_num).substr(1)
);
}
}
// dedup state.all_files so we don't try to delete the same
// dedup state.candidate_files so we don't try to delete the same
// file twice
sort(state.all_files.begin(), state.all_files.end());
auto unique_end = unique(state.all_files.begin(), state.all_files.end());
sort(candidate_files.begin(), candidate_files.end());
candidate_files.erase(
unique(candidate_files.begin(), candidate_files.end()),
candidate_files.end()
);
for (const auto& to_delete : candidate_files) {
uint64_t number;
FileType type;
// Ignore file if we cannot recognize it.
if (!ParseFileName(to_delete, &number, &type)) {
continue;
}
for (size_t i = 0; state.all_files.begin() + i < unique_end; i++) {
if (ParseFileName(state.all_files[i], &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
@ -633,17 +645,17 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
keep = (number >= state.manifest_file_number);
break;
case kTableFile:
keep = (live_set.find(number) != live_set.end());
keep = (sst_live.find(number) != sst_live.end());
break;
case kTempFile:
// Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live_set.find(number) != live_set.end());
keep = (sst_live.find(number) != sst_live.end());
break;
case kInfoLogFile:
keep = true;
if (number != 0) {
old_log_files.push_back(state.all_files[i]);
old_log_files.push_back(to_delete);
}
break;
case kCurrentFile:
@ -654,35 +666,35 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
break;
}
if (!keep) {
if (keep) {
continue;
}
if (type == kTableFile) {
// evict from cache
table_cache_->Evict(number);
}
std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
"/" + state.all_files[i];
"/" + to_delete;
Log(options_.info_log,
"Delete type=%d #%lu",
int(type),
(unsigned long)number);
Status st;
if (type == kLogFile && (options_.WAL_ttl_seconds > 0 ||
options_.WAL_size_limit_MB > 0)) {
st = env_->RenameFile(fname,
if (type == kLogFile &&
(options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
Status s = env_->RenameFile(fname,
ArchivedLogFileName(options_.wal_dir, number));
if (!st.ok()) {
if (!s.ok()) {
Log(options_.info_log,
"RenameFile logfile #%lu FAILED -- %s\n",
(unsigned long)number, st.ToString().c_str());
(unsigned long)number, s.ToString().c_str());
}
} else {
st = env_->DeleteFile(fname);
if (!st.ok()) {
Status s = env_->DeleteFile(fname);
if (!s.ok()) {
Log(options_.info_log, "Delete type=%d #%lu FAILED -- %s\n",
int(type), (unsigned long)number, st.ToString().c_str());
}
}
int(type), (unsigned long)number, s.ToString().c_str());
}
}
}
@ -839,7 +851,9 @@ void DBImpl::PurgeObsoleteWALFiles() {
// If externalTable is set, then apply recovered transactions
// to that table. This is used for readonly mode.
Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
Status DBImpl::Recover(
VersionEdit* edit,
MemTable* external_table,
bool error_if_log_file_exist) {
mutex_.AssertHeld();
@ -906,10 +920,11 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
if (!s.ok()) {
return s;
}
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
uint64_t number;
FileType type;
if (ParseFileName(filenames[i], &number, &type)
&& type == kLogFile
&& ((number >= min_log) || (number == prev_log))) {
@ -925,12 +940,12 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence, external_table);
for (const auto& log : logs) {
s = RecoverLogFile(log, edit, &max_sequence, external_table);
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
versions_->MarkFileNumberUsed(log);
}
if (s.ok()) {
@ -1147,7 +1162,6 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
}
base->Unref();
// re-acquire the most current version
base = versions_->current();
@ -3285,7 +3299,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
} else {
unique_ptr<WritableFile> lfile;
MemTable* memtmp = nullptr;
MemTable* new_mem = nullptr;
// Attempt to switch to a new memtable and trigger compaction of old.
// Do this without holding the dbmutex lock.
@ -3306,7 +3320,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
memtmp = new MemTable(
new_mem = new MemTable(
internal_comparator_, mem_rep_factory_, NumberLevels(), options_);
new_superversion = new SuperVersion(options_.max_write_buffer_number);
}
@ -3315,7 +3329,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
assert (!memtmp);
assert (!new_mem);
break;
}
logfile_number_ = new_log_number;
@ -3325,7 +3339,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
if (force) {
imm_.FlushRequested();
}
mem_ = memtmp;
mem_ = new_mem;
mem_->Ref();
Log(options_.info_log,
"New memtable created with log file: #%lu\n",
@ -3806,7 +3820,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
delete impl;
return s;
}
impl->mutex_.Lock();
impl->mutex_.Lock(); // DBImpl::Recover() requires lock being held
VersionEdit edit(impl->NumberLevels());
s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
@ -3929,7 +3943,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
//
// A global method that can dump out the build version
void dumpLeveldbBuildVersion(Logger * log) {
void DumpLeveldbBuildVersion(Logger * log) {
Log(log, "Git sha %s", rocksdb_build_git_sha);
Log(log, "Compile time %s %s",
rocksdb_build_compile_time, rocksdb_build_compile_date);
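
The PurgeObsoleteFiles() hunks above (apparently db/db_impl.cc) replace the manual unique_end bookkeeping with the standard sort + std::unique + erase idiom, look file numbers up in an unordered_set of live files, and turn the nested if (!keep) block into an early continue. Here is a self-contained sketch of that flow under simplified assumptions: plain strings stand in for DeletionState::candidate_files, and a crude numeric-prefix parse stands in for ParseFileName().

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

int main() {
  // Hypothetical candidate list with a duplicate, standing in for
  // DeletionState::candidate_files.
  std::vector<std::string> candidate_files = {
      "000007.sst", "000003.log", "000007.sst", "000009.sst"};
  // File numbers that must be kept, standing in for state.sst_live.
  std::unordered_set<uint64_t> sst_live = {9};

  // Dedup: sort, then erase the tail returned by std::unique
  // (the erase-unique idiom the commit switches to).
  std::sort(candidate_files.begin(), candidate_files.end());
  candidate_files.erase(
      std::unique(candidate_files.begin(), candidate_files.end()),
      candidate_files.end());

  for (const auto& to_delete : candidate_files) {
    // Crude stand-in for ParseFileName(): read the leading digits.
    uint64_t number = std::stoull(to_delete);
    bool keep = (sst_live.find(number) != sst_live.end());
    if (keep) {
      continue;  // early continue replaces the old `if (!keep) { ... }` nesting
    }
    std::cout << "would delete " << to_delete << "\n";
  }
  return 0;
}

Run as-is this prints the two deletable names; the real code additionally archives WAL files (RenameFile to ArchivedLogFileName when a WAL TTL or size limit is set) and evicts deleted table files from the table cache.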

View File

@ -11,6 +11,7 @@
#include <deque>
#include <set>
#include <vector>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
@ -159,7 +160,7 @@ class DBImpl : public DB {
// needed for CleanupIteratorState
struct DeletionState {
inline bool HaveSomethingToDelete() const {
return all_files.size() ||
return candidate_files.size() ||
sst_delete_files.size() ||
log_delete_files.size();
}
@ -167,7 +168,7 @@ class DBImpl : public DB {
// a list of all files that we'll consider deleting
// (every once in a while this is filled up with all files
// in the DB directory)
std::vector<std::string> all_files;
std::vector<std::string> candidate_files;
// the list of all live sst files that cannot be deleted
std::vector<uint64_t> sst_live;
@ -214,7 +215,7 @@ class DBImpl : public DB {
};
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'all_files'.
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
// options_.delete_obsolete_files_period_micros microseconds ago,
// it will not fill up the deletion_state

View File

@ -17,7 +17,6 @@ namespace log {
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments

View File

@ -74,12 +74,10 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
}
for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
iter != deleted_files_.end();
++iter) {
for (const auto& deleted : deleted_files_) {
PutVarint32(dst, kDeletedFile);
PutVarint32(dst, iter->first); // level
PutVarint64(dst, iter->second); // file number
PutVarint32(dst, deleted.first /* level */);
PutVarint64(dst, deleted.second /* file number */);
}
for (size_t i = 0; i < new_files_.size(); i++) {

View File

@ -75,6 +75,7 @@ class VersionEdit {
const InternalKey& largest,
const SequenceNumber& smallest_seqno,
const SequenceNumber& largest_seqno) {
assert(smallest_seqno <= largest_seqno);
FileMetaData f;
f.number = file;
f.file_size = file_size;
@ -82,13 +83,12 @@ class VersionEdit {
f.largest = largest;
f.smallest_seqno = smallest_seqno;
f.largest_seqno = largest_seqno;
assert(smallest_seqno <= largest_seqno);
new_files_.push_back(std::make_pair(level, f));
}
// Delete the specified "file" from the specified "level".
void DeleteFile(int level, uint64_t file) {
deleted_files_.insert(std::make_pair(level, file));
deleted_files_.insert({level, file});
}
// Number of edits
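
The DeleteFile() change above swaps std::make_pair for a C++11 braced initializer when inserting a (level, file number) pair. A tiny sketch, assuming DeletedFileSet is a std::set of such pairs (which is what the surrounding code suggests):

#include <cstdint>
#include <set>
#include <utility>

// Assumed alias mirroring VersionEdit::DeletedFileSet: (level, file number).
using DeletedFileSet = std::set<std::pair<int, uint64_t>>;

int main() {
  DeletedFileSet deleted_files;
  deleted_files.insert(std::make_pair(1, 7));  // pre-C++11 spelling
  deleted_files.insert({2, 9});                // C++11 braced form, as in the diff
  return deleted_files.size() == 2 ? 0 : 1;
}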

View File

@ -958,6 +958,7 @@ class VersionSet::Builder {
}
}
}
delete[] levels_;
base_->Unref();
}
@ -1043,19 +1044,17 @@ class VersionSet::Builder {
// Delete files
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
iter != del.end();
++iter) {
const int level = iter->first;
const uint64_t number = iter->second;
for (const auto& del_file : del) {
const auto level = del_file.first;
const auto number = del_file.second;
levels_[level].deleted_files.insert(number);
CheckConsistencyForDeletes(edit, number, level);
}
// Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
for (const auto& new_file : edit->new_files_) {
const int level = new_file.first;
FileMetaData* f = new FileMetaData(new_file.second);
f->refs = 1;
// We arrange to automatically compact this file after
@ -1088,23 +1087,21 @@ class VersionSet::Builder {
for (int level = 0; level < vset_->NumberLevels(); level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added->size());
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end();
++added_iter) {
const auto& base_files = base_->files_[level];
auto base_iter = base_files.begin();
auto base_end = base_files.end();
const auto& added_files = *levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added_files.size());
for (const auto& added : added_files) {
// Add all smaller files listed in base_
for (std::vector<FileMetaData*>::const_iterator bpos
= std::upper_bound(base_iter, base_end, *added_iter, cmp);
for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp);
base_iter != bpos;
++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, *added_iter);
MaybeAddFile(v, level, added);
}
// Add remaining base files
@ -1120,7 +1117,7 @@ class VersionSet::Builder {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
} else {
std::vector<FileMetaData*>* files = &v->files_[level];
auto* files = &v->files_[level];
if (level > 0 && !files->empty()) {
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
@ -1210,7 +1207,9 @@ void VersionSet::AppendVersion(Version* v) {
v->next_->prev_ = v;
}
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
Status VersionSet::LogAndApply(
VersionEdit* edit,
port::Mutex* mu,
bool new_descriptor_log) {
mu->AssertHeld();
@ -1232,17 +1231,16 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
ManifestWriter* last_writer = &w;
assert(!manifest_writers_.empty());
assert(manifest_writers_.front() == &w);
std::deque<ManifestWriter*>::iterator iter = manifest_writers_.begin();
for (; iter != manifest_writers_.end(); ++iter) {
last_writer = *iter;
LogAndApplyHelper(&builder, v, last_writer->edit, mu);
batch_edits.push_back(last_writer->edit);
for (const auto& writer : manifest_writers_) {
last_writer = writer;
LogAndApplyHelper(&builder, v, writer->edit, mu);
batch_edits.push_back(writer->edit);
}
builder.SaveTo(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
std::string new_manifest_filename;
uint64_t new_manifest_file_size = 0;
Status s;
// we will need this if we are creating new manifest
@ -1256,7 +1254,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
}
if (new_descriptor_log) {
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
new_manifest_filename = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
}
@ -1271,9 +1269,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// This is fine because everything inside of this block is serialized --
// only one thread can be here at the same time
if (!new_manifest_file.empty()) {
if (!new_manifest_filename.empty()) {
unique_ptr<WritableFile> descriptor_file;
s = env_->NewWritableFile(new_manifest_file, &descriptor_file,
s = env_->NewWritableFile(new_manifest_filename,
&descriptor_file,
storage_options_);
if (s.ok()) {
descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
@ -1321,7 +1320,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
if (s.ok() && !new_manifest_filename.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
if (s.ok() && old_manifest_file_number < manifest_file_number_) {
// delete old manifest file
@ -1356,9 +1355,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
Log(options_->info_log, "Error in committing version %lu",
(unsigned long)v->GetVersionNumber());
delete v;
if (!new_manifest_file.empty()) {
if (!new_manifest_filename.empty()) {
descriptor_log_.reset();
env_->DeleteFile(new_manifest_file);
env_->DeleteFile(new_manifest_filename);
}
}
@ -1410,27 +1409,33 @@ Status VersionSet::Recover() {
};
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
std::string manifest_filename;
Status s = ReadFileToString(
env_, CurrentFileName(dbname_), &manifest_filename
);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size()-1] != '\n') {
if (manifest_filename.empty() ||
manifest_filename.back() != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);
// remove the trailing '\n'
manifest_filename.resize(manifest_filename.size() - 1);
Log(options_->info_log, "Recovering from manifest file:%s\n",
current.c_str());
manifest_filename.c_str());
std::string dscname = dbname_ + "/" + current;
unique_ptr<SequentialFile> file;
s = env_->NewSequentialFile(dscname, &file, storage_options_);
manifest_filename = dbname_ + "/" + manifest_filename;
unique_ptr<SequentialFile> manifest_file;
s = env_->NewSequentialFile(
manifest_filename, &manifest_file, storage_options_
);
if (!s.ok()) {
return s;
}
uint64_t manifest_file_size;
s = env_->GetFileSize(dscname, &manifest_file_size);
s = env_->GetFileSize(manifest_filename, &manifest_file_size);
if (!s.ok()) {
return s;
}
@ -1448,7 +1453,7 @@ Status VersionSet::Recover() {
{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(std::move(file), &reporter, true/*checksum*/,
log::Reader reader(std::move(manifest_file), &reporter, true /*checksum*/,
0 /*initial_offset*/);
Slice record;
std::string scratch;
@ -1489,7 +1494,6 @@ Status VersionSet::Recover() {
}
}
}
file.reset();
if (s.ok()) {
if (!have_next_file) {
@ -1529,7 +1533,7 @@ Status VersionSet::Recover() {
"manifest_file_number is %lu, next_file_number is %lu, "
"last_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu\n",
current.c_str(),
manifest_filename.c_str(),
(unsigned long)manifest_file_number_,
(unsigned long)next_file_number_,
(unsigned long)last_sequence_,
@ -1844,9 +1848,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// Save files
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = current_->files_[level];
const auto& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
const auto f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
}
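
The Builder::SaveTo() hunk above merges each level's sorted base files with the sorted set of newly added files via std::upper_bound, so the combined list stays ordered without a separate sort. A minimal sketch of that merge, with plain ints standing in for FileMetaData* and the default ordering standing in for the internal-key comparator:

#include <algorithm>
#include <iostream>
#include <set>
#include <vector>

int main() {
  std::vector<int> base_files = {1, 3, 5, 7};   // already sorted
  std::set<int> added_files = {2, 6};           // FileSet keeps added files sorted
  std::vector<int> merged;
  merged.reserve(base_files.size() + added_files.size());

  auto base_iter = base_files.begin();
  auto base_end = base_files.end();
  for (const auto& added : added_files) {
    // Emit all base files smaller than the added one, then the added file itself.
    for (auto bpos = std::upper_bound(base_iter, base_end, added);
         base_iter != bpos; ++base_iter) {
      merged.push_back(*base_iter);
    }
    merged.push_back(added);
  }
  // Emit the remaining base files.
  for (; base_iter != base_end; ++base_iter) {
    merged.push_back(*base_iter);
  }

  for (int f : merged) std::cout << f << " ";   // prints: 1 2 3 5 6 7
  std::cout << "\n";
  return 0;
}

The output here is 1 2 3 5 6 7; MaybeAddFile() in the real code additionally drops files listed in deleted_files and asserts that files within a level do not overlap.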

View File

@ -78,8 +78,8 @@ class Version {
};
void Get(const ReadOptions&, const LookupKey& key, std::string* val,
Status* status, MergeContext* merge_context,
GetStats* stats, const Options& db_option, bool* value_found =
nullptr);
GetStats* stats, const Options& db_option,
bool* value_found = nullptr);
// Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise.