VersionEdit not to take NumLevels()

Summary:
I will submit a sequence of diffs that prepare the master branch for column families. There are a lot of implicit assumptions in the code that make the column family implementation hard. If I make these changes only in the column family branch, merging back to master will become impossible.

Most of the diffs will be simple code refactorings, so I hope we can have fast turnaround time. Feel free to grab me in person to discuss any of them.

This diff removes the number-of-levels check from VersionEdit. The level count is used only when a VersionEdit is read, not when it is written, yet it has to be set even when writing. I believe the right thing is to make VersionEdit dumb and check consistency on the caller side. This will also make it much easier to implement column families, since different column families can have different numbers of levels.

Test Plan: make check

Reviewers: dhruba, haobo, sdong, kailiu

Reviewed By: kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15159
This commit is contained in:
Igor Canadi 2014-01-14 15:27:09 -08:00
parent 7d9f21cf23
commit 055e6df45b
11 changed files with 66 additions and 79 deletions

View File

@ -252,8 +252,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
: env_(options.env), : env_(options.env),
dbname_(dbname), dbname_(dbname),
internal_comparator_(options.comparator), internal_comparator_(options.comparator),
options_(SanitizeOptions( options_(SanitizeOptions(dbname, &internal_comparator_,
dbname, &internal_comparator_, &internal_filter_policy_, options)), &internal_filter_policy_, options)),
internal_filter_policy_(options.filter_policy), internal_filter_policy_(options.filter_policy),
owns_info_log_(options_.info_log != options.info_log), owns_info_log_(options_.info_log != options.info_log),
db_lock_(nullptr), db_lock_(nullptr),
@ -261,8 +261,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
shutting_down_(nullptr), shutting_down_(nullptr),
bg_cv_(&mutex_), bg_cv_(&mutex_),
mem_rep_factory_(options_.memtable_factory.get()), mem_rep_factory_(options_.memtable_factory.get()),
mem_(new MemTable(internal_comparator_, mem_rep_factory_, mem_(new MemTable(internal_comparator_, options_)),
NumberLevels(), options_)),
logfile_number_(0), logfile_number_(0),
super_version_(nullptr), super_version_(nullptr),
tmp_batch_(), tmp_batch_(),
@ -408,7 +407,7 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
} }
Status DBImpl::NewDB() { Status DBImpl::NewDB() {
VersionEdit new_db(NumberLevels()); VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name()); new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0); new_db.SetLogNumber(0);
new_db.SetNextFile(2); new_db.SetNextFile(2);
@ -864,7 +863,7 @@ void DBImpl::PurgeObsoleteWALFiles() {
// If externalTable is set, then apply recovered transactions // If externalTable is set, then apply recovered transactions
// to that table. This is used for readonly mode. // to that table. This is used for readonly mode.
Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
bool error_if_log_file_exist) { bool error_if_log_file_exist) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(db_lock_ == nullptr); assert(db_lock_ == nullptr);
@ -1031,8 +1030,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) { if (mem == nullptr) {
mem = new MemTable(internal_comparator_, mem_rep_factory_, mem = new MemTable(internal_comparator_, options_);
NumberLevels(), options_);
mem->Ref(); mem->Ref();
} }
status = WriteBatchInternal::InsertInto(&batch, mem, &options_); status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
@ -1358,7 +1356,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Before refitting:\n%s", Log(options_.info_log, "Before refitting:\n%s",
versions_->current()->DebugString().data()); versions_->current()->DebugString().data());
VersionEdit edit(NumberLevels()); VersionEdit edit;
for (const auto& f : versions_->current()->files_[level]) { for (const auto& f : versions_->current()->files_[level]) {
edit.DeleteFile(level, f->number); edit.DeleteFile(level, f->number);
edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
@ -3289,17 +3287,13 @@ Status DBImpl::MakeRoomForWrite(bool force,
EnvOptions soptions(storage_options_); EnvOptions soptions(storage_options_);
soptions.use_mmap_writes = false; soptions.use_mmap_writes = false;
DelayLoggingAndReset(); DelayLoggingAndReset();
s = env_->NewWritableFile( s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
LogFileName(options_.wal_dir, new_log_number), &lfile, soptions);
&lfile,
soptions
);
if (s.ok()) { if (s.ok()) {
// Our final size should be less than write_buffer_size // Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution. // (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
memtmp = new MemTable( memtmp = new MemTable(internal_comparator_, options_);
internal_comparator_, mem_rep_factory_, NumberLevels(), options_);
new_superversion = new SuperVersion(options_.max_write_buffer_number); new_superversion = new SuperVersion(options_.max_write_buffer_number);
} }
} }
@ -3680,7 +3674,7 @@ Status DBImpl::DeleteFile(std::string name) {
int level; int level;
FileMetaData metadata; FileMetaData metadata;
int maxlevel = NumberLevels(); int maxlevel = NumberLevels();
VersionEdit edit(maxlevel); VersionEdit edit;
DeletionState deletion_state(0, true); DeletionState deletion_state(0, true);
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
@ -3802,7 +3796,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
return s; return s;
} }
impl->mutex_.Lock(); impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels()); VersionEdit edit;
s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) { if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber(); uint64_t new_log_number = impl->versions_->NewFileNumber();

View File

@ -86,7 +86,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock(); impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels()); VersionEdit edit;
Status s = impl->Recover(&edit, impl->GetMemTable(), Status s = impl->Recover(&edit, impl->GetMemTable(),
error_if_log_file_exist); error_if_log_file_exist);
impl->mutex_.Unlock(); impl->mutex_.Unlock();

View File

@ -765,10 +765,9 @@ TEST(DBTest, LevelLimitReopen) {
options.num_levels = 1; options.num_levels = 1;
options.max_bytes_for_level_multiplier_additional.resize(1, 1); options.max_bytes_for_level_multiplier_additional.resize(1, 1);
Status s = TryReopen(&options); Status s = TryReopen(&options);
ASSERT_EQ(s.IsCorruption(), true); ASSERT_EQ(s.IsInvalidArgument(), true);
ASSERT_EQ(s.ToString(), ASSERT_EQ(s.ToString(),
"Corruption: VersionEdit: db already has " "Invalid argument: db has more levels than options.num_levels");
"more levels than options.num_levels");
options.num_levels = 10; options.num_levels = 10;
options.max_bytes_for_level_multiplier_additional.resize(10, 1); options.max_bytes_for_level_multiplier_additional.resize(10, 1);
@ -4936,7 +4935,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
EnvOptions sopt; EnvOptions sopt;
VersionSet vset(dbname, &options, sopt, nullptr, &cmp); VersionSet vset(dbname, &options, sopt, nullptr, &cmp);
ASSERT_OK(vset.Recover()); ASSERT_OK(vset.Recover());
VersionEdit vbase(vset.NumberLevels()); VersionEdit vbase;
uint64_t fnum = 1; uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) { for (int i = 0; i < num_base_files; i++) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
@ -4948,7 +4947,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
uint64_t start_micros = env->NowMicros(); uint64_t start_micros = env->NowMicros();
for (int i = 0; i < iters; i++) { for (int i = 0; i < iters; i++) {
VersionEdit vedit(vset.NumberLevels()); VersionEdit vedit;
vedit.DeleteFile(2, fnum); vedit.DeleteFile(2, fnum);
InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);

View File

@ -33,24 +33,20 @@ struct hash<rocksdb::Slice> {
namespace rocksdb { namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp, MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
MemTableRepFactory* table_factory,
int numlevel,
const Options& options)
: comparator_(cmp), : comparator_(cmp),
refs_(0), refs_(0),
arena_impl_(options.arena_block_size), arena_impl_(options.arena_block_size),
table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)), table_(options.memtable_factory->CreateMemTableRep(comparator_,
&arena_impl_)),
flush_in_progress_(false), flush_in_progress_(false),
flush_completed_(false), flush_completed_(false),
file_number_(0), file_number_(0),
edit_(numlevel),
first_seqno_(0), first_seqno_(0),
mem_next_logfile_number_(0), mem_next_logfile_number_(0),
mem_logfile_number_(0), mem_logfile_number_(0),
locks_(options.inplace_update_support locks_(options.inplace_update_support ? options.inplace_update_num_locks
? options.inplace_update_num_locks : 0) {}
: 0) { }
MemTable::~MemTable() { MemTable::~MemTable() {
assert(refs_ == 0); assert(refs_ == 0);
@ -58,7 +54,7 @@ MemTable::~MemTable() {
size_t MemTable::ApproximateMemoryUsage() { size_t MemTable::ApproximateMemoryUsage() {
return arena_impl_.ApproximateMemoryUsage() + return arena_impl_.ApproximateMemoryUsage() +
table_->ApproximateMemoryUsage(); table_->ApproximateMemoryUsage();
} }
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)

View File

@ -34,11 +34,8 @@ class MemTable {
// MemTables are reference counted. The initial reference count // MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once. // is zero and the caller must call Ref() at least once.
explicit MemTable( explicit MemTable(const InternalKeyComparator& comparator,
const InternalKeyComparator& comparator, const Options& options = Options());
MemTableRepFactory* table_factory,
int numlevel = 7,
const Options& options = Options());
~MemTable(); ~MemTable();

View File

@ -58,7 +58,7 @@ class Repairer {
next_file_number_(1) { next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once. // TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10); table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10);
edit_ = new VersionEdit(options.num_levels); edit_ = new VersionEdit();
} }
~Repairer() { ~Repairer() {
@ -196,8 +196,7 @@ class Repairer {
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_.memtable_factory.get(), MemTable* mem = new MemTable(icmp_, options_);
options_.num_levels);
mem->Ref(); mem->Ref();
int counter = 0; int counter = 0;
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {

View File

@ -33,6 +33,7 @@ enum Tag {
void VersionEdit::Clear() { void VersionEdit::Clear() {
comparator_.clear(); comparator_.clear();
max_level_ = 0;
log_number_ = 0; log_number_ = 0;
prev_log_number_ = 0; prev_log_number_ = 0;
last_sequence_ = 0; last_sequence_ = 0;
@ -107,14 +108,13 @@ static bool GetInternalKey(Slice* input, InternalKey* dst) {
bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) { bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) {
uint32_t v; uint32_t v;
if (GetVarint32(input, &v) && if (GetVarint32(input, &v)) {
(int)v < number_levels_) {
*level = v; *level = v;
if (max_level_ < *level) {
max_level_ = *level;
}
return true; return true;
} else { } else {
if ((int)v >= number_levels_) {
*msg = "db already has more levels than options.num_levels";
}
return false; return false;
} }
} }

View File

@ -34,10 +34,7 @@ struct FileMetaData {
class VersionEdit { class VersionEdit {
public: public:
explicit VersionEdit(int number_levels) : VersionEdit() { Clear(); }
number_levels_(number_levels) {
Clear();
}
~VersionEdit() { } ~VersionEdit() { }
void Clear(); void Clear();
@ -108,7 +105,7 @@ class VersionEdit {
bool GetLevel(Slice* input, int* level, const char** msg); bool GetLevel(Slice* input, int* level, const char** msg);
int number_levels_; int max_level_;
std::string comparator_; std::string comparator_;
uint64_t log_number_; uint64_t log_number_;
uint64_t prev_log_number_; uint64_t prev_log_number_;
@ -120,9 +117,9 @@ class VersionEdit {
bool has_next_file_number_; bool has_next_file_number_;
bool has_last_sequence_; bool has_last_sequence_;
std::vector< std::pair<int, InternalKey> > compact_pointers_; std::vector<std::pair<int, InternalKey> > compact_pointers_;
DeletedFileSet deleted_files_; DeletedFileSet deleted_files_;
std::vector< std::pair<int, FileMetaData> > new_files_; std::vector<std::pair<int, FileMetaData> > new_files_;
}; };
} // namespace rocksdb } // namespace rocksdb

View File

@ -15,7 +15,7 @@ namespace rocksdb {
static void TestEncodeDecode(const VersionEdit& edit) { static void TestEncodeDecode(const VersionEdit& edit) {
std::string encoded, encoded2; std::string encoded, encoded2;
edit.EncodeTo(&encoded); edit.EncodeTo(&encoded);
VersionEdit parsed(7); VersionEdit parsed;
Status s = parsed.DecodeFrom(encoded); Status s = parsed.DecodeFrom(encoded);
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_TRUE(s.ok()) << s.ToString();
parsed.EncodeTo(&encoded2); parsed.EncodeTo(&encoded2);
@ -27,7 +27,7 @@ class VersionEditTest { };
TEST(VersionEditTest, EncodeDecode) { TEST(VersionEditTest, EncodeDecode) {
static const uint64_t kBig = 1ull << 50; static const uint64_t kBig = 1ull << 50;
VersionEdit edit(7); VersionEdit edit;
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
TestEncodeDecode(edit); TestEncodeDecode(edit);
edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, edit.AddFile(3, kBig + 300 + i, kBig + 400 + i,

View File

@ -980,14 +980,12 @@ class VersionSet::Builder {
#endif #endif
} }
void CheckConsistencyForDeletes( void CheckConsistencyForDeletes(VersionEdit* edit, unsigned int number,
VersionEdit* edit, int level) {
unsigned int number,
int level) {
#ifndef NDEBUG #ifndef NDEBUG
// a file to be deleted better exist in the previous version // a file to be deleted better exist in the previous version
bool found = false; bool found = false;
for (int l = 0; !found && l < edit->number_levels_; l++) { for (int l = 0; !found && l < vset_->NumberLevels(); l++) {
const std::vector<FileMetaData*>& base_files = base_->files_[l]; const std::vector<FileMetaData*>& base_files = base_->files_[l];
for (unsigned int i = 0; i < base_files.size(); i++) { for (unsigned int i = 0; i < base_files.size(); i++) {
FileMetaData* f = base_files[i]; FileMetaData* f = base_files[i];
@ -1000,7 +998,7 @@ class VersionSet::Builder {
// if the file did not exist in the previous version, then it // if the file did not exist in the previous version, then it
// is possibly moved from lower level to higher level in current // is possibly moved from lower level to higher level in current
// version // version
for (int l = level+1; !found && l < edit->number_levels_; l++) { for (int l = level+1; !found && l < vset_->NumberLevels(); l++) {
const FileSet* added = levels_[l].added_files; const FileSet* added = levels_[l].added_files;
for (FileSet::const_iterator added_iter = added->begin(); for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end(); ++added_iter) { added_iter != added->end(); ++added_iter) {
@ -1213,7 +1211,7 @@ void VersionSet::AppendVersion(Version* v) {
} }
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
bool new_descriptor_log) { bool new_descriptor_log) {
mu->AssertHeld(); mu->AssertHeld();
// queue our request // queue our request
@ -1383,7 +1381,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
} }
void VersionSet::LogAndApplyHelper(Builder* builder, Version* v, void VersionSet::LogAndApplyHelper(Builder* builder, Version* v,
VersionEdit* edit, port::Mutex* mu) { VersionEdit* edit, port::Mutex* mu) {
mu->AssertHeld(); mu->AssertHeld();
if (edit->has_log_number_) { if (edit->has_log_number_) {
@ -1455,21 +1453,28 @@ Status VersionSet::Recover() {
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit(NumberLevels()); VersionEdit edit;
s = edit.DecodeFrom(record); s = edit.DecodeFrom(record);
if (s.ok()) { if (!s.ok()) {
if (edit.has_comparator_ && break;
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(icmp_.user_comparator()->Name(),
"does not match existing comparator " +
edit.comparator_);
}
} }
if (s.ok()) { if (edit.max_level_ >= NumberLevels()) {
builder.Apply(&edit); s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
} }
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(icmp_.user_comparator()->Name(),
"does not match existing comparator " +
edit.comparator_);
break;
}
builder.Apply(&edit);
if (edit.has_log_number_) { if (edit.has_log_number_) {
log_number = edit.log_number_; log_number = edit.log_number_;
have_log_number = true; have_log_number = true;
@ -1577,7 +1582,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Slice record; Slice record;
std::string scratch; std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit(NumberLevels()); VersionEdit edit;
s = edit.DecodeFrom(record); s = edit.DecodeFrom(record);
if (s.ok()) { if (s.ok()) {
if (edit.has_comparator_ && if (edit.has_comparator_ &&
@ -1832,7 +1837,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery? // TODO: Break up into multiple records to reduce memory usage on recovery?
// Save metadata // Save metadata
VersionEdit edit(NumberLevels()); VersionEdit edit;
edit.SetComparatorName(icmp_.user_comparator()->Name()); edit.SetComparatorName(icmp_.user_comparator()->Name());
// Save compaction pointers // Save compaction pointers
@ -2994,7 +2999,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size,
bottommost_level_(false), bottommost_level_(false),
is_full_compaction_(false), is_full_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels)) { level_ptrs_(std::vector<size_t>(number_levels)) {
edit_ = new VersionEdit(number_levels_); edit_ = new VersionEdit();
for (int i = 0; i < number_levels_; i++) { for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0; level_ptrs_[i] = 0;
} }

View File

@ -72,8 +72,8 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
num_levels_ = new_levels; num_levels_ = new_levels;
compact_pointer_ = new std::string[new_levels]; compact_pointer_ = new std::string[new_levels];
Init(new_levels); Init(new_levels);
VersionEdit ve(new_levels); VersionEdit ve;
st = LogAndApply(&ve , mu, true); st = LogAndApply(&ve, mu, true);
return st; return st;
} }