diff --git a/db/db_bench.cc b/db/db_bench.cc index 623c72ecb..3f38b3716 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -87,6 +87,16 @@ static bool FLAGS_histogram = false; // (initialized to default value by "main") static int FLAGS_write_buffer_size = 0; +// The number of in-memory memtables. +// Each memtable is of size FLAGS_write_buffer_size. +// This is initialized to default value of 2 in "main" function. +static int FLAGS_max_write_buffer_number = 0; + +// The maximum number of concurrent background compactions +// that can occur in parallel. +// This is initialized to default value of 1 in "main" function. +static int FLAGS_max_background_compactions = 0; + // Number of bytes to use as a cache of uncompressed data. // Negative means use default settings. static long FLAGS_cache_size = -1; @@ -159,6 +169,9 @@ static int FLAGS_level0_stop_writes_trigger = 12; // Number of files in level-0 that will slow down writes. static int FLAGS_level0_slowdown_writes_trigger = 8; +// Number of files in level-0 when compactions start +static int FLAGS_level0_file_num_compaction_trigger = 4; + // Ratio of reads to writes (expressed as a percentage) // for the ReadRandomWriteRandom workload. The default // setting is 9 gets for every 1 put. @@ -326,7 +339,8 @@ class Stats { } else { double now = FLAGS_env->NowMicros(); fprintf(stderr, - "... thread %d: %ld ops in %.6f seconds and %.2f ops/sec\n", + "%s thread %d: %ld ops in %.6f seconds and %.2f ops/sec\n", + FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(), id_, done_ - last_report_done_, (now - last_report_finish_) / 1000000.0, @@ -873,6 +887,8 @@ class Benchmark { options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; + options.max_write_buffer_number = FLAGS_max_write_buffer_number; + options.max_background_compactions = FLAGS_max_background_compactions; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -887,6 +903,8 @@ class Benchmark { options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger; + options.level0_file_num_compaction_trigger = + FLAGS_level0_file_num_compaction_trigger; options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type; @@ -1172,7 +1190,10 @@ class Benchmark { int main(int argc, char** argv) { FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; + FLAGS_max_write_buffer_number = leveldb::Options().max_write_buffer_number; FLAGS_open_files = leveldb::Options().max_open_files; + FLAGS_max_background_compactions = + leveldb::Options().max_background_compactions; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; std::string default_db_path; @@ -1203,6 +1224,10 @@ int main(int argc, char** argv) { FLAGS_value_size = n; } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { FLAGS_write_buffer_size = n; + } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) { + FLAGS_max_write_buffer_number = n; + } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { + FLAGS_max_background_compactions = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { @@ -1281,6 +1306,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i],"--level0_slowdown_writes_trigger=%d%c", &n, &junk) == 1) { FLAGS_level0_slowdown_writes_trigger = n; + } else if (sscanf(argv[i],"--level0_file_num_compaction_trigger=%d%c", + &n, &junk) == 1) { + FLAGS_level0_file_num_compaction_trigger = n; } else if (strncmp(argv[i], "--compression_type=", 19) == 0) { const char* ctype = argv[i] + 19; if (!strcasecmp(ctype, "none")) @@ -1309,6 +1337,10 @@ int main(int argc, char** argv) { } } + // The number of background threads should be at least as much the + // max number of concurrent compactions. + FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); + // Choose a location for the test database if none given with --db= if (FLAGS_db == NULL) { leveldb::Env::Default()->GetTestDirectory(&default_db_path); diff --git a/db/db_impl.cc b/db/db_impl.cc index 07ff09688..530d25a7a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -18,6 +18,7 @@ #include "db/log_reader.h" #include "db/log_writer.h" #include "db/memtable.h" +#include "db/memtablelist.h" #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" @@ -67,6 +68,7 @@ struct DBImpl::CompactionState { InternalKey smallest, largest; }; std::vector outputs; + std::list allocated_file_numbers; // State kept for output being generated WritableFile* outfile; @@ -133,20 +135,19 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) db_lock_(NULL), shutting_down_(NULL), bg_cv_(&mutex_), - mem_(new MemTable(internal_comparator_)), - imm_(NULL), + mem_(new MemTable(internal_comparator_, NumberLevels())), logfile_(NULL), logfile_number_(0), log_(NULL), tmp_batch_(new WriteBatch), - bg_compaction_scheduled_(false), + bg_compaction_scheduled_(0), bg_logstats_scheduled_(false), manual_compaction_(NULL), logger_(NULL), disable_delete_obsolete_files_(false), - delete_obsolete_files_last_run_(0) { + delete_obsolete_files_last_run_(0), + delayed_writes_(0) { mem_->Ref(); - has_imm_.Release_Store(NULL); env_->GetAbsolutePath(dbname, &db_absolute_path_); stats_ = new CompactionStats[options.num_levels]; @@ -190,7 +191,7 @@ DBImpl::~DBImpl() { delete versions_; if (mem_ != NULL) mem_->Unref(); - if (imm_ != NULL) imm_->Unref(); + imm_.UnrefAll(); delete tmp_batch_; delete log_; delete logfile_; @@ -476,7 +477,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, WriteBatchInternal::SetContents(&batch, record); if (mem == NULL) { - mem = new MemTable(internal_comparator_); + mem = new MemTable(internal_comparator_, NumberLevels()); mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem); @@ -492,7 +493,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, } if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = WriteLevel0Table(mem, edit, NULL); + status = WriteLevel0TableForRecovery(mem, edit); if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. @@ -504,7 +505,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, } if (status.ok() && mem != NULL) { - status = WriteLevel0Table(mem, edit, NULL); + status = WriteLevel0TableForRecovery(mem, edit); // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. } @@ -514,8 +515,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, return status; } -Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, - Version* base) { +Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; @@ -537,8 +537,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, (unsigned long long) meta.file_size, s.ToString().c_str()); delete iter; - pending_outputs_.erase(meta.number); + pending_outputs_.erase(meta.number); // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. @@ -546,7 +546,69 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, if (s.ok() && meta.file_size > 0) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); - if (base != NULL) { + edit->AddFile(level, meta.number, meta.file_size, + meta.smallest, meta.largest); + } + + CompactionStats stats; + stats.micros = env_->NowMicros() - start_micros; + stats.bytes_written = meta.file_size; + stats_[level].Add(stats); + return s; +} + + +Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, + uint64_t* filenumber) { + mutex_.AssertHeld(); + const uint64_t start_micros = env_->NowMicros(); + FileMetaData meta; + meta.number = versions_->NewFileNumber(); + *filenumber = meta.number; + pending_outputs_.insert(meta.number); + Iterator* iter = mem->NewIterator(); + Log(options_.info_log, "Level-0 flush table #%llu: started", + (unsigned long long) meta.number); + + Version* base = versions_->current(); + base->Ref(); + Status s; + { + mutex_.Unlock(); + s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); + mutex_.Lock(); + } + base->Unref(); + + Log(options_.info_log, "Level-0 flush table #%llu: %lld bytes %s", + (unsigned long long) meta.number, + (unsigned long long) meta.file_size, + s.ToString().c_str()); + delete iter; + + // re-acquire the most current version + base = versions_->current(); + + // There could be multiple threads writing to its own level-0 file. + // The pending_outputs cannot be cleared here, otherwise this newly + // created file might not be considered as a live-file by another + // compaction thread that is concurrently deleting obselete files. + // The pending_outputs can be cleared only after the new version is + // committed so that other threads can recognize this file as a + // valid one. + // pending_outputs_.erase(meta.number); + + // Note that if file_size is zero, the file has been deleted and + // should not be added to the manifest. + int level = 0; + if (s.ok() && meta.file_size > 0) { + const Slice min_user_key = meta.smallest.user_key(); + const Slice max_user_key = meta.largest.user_key(); + // if we have more than 1 background thread, then we cannot + // insert files directly into higher levels because some other + // threads could be concurrently producing compacted files for + // that key range. + if (base != NULL && options_.max_background_compactions <= 1) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, @@ -560,37 +622,48 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, return s; } -Status DBImpl::CompactMemTable() { +Status DBImpl::CompactMemTable(bool* madeProgress) { mutex_.AssertHeld(); - assert(imm_ != NULL); + assert(imm_.size() != 0); - // Save the contents of the memtable as a new Table - VersionEdit edit(NumberLevels()); - Version* base = versions_->current(); - base->Ref(); - Status s = WriteLevel0Table(imm_, &edit, base); - base->Unref(); + if (!imm_.IsFlushPending()) { + Log(options_.info_log, "Memcompaction already in progress"); + Status s = Status::IOError("Memcompaction already in progress"); + return s; + } + + // Save the contents of the earliest memtable as a new Table + // This will release and re-acquire the mutex. + uint64_t file_number; + MemTable* m = imm_.PickMemtableToFlush(); + if (m == NULL) { + Log(options_.info_log, "Nothing in memstore to flush"); + Status s = Status::IOError("Nothing in memstore to flush"); + return s; + } + + // record the logfile_number_ before we release the mutex + VersionEdit* edit = m->GetEdits(); + edit->SetPrevLogNumber(0); + edit->SetLogNumber(logfile_number_); // Earlier logs no longer needed + + Status s = WriteLevel0Table(m, edit, &file_number); if (s.ok() && shutting_down_.Acquire_Load()) { s = Status::IOError("Deleting DB during memtable compaction"); } // Replace immutable memtable with the generated Table - if (s.ok()) { - edit.SetPrevLogNumber(0); - edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed - s = versions_->LogAndApply(&edit, &mutex_); - } + s = imm_.InstallMemtableFlushResults(m, versions_, s, &mutex_, + options_.info_log, file_number, pending_outputs_); if (s.ok()) { - // Commit to the new state - imm_->Unref(); - imm_ = NULL; - has_imm_.Release_Store(NULL); + if (madeProgress) { + *madeProgress = 1; + } DeleteObsoleteFiles(); MaybeScheduleLogDBDeployStats(); } - return s; } @@ -636,6 +709,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { ManualCompaction manual; manual.level = level; manual.done = false; + manual.in_progress = false; if (begin == NULL) { manual.begin = NULL; } else { @@ -650,16 +724,46 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { } MutexLock l(&mutex_); + + // When a manual compaction arrives, temporarily throttle down + // the number of background compaction threads to 1. This is + // needed to ensure that this manual compaction can compact + // any range of keys/files. We artificialy increase + // bg_compaction_scheduled_ by a large number, this causes + // the system to have a single background thread. Now, + // this manual compaction can progress without stomping + // on any other concurrent compactions. + const int LargeNumber = 10000000; + const int newvalue = options_.max_background_compactions-1; + bg_compaction_scheduled_ += LargeNumber; + while (bg_compaction_scheduled_ > LargeNumber) { + Log(options_.info_log, "Manual compaction request waiting for background threads to fall below 1"); + bg_cv_.Wait(); + } + Log(options_.info_log, "Manual compaction starting"); + while (!manual.done) { while (manual_compaction_ != NULL) { bg_cv_.Wait(); } manual_compaction_ = &manual; + if (bg_compaction_scheduled_ == LargeNumber) { + bg_compaction_scheduled_ = newvalue; + } MaybeScheduleCompaction(); while (manual_compaction_ == &manual) { bg_cv_.Wait(); } } + assert(!manual.in_progress); + + // wait till there are no background threads scheduled + bg_compaction_scheduled_ += LargeNumber; + while (bg_compaction_scheduled_ > LargeNumber + newvalue) { + Log(options_.info_log, "Manual compaction resetting background threads"); + bg_cv_.Wait(); + } + bg_compaction_scheduled_ = 0; } Status DBImpl::FlushMemTable(const FlushOptions& options) { @@ -676,10 +780,10 @@ Status DBImpl::WaitForCompactMemTable() { Status s; // Wait until the compaction completes MutexLock l(&mutex_); - while (imm_ != NULL && bg_error_.ok()) { + while (imm_.size() > 0 && bg_error_.ok()) { bg_cv_.Wait(); } - if (imm_ != NULL) { + if (imm_.size() != 0) { s = bg_error_; } return s; @@ -704,16 +808,16 @@ Status DBImpl::TEST_WaitForCompact() { void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); - if (bg_compaction_scheduled_) { + if (bg_compaction_scheduled_ >= options_.max_background_compactions) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions - } else if (imm_ == NULL && + } else if (!imm_.IsFlushPending() && manual_compaction_ == NULL && !versions_->NeedsCompaction()) { // No work to be done } else { - bg_compaction_scheduled_ = true; + bg_compaction_scheduled_++; env_->Schedule(&DBImpl::BGWork, this); } } @@ -723,10 +827,12 @@ void DBImpl::BGWork(void* db) { } void DBImpl::BackgroundCall() { + bool madeProgress; MutexLock l(&mutex_); + // Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self()); assert(bg_compaction_scheduled_); if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundCompaction(); + Status s = BackgroundCompaction(&madeProgress); if (!s.ok()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -741,28 +847,41 @@ void DBImpl::BackgroundCall() { } } - bg_compaction_scheduled_ = false; + bg_compaction_scheduled_--; MaybeScheduleLogDBDeployStats(); // Previous compaction may have produced too many files in a level, - // so reschedule another compaction if needed. - MaybeScheduleCompaction(); + // So reschedule another compaction if we made progress in the + // last compaction. + if (madeProgress) { + MaybeScheduleCompaction(); + } bg_cv_.SignalAll(); } -Status DBImpl::BackgroundCompaction() { +Status DBImpl::BackgroundCompaction(bool* madeProgress) { + *madeProgress = false; mutex_.AssertHeld(); - if (imm_ != NULL) { - return CompactMemTable(); + while (imm_.IsFlushPending()) { + Log(options_.info_log, + "BackgroundCompaction doing CompactMemTable, compaction slots available %d", + options_.max_background_compactions - bg_compaction_scheduled_); + Status stat = CompactMemTable(madeProgress); + if (!stat.ok()) { + return stat; + } } Compaction* c; - bool is_manual = (manual_compaction_ != NULL); + bool is_manual = (manual_compaction_ != NULL) && + (manual_compaction_->in_progress == false); InternalKey manual_end; if (is_manual) { ManualCompaction* m = manual_compaction_; + assert(!m->in_progress); + m->in_progress = true; // another thread cannot pick up the same work c = versions_->CompactRange(m->level, m->begin, m->end); m->done = (c == NULL); if (c != NULL) { @@ -781,6 +900,7 @@ Status DBImpl::BackgroundCompaction() { Status status; if (c == NULL) { // Nothing to do + Log(options_.info_log, "Compaction nothing to do"); } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); @@ -796,12 +916,16 @@ Status DBImpl::BackgroundCompaction() { static_cast(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); + versions_->ReleaseCompactionFiles(c); + *madeProgress = true; } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); CleanupCompaction(compact); + versions_->ReleaseCompactionFiles(c); c->ReleaseInputs(); DeleteObsoleteFiles(); + *madeProgress = true; } delete c; @@ -828,6 +952,7 @@ Status DBImpl::BackgroundCompaction() { m->tmp_storage = manual_end; m->begin = &m->tmp_storage; } + m->in_progress = false; // not being processed anymore manual_compaction_ = NULL; } return status; @@ -850,21 +975,54 @@ void DBImpl::CleanupCompaction(CompactionState* compact) { delete compact; } +// Allocate the file numbers for the output file. We allocate as +// many output file numbers as there are files in level+1. +// Insert them into pending_outputs so that they do not get deleted. +void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) { + mutex_.AssertHeld(); + assert(compact != NULL); + assert(compact->builder == NULL); + int filesNeeded = compact->compaction->num_input_files(1); + for (unsigned i = 0; i < filesNeeded; i++) { + uint64_t file_number = versions_->NewFileNumber(); + pending_outputs_.insert(file_number); + compact->allocated_file_numbers.push_back(file_number); + } +} + +// Frees up unused file number. +void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) { + mutex_.AssertHeld(); + for (std::list::iterator it = + compact->allocated_file_numbers.begin(); + it != compact->allocated_file_numbers.end(); ++it) { + uint64_t file_number = *it; + pending_outputs_.erase(file_number); + // Log(options_.info_log, "XXX releasing unused file num %d", file_number); + } +} + Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { assert(compact != NULL); assert(compact->builder == NULL); uint64_t file_number; - { + // If we have not yet exhausted the pre-allocated file numbers, + // then use the one from the front. Otherwise, we have to acquire + // the heavyweight lock and allocate a new file number. + if (!compact->allocated_file_numbers.empty()) { + file_number = compact->allocated_file_numbers.front(); + compact->allocated_file_numbers.pop_front(); + } else { mutex_.Lock(); file_number = versions_->NewFileNumber(); pending_outputs_.insert(file_number); - CompactionState::Output out; - out.number = file_number; - out.smallest.Clear(); - out.largest.Clear(); - compact->outputs.push_back(out); mutex_.Unlock(); } + CompactionState::Output out; + out.number = file_number; + out.smallest.Clear(); + out.largest.Clear(); + compact->outputs.push_back(out); // Make the output file std::string fname = TableFileName(dbname_, file_number); @@ -933,6 +1091,21 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, Status DBImpl::InstallCompactionResults(CompactionState* compact) { mutex_.AssertHeld(); + + // paranoia: verify that the files that we started with + // still exist in the current version and in the same original level. + // This ensures that a concurrent compaction did not erroneously + // pick the same files to compact. + if (options_.paranoid_checks && + !versions_->VerifyCompactionFileConsistency(compact->compaction)) { + Log(options_.info_log, "Compaction %d@%d + %d@%d files aborted", + compact->compaction->num_input_files(0), + compact->compaction->level(), + compact->compaction->num_input_files(1), + compact->compaction->level() + 1); + return Status::IOError("Compaction input files inconsistent"); + } + Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", compact->compaction->num_input_files(0), compact->compaction->level(), @@ -953,14 +1126,15 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } Status DBImpl::DoCompactionWork(CompactionState* compact) { - const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(options_.info_log, "Compacting %d@%d + %d@%d files", + Log(options_.info_log, + "Compacting %d@%d + %d@%d files, compaction slots available %d", compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), - compact->compaction->level() + 1); + compact->compaction->level() + 1, + options_.max_background_compactions - bg_compaction_scheduled_); char scratch[256]; compact->compaction->Summary(scratch, sizeof(scratch)); Log(options_.info_log, "Compaction start summary: %s\n", scratch); @@ -974,9 +1148,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->smallest_snapshot = snapshots_.oldest()->number_; } + // Allocate the output file numbers before we release the lock + AllocateCompactionOutputFileNumbers(compact); + // Release mutex while we're actually doing the compaction work mutex_.Unlock(); + const uint64_t start_micros = env_->NowMicros(); Iterator* input = versions_->MakeInputIterator(compact->compaction); input->SeekToFirst(); Status status; @@ -986,10 +1164,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { SequenceNumber last_sequence_for_key = kMaxSequenceNumber; for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work - if (has_imm_.NoBarrier_Load() != NULL) { + if (imm_.imm_flush_needed.NoBarrier_Load() != NULL) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); - if (imm_ != NULL) { + if (imm_.IsFlushPending()) { CompactMemTable(); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -1101,15 +1279,23 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { stats.bytes_written += compact->outputs[i].file_size; } + int MBpersec = ((stats.bytes_read + stats.bytes_written)) + /stats.micros; + mutex_.Lock(); stats_[compact->compaction->level() + 1].Add(stats); + // if there were any unused file number (mostly in case of + // compaction error), free up the entry from pending_putputs + ReleaseCompactionUnusedFileNumbers(compact); + if (status.ok()) { status = InstallCompactionResults(compact); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, - "compacted to: %s", versions_->LevelSummary(&tmp)); + "compacted to: %s %d MBytes/sec", versions_->LevelSummary(&tmp), + MBpersec); return status; } @@ -1117,15 +1303,15 @@ namespace { struct IterState { port::Mutex* mu; Version* version; - MemTable* mem; - MemTable* imm; + std::vector mem; // includes both mem_ and imm_ }; static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); state->mu->Lock(); - state->mem->Unref(); - if (state->imm != NULL) state->imm->Unref(); + for (unsigned int i = 0; i < state->mem.size(); i++) { + state->mem[i]->Unref(); + } state->version->Unref(); state->mu->Unlock(); delete state; @@ -1138,22 +1324,29 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); - // Collect together all needed child iterators + // Collect together all needed child iterators for mem std::vector list; - list.push_back(mem_->NewIterator()); mem_->Ref(); - if (imm_ != NULL) { - list.push_back(imm_->NewIterator()); - imm_->Ref(); + list.push_back(mem_->NewIterator()); + cleanup->mem.push_back(mem_); + + // Collect together all needed child iterators for imm_ + std::vector immutables; + imm_.GetMemTables(&immutables); + for (unsigned int i = 0; i < immutables.size(); i++) { + MemTable* m = immutables[i]; + m->Ref(); + list.push_back(m->NewIterator()); + cleanup->mem.push_back(m); } + + // Collect iterators for files in L0 - Ln versions_->current()->AddIterators(options, &list); Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); versions_->current()->Ref(); cleanup->mu = &mutex_; - cleanup->mem = mem_; - cleanup->imm = imm_; cleanup->version = versions_->current(); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); @@ -1184,10 +1377,10 @@ Status DBImpl::Get(const ReadOptions& options, } MemTable* mem = mem_; - MemTable* imm = imm_; + MemTableList imm = imm_; Version* current = versions_->current(); mem->Ref(); - if (imm != NULL) imm->Ref(); + imm.RefAll(); current->Ref(); bool have_stat_update = false; @@ -1200,7 +1393,7 @@ Status DBImpl::Get(const ReadOptions& options, LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { // Done - } else if (imm != NULL && imm->Get(lkey, value, &s)) { + } else if (imm.Get(lkey, value, &s)) { // Done } else { s = current->Get(options, lkey, value, &stats); @@ -1214,7 +1407,7 @@ Status DBImpl::Get(const ReadOptions& options, MaybeScheduleCompaction(); } mem->Unref(); - if (imm != NULL) imm->Unref(); + imm.UnrefAll(); current->Unref(); return s; } @@ -1399,24 +1592,30 @@ Status DBImpl::MakeRoomForWrite(bool force) { mutex_.Unlock(); env_->SleepForMicroseconds(1000); allow_delay = false; // Do not delay a single write more than once - Log(options_.info_log, "delaying write...\n"); mutex_.Lock(); + delayed_writes_++; } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // There is room in current memtable + if (allow_delay) { + DelayLoggingAndReset(); + } break; - } else if (imm_ != NULL) { + } else if (imm_.size() == options_.max_write_buffer_number - 1) { // We have filled up the current memtable, but the previous - // one is still being compacted, so we wait. + // ones are still being compacted, so we wait. + DelayLoggingAndReset(); Log(options_.info_log, "wait for memtable compaction...\n"); bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. - Log(options_.info_log, "waiting...\n"); + DelayLoggingAndReset(); + Log(options_.info_log, "wait for fewer level0 files...\n"); bg_cv_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old + DelayLoggingAndReset(); assert(versions_->PrevLogNumber() == 0); uint64_t new_log_number = versions_->NewFileNumber(); WritableFile* lfile = NULL; @@ -1431,9 +1630,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { logfile_ = lfile; logfile_number_ = new_log_number; log_ = new log::Writer(lfile); - imm_ = mem_; - has_imm_.Release_Store(imm_); - mem_ = new MemTable(internal_comparator_); + imm_.Add(mem_); + mem_ = new MemTable(internal_comparator_, NumberLevels()); mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); @@ -1522,6 +1720,13 @@ void DBImpl::GetApproximateSizes( } } +inline void DBImpl::DelayLoggingAndReset() { + if (delayed_writes_ > 0) { + Log(options_.info_log, "delayed %d write...\n", delayed_writes_ ); + delayed_writes_ = 0; + } +} + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index e08025eca..a011dea7d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -14,6 +14,7 @@ #include "leveldb/env.h" #include "port/port.h" #include "util/stats_logger.h" +#include "memtablelist.h" #ifdef USE_SCRIBE #include "scribe/scribe_logger.h" @@ -99,13 +100,20 @@ class DBImpl : public DB { // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status CompactMemTable(); + Status CompactMemTable(bool* madeProgress = NULL); Status RecoverLogFile(uint64_t log_number, VersionEdit* edit, SequenceNumber* max_sequence); - Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base); + // The following two methods are used to flush a memtable to + // storage. The first one is used atdatabase RecoveryTime (when the + // database is opened) and is heavyweight because it holds the mutex + // for the entire period. The second method WriteLevel0Table supports + // concurrent flush memtables to storage. + Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit); + Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, + uint64_t* filenumber); Status MakeRoomForWrite(bool force /* compact even if there is room? */); WriteBatch* BuildBatchGroup(Writer** last_writer); @@ -123,13 +131,16 @@ class DBImpl : public DB { void MaybeScheduleCompaction(); static void BGWork(void* db); void BackgroundCall(); - Status BackgroundCompaction(); + Status BackgroundCompaction(bool* madeProgress); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); Status InstallCompactionResults(CompactionState* compact); + void AllocateCompactionOutputFileNumbers(CompactionState* compact); + void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); + // Constant after construction Env* const env_; @@ -151,8 +162,7 @@ class DBImpl : public DB { port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes MemTable* mem_; - MemTable* imm_; // Memtable being compacted - port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ + MemTableList imm_; // Memtable that are not changing WritableFile* logfile_; uint64_t logfile_number_; log::Writer* log_; @@ -169,8 +179,8 @@ class DBImpl : public DB { // part of ongoing compactions. std::set pending_outputs_; - // Has a background compaction been scheduled or is running? - bool bg_compaction_scheduled_; + // count how many background compaction been scheduled or is running? + int bg_compaction_scheduled_; // Has a background stats log thread scheduled? bool bg_logstats_scheduled_; @@ -179,6 +189,7 @@ class DBImpl : public DB { struct ManualCompaction { int level; bool done; + bool in_progress; // compaction request being processed? const InternalKey* begin; // NULL means beginning of key range const InternalKey* end; // NULL means end of key range InternalKey tmp_storage; // Used to keep track of compaction progress @@ -220,6 +231,9 @@ class DBImpl : public DB { static const int KEEP_LOG_FILE_NUM = 1000; std::string db_absolute_path_; + // count of the number of contiguous delaying writes + int delayed_writes_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); @@ -227,6 +241,9 @@ class DBImpl : public DB { const Comparator* user_comparator() const { return internal_comparator_.user_comparator(); } + + // dump the delayed_writes_ to the log file and reset counter. + void DelayLoggingAndReset(); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/memtable.cc b/db/memtable.cc index bfec0a7e7..cbd94919a 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -18,10 +18,14 @@ static Slice GetLengthPrefixedSlice(const char* data) { return Slice(p, len); } -MemTable::MemTable(const InternalKeyComparator& cmp) +MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) : comparator_(cmp), refs_(0), - table_(comparator_, &arena_) { + table_(comparator_, &arena_), + flush_in_progress_(false), + flush_completed_(false), + file_number_(0), + edit_(numlevel) { } MemTable::~MemTable() { @@ -101,7 +105,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); - assert((p + val_size) - buf == encoded_len); + assert((p + val_size) - buf == (unsigned)encoded_len); table_.Insert(buf); } diff --git a/db/memtable.h b/db/memtable.h index 92e90bb09..fa918c922 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -9,6 +9,7 @@ #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" +#include "db/version_set.h" #include "util/arena.h" namespace leveldb { @@ -21,7 +22,8 @@ class MemTable { public: // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. - explicit MemTable(const InternalKeyComparator& comparator); + explicit MemTable(const InternalKeyComparator& comparator, + int numlevel = 7); // Increase reference count. void Ref() { ++refs_; } @@ -63,6 +65,9 @@ class MemTable { // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s); + // Returns the edits area that is needed for flushing the memtable + VersionEdit* GetEdits() { return &edit_; } + private: ~MemTable(); // Private since only Unref() should be used to delete it @@ -73,6 +78,7 @@ class MemTable { }; friend class MemTableIterator; friend class MemTableBackwardIterator; + friend class MemTableList; typedef SkipList Table; @@ -81,6 +87,15 @@ class MemTable { Arena arena_; Table table_; + // These are used to manage memtable flushes to storage + bool flush_in_progress_; // started the flush + bool flush_completed_; // finished the flush + uint64_t file_number_; // filled up after flush is complete + + // The udpates to be applied to the transaction log when this + // memtable is flushed to storage. + VersionEdit edit_; + // No copying allowed MemTable(const MemTable&); void operator=(const MemTable&); diff --git a/db/memtablelist.cc b/db/memtablelist.cc new file mode 100644 index 000000000..916dd902f --- /dev/null +++ b/db/memtablelist.cc @@ -0,0 +1,192 @@ +// Copyright (c) 2012 Facebook. + +#include +#include "leveldb/db.h" +#include "db/memtable.h" +#include "db/memtablelist.h" +#include "leveldb/env.h" +#include "leveldb/iterator.h" +#include "util/coding.h" + +namespace leveldb { + +class InternalKeyComparator; +class Mutex; +class MemTableListIterator; +class VersionSet; + +using std::list; + +// Increase reference count on all underling memtables +void MemTableList::RefAll() { + for (list::iterator it = memlist_.begin(); + it != memlist_.end() ; ++it) { + (*it)->Ref(); + } +} + +// Drop reference count on all underling memtables +void MemTableList::UnrefAll() { + for (list::iterator it = memlist_.begin(); + it != memlist_.end() ; ++it) { + (*it)->Unref(); + } +} + +// Returns the total number of memtables in the list +int MemTableList::size() { + assert(num_flush_not_started_ <= size_); + return size_; +} + +// Returns true if there is at least one memtable on which flush has +// not yet started. +bool MemTableList::IsFlushPending() { + if (num_flush_not_started_ > 0) { + assert(imm_flush_needed.NoBarrier_Load() != NULL); + return true; + } + return false; +} + +// Returns the earliest memtable that needs to be flushed. +// Returns null, if no such memtable exist. +MemTable* MemTableList::PickMemtableToFlush() { + for (list::reverse_iterator it = memlist_.rbegin(); + it != memlist_.rend(); it++) { + MemTable* m = *it; + if (!m->flush_in_progress_) { + assert(!m->flush_completed_); + num_flush_not_started_--; + if (num_flush_not_started_ == 0) { + imm_flush_needed.Release_Store(NULL); + } + m->flush_in_progress_ = true; // flushing will start very soon + return m; + } + } + return NULL; +} + +// Record a successful flush in the manifest file +Status MemTableList::InstallMemtableFlushResults(MemTable* m, + VersionSet* vset, Status flushStatus, + port::Mutex* mu, Logger* info_log, + uint64_t file_number, + std::set& pending_outputs) { + mu->AssertHeld(); + assert(m->flush_in_progress_); + assert(m->file_number_ == 0); + + // If the flush was not successful, then just reset state. + // Maybe a suceeding attempt to flush will be successful. + if (!flushStatus.ok()) { + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + imm_flush_needed.Release_Store((void *)1); + pending_outputs.erase(file_number); + return flushStatus; + } + + // flush was sucessful + m->flush_completed_ = true; + m->file_number_ = file_number; + + // if some other thread is already commiting, then return + Status s; + if (commit_in_progress_) { + return s; + } + + // Only a single thread can be executing this piece of code + commit_in_progress_ = true; + + // scan all memtables from the earliest, and commit those + // (in that order) that have finished flushing. + while (!memlist_.empty()) { + m = memlist_.back(); // get the last element + if (!m->flush_completed_) { + break; + } + Log(info_log, + "Level-0 commit table #%llu: started", + (unsigned long long)m->file_number_); + + // this can release and reacquire the mutex. + s = vset->LogAndApply(&m->edit_, mu); + + if (s.ok()) { // commit new state + Log(info_log, "Level-0 commit table #%llu: done", + (unsigned long long)m->file_number_); + memlist_.remove(m); + assert(m->file_number_ > 0); + + // pending_outputs can be cleared only after the newly created file + // has been written to a committed version so that other concurrently + // executing compaction threads do not mistakenly assume that this + // file is not live. + pending_outputs.erase(m->file_number_); + m->Unref(); + size_--; + } else { + //commit failed. setup state so that we can flush again. + Log(info_log, "Level-0 commit table #%llu: failed", + (unsigned long long)m->file_number_); + m->flush_completed_ = false; + m->flush_in_progress_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + pending_outputs.erase(m->file_number_); + m->file_number_ = 0; + imm_flush_needed.Release_Store((void *)1); + s = Status::IOError("Unable to commit flushed memtable"); + break; + } + } + commit_in_progress_ = false; + return s; +} + +// New memtables are inserted at the front of the list. +void MemTableList::Add(MemTable* m) { + assert(size_ >= num_flush_not_started_); + size_++; + memlist_.push_front(m); + num_flush_not_started_++; + if (num_flush_not_started_ == 1) { + imm_flush_needed.Release_Store((void *)1); + } +} + +// Returns an estimate of the number of bytes of data in use. +size_t MemTableList::ApproximateMemoryUsage() { + size_t size = 0; + for (list::iterator it = memlist_.begin(); + it != memlist_.end(); ++it) { + size += (*it)->ApproximateMemoryUsage(); + } + return size; +} + +// Search all the memtables starting from the most recent one. +// Return the most recent value found, if any. +bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s) { + for (list::iterator it = memlist_.begin(); + it != memlist_.end(); ++it) { + if ((*it)->Get(key, value, s)) { + return true; + } + } + return false; +} + +void MemTableList::GetMemTables(std::vector* output) { + for (list::iterator it = memlist_.begin(); + it != memlist_.end(); ++it) { + output->push_back(*it); + } +} + +} // namespace leveldb diff --git a/db/memtablelist.h b/db/memtablelist.h new file mode 100644 index 000000000..76008a8c2 --- /dev/null +++ b/db/memtablelist.h @@ -0,0 +1,96 @@ +// Copyright (c) 2012 Facebook. + +#ifndef STORAGE_LEVELDB_DB_MEMTABLELIST_H_ +#define STORAGE_LEVELDB_DB_MEMTABLELIST_H_ + +#include +#include +#include "leveldb/db.h" +#include "db/dbformat.h" +#include "db/skiplist.h" +#include "util/arena.h" +#include "memtable.h" + +namespace leveldb { + +class InternalKeyComparator; +class Mutex; +class MemTableListIterator; + +// +// This class stores refeernces to all the immutable memtables. +// The memtables are flushed to L0 as soon as possible and in +// any order. If there are more than one immutable memtable, their +// flushes can occur concurrently. However, they are 'committed' +// to the manifest in FIFO order to maintain correctness and +// recoverability from a crash. +// +class MemTableList { + public: + // A list of memtables. + MemTableList() : size_(0), num_flush_not_started_(0), + commit_in_progress_(false) { + imm_flush_needed.Release_Store(NULL); + } + ~MemTableList() {}; + + // so that backgrund threads can detect non-NULL pointer to + // determine whether this is anything more to start flushing. + port::AtomicPointer imm_flush_needed; + + // Increase reference count on all underling memtables + void RefAll(); + + // Drop reference count on all underling memtables + void UnrefAll(); + + // Returns the total number of memtables in the list + int size(); + + // Returns true if there is at least one memtable on which flush has + // not yet started. + bool IsFlushPending(); + + // Returns the earliest memtable that needs to be flushed. + // Returns null, if no such memtable exist. + MemTable* PickMemtableToFlush(); + + // Commit a successful flush in the manifest file + Status InstallMemtableFlushResults(MemTable* m, + VersionSet* vset, Status flushStatus, + port::Mutex* mu, Logger* info_log, + uint64_t file_number, + std::set& pending_outputs); + + // New memtables are inserted at the front of the list. + void Add(MemTable* m); + + // Returns an estimate of the number of bytes of data in use. + size_t ApproximateMemoryUsage(); + + // Search all the memtables starting from the most recent one. + // Return the most recent value found, if any. + bool Get(const LookupKey& key, std::string* value, Status* s); + + // Returns the list of underlying memtables. + void GetMemTables(std::vector* list); + + // Copying allowed + // MemTableList(const MemTableList&); + // void operator=(const MemTableList&); + + private: + std::list memlist_; + int size_; + + // the number of elements that still need flushing + int num_flush_not_started_; + + // committing in progress + bool commit_in_progress_; + +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_MEMTABLELIST_H_ diff --git a/db/repair.cc b/db/repair.cc index 3a2d038c8..4b0220b5c 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -199,7 +199,7 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_); + MemTable* mem = new MemTable(icmp_, options_.num_levels); mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { diff --git a/db/skiplist.h b/db/skiplist.h index ba62641c8..9cb67bbe6 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -23,6 +23,10 @@ // more lists. // // ... prev vs. next pointer ordering ... +// + +#ifndef STORAGE_LEVELDB_DB_SKIPLIST_H_ +#define STORAGE_LEVELDB_DB_SKIPLIST_H_ #include #include @@ -390,3 +394,5 @@ bool SkipList::Contains(const Key& key) const { } } // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_SKIPLIST_H_ diff --git a/db/version_edit.h b/db/version_edit.h index 320361754..159606448 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -21,8 +21,10 @@ struct FileMetaData { uint64_t file_size; // File size in bytes InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table + bool being_compacted; // Is this file undergoing compaction? - FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { } + FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0), + being_compacted(false) { } }; class VersionEdit { diff --git a/db/version_set.cc b/db/version_set.cc index b00dc7737..d36438251 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -266,13 +266,14 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } -Version::Version(VersionSet* vset) +Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), file_to_compact_(NULL), file_to_compact_level_(-1), - compaction_score_(-1), - compaction_level_(-1), - offset_manifest_file_(0) { + compaction_score_(vset->NumberLevels()), + compaction_level_(vset->NumberLevels()), + offset_manifest_file_(0), + version_number_(version_number) { files_ = new std::vector[vset->NumberLevels()]; } @@ -503,6 +504,8 @@ std::string Version::DebugString() const { // 20:43['e' .. 'g'] r.append("--- level "); AppendNumberTo(&r, level); + r.append(" --- version# "); + AppendNumberTo(&r, version_number_); r.append(" ---\n"); const std::vector& files = files_[level]; for (size_t i = 0; i < files.size(); i++) { @@ -520,6 +523,17 @@ std::string Version::DebugString() const { return r; } +// this is used to batch writes to the manifest file +struct VersionSet::ManifestWriter { + Status status; + bool done; + port::CondVar cv; + VersionEdit* edit; + + explicit ManifestWriter(port::Mutex* mu, VersionEdit* e) : + done(false), cv(mu), edit(e) {} +}; + // A helper class so we can efficiently apply a whole sequence // of edits to a particular state without creating intermediate // Versions that contain full copies of the intermediate state. @@ -586,8 +600,30 @@ class VersionSet::Builder { base_->Unref(); } + void CheckConsistency(Version* v) { +#ifndef NDEBUG + for (int level = 0; level < vset_->NumberLevels(); level++) { + // Make sure there is no overlap in levels > 0 + if (level > 0) { + for (uint32_t i = 1; i < v->files_[level].size(); i++) { + const InternalKey& prev_end = v->files_[level][i-1]->largest; + const InternalKey& this_begin = v->files_[level][i]->smallest; + if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { + fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", + prev_end.DebugString().c_str(), + this_begin.DebugString().c_str()); + abort(); + } + } + } + } +#endif + } + // Apply all of the edits in *edit to the current state. void Apply(VersionEdit* edit) { + CheckConsistency(base_); + // Update compaction pointers for (size_t i = 0; i < edit->compact_pointers_.size(); i++) { const int level = edit->compact_pointers_[i].first; @@ -603,6 +639,18 @@ class VersionSet::Builder { const int level = iter->first; const uint64_t number = iter->second; levels_[level].deleted_files.insert(number); + +#ifndef NDEBUG + // none of the files to be deleted could have been added + // by a concurrent compaction process + const FileSet* added = levels_[level].added_files; + for (FileSet::const_iterator added_iter = added->begin(); + added_iter != added->end(); + ++added_iter) { + FileMetaData* f = *added_iter; + assert(f->number != number); + } +#endif } // Add new files @@ -634,6 +682,8 @@ class VersionSet::Builder { // Save the current state in *v. void SaveTo(Version* v) { + CheckConsistency(base_); + CheckConsistency(v); BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < vset_->NumberLevels(); level++) { @@ -662,22 +712,7 @@ class VersionSet::Builder { for (; base_iter != base_end; ++base_iter) { MaybeAddFile(v, level, *base_iter); } - -#ifndef NDEBUG - // Make sure there is no overlap in levels > 0 - if (level > 0) { - for (uint32_t i = 1; i < v->files_[level].size(); i++) { - const InternalKey& prev_end = v->files_[level][i-1]->largest; - const InternalKey& this_begin = v->files_[level][i]->smallest; - if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { - fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", - prev_end.DebugString().c_str(), - this_begin.DebugString().c_str()); - abort(); - } - } - } -#endif + CheckConsistency(v); } } @@ -714,7 +749,9 @@ VersionSet::VersionSet(const std::string& dbname, descriptor_file_(NULL), descriptor_log_(NULL), dummy_versions_(this), - current_(NULL) { + current_(NULL), + compactions_in_progress_(options_->num_levels), + current_version_number_(0) { compact_pointer_ = new std::string[options_->num_levels]; max_file_size_ = new uint64_t[options_->num_levels]; level_max_bytes_ = new uint64_t[options->num_levels]; @@ -729,7 +766,7 @@ VersionSet::VersionSet(const std::string& dbname, level_max_bytes_[i] = options_->max_bytes_for_level_base; } } - AppendVersion(new Version(this)); + AppendVersion(new Version(this, current_version_number_++)); } VersionSet::~VersionSet() { @@ -747,6 +784,7 @@ void VersionSet::AppendVersion(Version* v) { assert(v->refs_ == 0); assert(v != current_); if (current_ != NULL) { + assert(current_->refs_ > 0); current_->Unref(); } current_ = v; @@ -760,27 +798,35 @@ void VersionSet::AppendVersion(Version* v) { } Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { - if (edit->has_log_number_) { - assert(edit->log_number_ >= log_number_); - assert(edit->log_number_ < next_file_number_); - } else { - edit->SetLogNumber(log_number_); - } + mu->AssertHeld(); - if (!edit->has_prev_log_number_) { - edit->SetPrevLogNumber(prev_log_number_); + // queue our request + ManifestWriter w(mu, edit); + manifest_writers_.push_back(&w); + while (!w.done && &w != manifest_writers_.front()) { + w.cv.Wait(); } - - edit->SetNextFile(next_file_number_); - edit->SetLastSequence(last_sequence_); - - Version* v = new Version(this); - { - Builder builder(this, current_); - builder.Apply(edit); - builder.SaveTo(v); + if (w.done) { + manifest_lock_.Unlock(); + return w.status; } - Finalize(v); + + std::vector batch_edits; + Version* v = new Version(this, current_version_number_++); + Builder builder(this, current_); + + // process all requests in the queue + ManifestWriter* last_writer = &w; + ManifestWriter* first = manifest_writers_.front(); + assert(!manifest_writers_.empty()); + assert(first == &w); + std::deque::iterator iter = manifest_writers_.begin(); + for (; iter != manifest_writers_.end(); ++iter) { + last_writer = *iter; + LogAndApplyHelper(&builder, v, last_writer->edit, mu); + batch_edits.push_back(last_writer->edit); + } + builder.SaveTo(v); // Initialize new descriptor log file if necessary by creating // a temporary file that contains a snapshot of the current version. @@ -800,15 +846,22 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { } } - // Unlock during expensive MANIFEST log write + // Unlock during expensive MANIFEST log write. New writes cannot get here + // because &w is ensuring that all new writes get queued. { mu->Unlock(); + Finalize(v); // Write new record to MANIFEST log if (s.ok()) { std::string record; - edit->EncodeTo(&record); - s = descriptor_log_->AddRecord(record); + for (unsigned int i = 0; i < batch_edits.size(); i++) { + batch_edits[i]->EncodeTo(&record); + s = descriptor_log_->AddRecord(record); + if (!s.ok()) { + break; + } + } if (s.ok()) { if (options_->use_fsync) { s = descriptor_file_->Fsync(); @@ -826,7 +879,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { // find offset in manifest file where this version is stored. new_manifest_file_size = descriptor_file_->GetFileSize(); - + mu->Lock(); } @@ -836,7 +889,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { AppendVersion(v); log_number_ = edit->log_number_; prev_log_number_ = edit->prev_log_number_; + } else { + Log(options_->info_log, "Error in committing version %d", + v->GetVersionNumber()); delete v; if (!new_manifest_file.empty()) { delete descriptor_log_; @@ -847,9 +903,46 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { } } + // wake up all the waiting writers + while (true) { + ManifestWriter* ready = manifest_writers_.front(); + manifest_writers_.pop_front(); + if (ready != &w) { + ready->status = s; + ready->done = true; + ready->cv.Signal(); + } + if (ready == last_writer) break; + } + // Notify new head of write queue + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } return s; } +void VersionSet::LogAndApplyHelper(Builder* builder, Version* v, + VersionEdit* edit, port::Mutex* mu) { + manifest_lock_.AssertHeld(); + mu->AssertHeld(); + + if (edit->has_log_number_) { + assert(edit->log_number_ >= log_number_); + assert(edit->log_number_ < next_file_number_); + } else { + edit->SetLogNumber(log_number_); + } + + if (!edit->has_prev_log_number_) { + edit->SetPrevLogNumber(prev_log_number_); + } + + edit->SetNextFile(next_file_number_); + edit->SetLastSequence(last_sequence_); + + builder->Apply(edit); +} + Status VersionSet::Recover() { struct LogReporter : public log::Reader::Reporter { Status* status; @@ -958,7 +1051,7 @@ Status VersionSet::Recover() { } if (s.ok()) { - Version* v = new Version(this); + Version* v = new Version(this, current_version_number_++); builder.SaveTo(v); // Install recovered version Finalize(v); @@ -1073,7 +1166,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname) { } if (s.ok()) { - Version* v = new Version(this); + Version* v = new Version(this, 0); builder.SaveTo(v); // Install recovered version Finalize(v); @@ -1101,9 +1194,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } void VersionSet::Finalize(Version* v) { - // Precomputed best level for next compaction - int best_level = -1; - double best_score = -1; for (int level = 0; level < NumberLevels()-1; level++) { double score; @@ -1119,22 +1209,54 @@ void VersionSet::Finalize(Version* v) { // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). - score = v->files_[level].size() / + int numfiles = 0; + for (unsigned int i = 0; i < v->files_[level].size(); i++) { + if (!v->files_[level][i]->being_compacted) { + numfiles++; + } + } + + // If we are slowing down writes, then we better compact that first + if (numfiles >= options_->level0_stop_writes_trigger) { + score = 1000000; + // Log(options_->info_log, "XXX score l0 = 1000000000 max"); + } else if (numfiles >= options_->level0_slowdown_writes_trigger) { + score = 10000; + // Log(options_->info_log, "XXX score l0 = 1000000 medium"); + } else { + score = numfiles / static_cast(options_->level0_file_num_compaction_trigger); + if (score >= 1) { + // Log(options_->info_log, "XXX score l0 = %d least", (int)score); + } + } } else { // Compute the ratio of current size to size limit. - const uint64_t level_bytes = TotalFileSize(v->files_[level]); + const uint64_t level_bytes = TotalFileSize(v->files_[level]) - + SizeBeingCompacted(level); score = static_cast(level_bytes) / MaxBytesForLevel(level); + if (score > 1) { + // Log(options_->info_log, "XXX score l%d = %d ", level, (int)score); + } } - - if (score > best_score) { - best_level = level; - best_score = score; - } + v->compaction_level_[level] = level; + v->compaction_score_[level] = score; } - v->compaction_level_ = best_level; - v->compaction_score_ = best_score; + // sort all the levels based on their score. Higher scores get listed + // first. Use bubble sort because the number of entries are small. + for(int i = 0; i < NumberLevels()-2; i++) { + for (int j = i+1; j < NumberLevels()-2; j++) { + if (v->compaction_score_[i] < v->compaction_score_[j]) { + double score = v->compaction_score_[i]; + int level = v->compaction_level_[i]; + v->compaction_score_[i] = v->compaction_score_[j]; + v->compaction_level_[i] = v->compaction_level_[j]; + v->compaction_score_[j] = score; + v->compaction_level_[j] = level; + } + } + } } Status VersionSet::WriteSnapshot(log::Writer* log) { @@ -1379,40 +1501,162 @@ int64_t VersionSet::MaxGrandParentOverlapBytes(int level) { return result; } -Compaction* VersionSet::PickCompaction() { - Compaction* c; - int level; +// verify that the files listed in this compaction are present +// in the current version +bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { + if (c->input_version_ != current_) { + Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); + } - // We prefer compactions triggered by too much data in a level over - // the compactions triggered by seeks. - const bool size_compaction = (current_->compaction_score_ >= 1); - const bool seek_compaction = (current_->file_to_compact_ != NULL); - if (size_compaction) { - level = current_->compaction_level_; - assert(level >= 0); - assert(level+1 < NumberLevels()); - c = new Compaction(level, MaxFileSizeForLevel(level), - MaxGrandParentOverlapBytes(level), NumberLevels()); + // verify files in level + int level = c->level(); + for (int i = 0; i < c->num_input_files(0); i++) { + uint64_t number = c->input(0,i)->number; - // Pick the first file that comes after compact_pointer_[level] - for (size_t i = 0; i < current_->files_[level].size(); i++) { - FileMetaData* f = current_->files_[level][i]; - if (compact_pointer_[level].empty() || - icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { - c->inputs_[0].push_back(f); + // look for this file in the current version + bool found = false; + for (unsigned int j = 0; j < current_->files_[level].size(); j++) { + FileMetaData* f = current_->files_[level][j]; + if (f->number == number) { + found = true; break; } } - if (c->inputs_[0].empty()) { - // Wrap-around to the beginning of the key space - c->inputs_[0].push_back(current_->files_[level][0]); + if (!found) { + return false; // input files non existant in current version } - } else if (seek_compaction) { + } + // verify level+1 files + level++; + for (int i = 0; i < c->num_input_files(1); i++) { + uint64_t number = c->input(1,i)->number; + + // look for this file in the current version + bool found = false; + for (unsigned int j = 0; j < current_->files_[level].size(); j++) { + FileMetaData* f = current_->files_[level][j]; + if (f->number == number) { + found = true; + break; + } + } + if (!found) { + return false; // input files non existant in current version + } + } + return true; // everything good +} + +// Clear all files to indicate that they are not being compacted +// Delete this compaction from the list of running compactions. +void VersionSet::ReleaseCompactionFiles(Compaction* c) { + c->MarkFilesBeingCompacted(false); + compactions_in_progress_[c->level()].erase(c); +} + +// The total size of files that are currently being compacted +uint64_t VersionSet::SizeBeingCompacted(int level) { + uint64_t total = 0; + for (std::set::iterator it = + compactions_in_progress_[level].begin(); + it != compactions_in_progress_[level].end(); + ++it) { + Compaction* c = (*it); + assert(c->level() == level); + for (int i = 0; i < c->num_input_files(0); i++) { + total += c->input(0,i)->file_size; + } + } + return total; +} + +Compaction* VersionSet::PickCompactionBySize(int level) { + Compaction* c = NULL; + + // level 0 files are overlapping. So we cannot pick more + // than one concurrent compactions at this level. This + // cold be made better by looking at key-ranges that are + // being compacted at level 0. + if (level == 0 && compactions_in_progress_[level].size() == 1) { + return NULL; + } + + assert(level >= 0); + assert(level+1 < NumberLevels()); + c = new Compaction(level, MaxFileSizeForLevel(level), + MaxGrandParentOverlapBytes(level), NumberLevels()); + + // remember the first file that is not being compacted + int firstIndex = 0; + + // Pick the first file that comes after compact_pointer_[level] + for (size_t i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + // do not pick a file to compact if it is being compacted + // from n-1 level. + if (f->being_compacted) { + continue; + } + firstIndex = i; + + // Pick a file that has a key-range larger than the range + // we picked in the previous call to PickCompaction. + if (compact_pointer_[level].empty() || + icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { + // Do not pick this file if its parents at level+1 are being compacted. + // Maybe we can avoid redoing this work in SetupOtherInputs + if (ParentFilesInCompaction(f, level)) { + continue; + } + c->inputs_[0].push_back(f); + break; + } + } + if (c->inputs_[0].empty()) { + // Wrap-around to the beginning of the key space + FileMetaData* f = current_->files_[level][firstIndex]; + if (!f->being_compacted && !ParentFilesInCompaction(f, level)) { + c->inputs_[0].push_back(f); + } + } + if (c->inputs_[0].empty()) { + delete c; + c = NULL; + } + return c; +} + +Compaction* VersionSet::PickCompaction() { + Compaction* c = NULL; + int level; + + // compute the compactions needed. It is better to do it here + // and also in LogAndApply(), otherwise the values could be stale. + Finalize(current_); + + // We prefer compactions triggered by too much data in a level over + // the compactions triggered by seeks. + // + // Find the compactions by size on all levels. + for (int i = 0; i < NumberLevels()-1; i++) { + level = current_->compaction_level_[i]; + if ((current_->compaction_score_[i] >= 1)) { + c = PickCompactionBySize(level); + if (c != NULL) { + break; + } + } + } + + // Find compactions needed by seeks + if (c == NULL && (current_->file_to_compact_ != NULL)) { level = current_->file_to_compact_level_; c = new Compaction(level, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->inputs_[0].push_back(current_->file_to_compact_); - } else { + } + + if (c == NULL) { return NULL; } @@ -1426,15 +1670,46 @@ Compaction* VersionSet::PickCompaction() { // Note that the next call will discard the file we placed in // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. - current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); + c->inputs_[0].clear(); + std::vector more; + current_->GetOverlappingInputs(0, &smallest, &largest, &more); + for (unsigned int i = 0; i < more.size(); i++) { + FileMetaData* f = more[i]; + if (!f->being_compacted && !ParentFilesInCompaction(f, level)) { + c->inputs_[0].push_back(f); + } + } assert(!c->inputs_[0].empty()); } SetupOtherInputs(c); + // mark all the files that are being compacted + c->MarkFilesBeingCompacted(true); + + // remember this currently undergoing compaction + compactions_in_progress_[level].insert(c); + return c; } +// Returns true if any one of the parent files are being compacted +bool VersionSet::ParentFilesInCompaction(FileMetaData* f, int level) { + std::vector inputs; + current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest, &inputs); + return FilesInCompaction(inputs); +} + +// Returns true if any one of specified files are being compacted +bool VersionSet::FilesInCompaction(std::vector& files) { + for (unsigned int i = 0; i < files.size(); i++) { + if (files[i]->being_compacted) { + return true; + } + } + return false; +} + void VersionSet::SetupOtherInputs(Compaction* c) { const int level = c->level(); InternalKey smallest, largest; @@ -1456,13 +1731,15 @@ void VersionSet::SetupOtherInputs(Compaction* c) { const int64_t expanded0_size = TotalFileSize(expanded0); int64_t limit = ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && - inputs1_size + expanded0_size < limit) { + inputs1_size + expanded0_size < limit && + !FilesInCompaction(expanded0)) { InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector expanded1; current_->GetOverlappingInputs(level+1, &new_start, &new_limit, &expanded1); - if (expanded1.size() == c->inputs_[1].size()) { + if (expanded1.size() == c->inputs_[1].size() && + !FilesInCompaction(expanded1)) { Log(options_->info_log, "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n", level, @@ -1531,6 +1808,11 @@ Compaction* VersionSet::CompactRange( c->input_version_->Ref(); c->inputs_[0] = inputs; SetupOtherInputs(c); + + // These files that are to be manaully compacted do not trample + // upon other files because manual compactions are processed when + // the system has a max of 1 background compaction thread. + c->MarkFilesBeingCompacted(true); return c; } @@ -1619,6 +1901,18 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) { } } +// Mark (or clear) each file that is being compacted +void Compaction::MarkFilesBeingCompacted(bool value) { + for (int i = 0; i < 2; i++) { + std::vector v = inputs_[i]; + for (unsigned int j = 0; j < inputs_[i].size(); j++) { + assert(value ? !inputs_[i][j]->being_compacted : + inputs_[i][j]->being_compacted); + inputs_[i][j]->being_compacted = value; + } + } +} + void Compaction::ReleaseInputs() { if (input_version_ != NULL) { input_version_->Unref(); @@ -1642,7 +1936,8 @@ static void InputSummary(std::vector& files, } void Compaction::Summary(char* output, int len) { - int write = snprintf(output, len, "Base level %d, inputs:", level_); + int write = snprintf(output, len, "Base version %ld Base level %d, inputs:", + input_version_->GetVersionNumber(), level_); if(write < 0 || write > len) return; diff --git a/db/version_set.h b/db/version_set.h index c9aa2a5d4..3688cfa5f 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -18,6 +18,7 @@ #include #include #include +#include #include "db/dbformat.h" #include "db/version_edit.h" #include "port/port.h" @@ -106,6 +107,11 @@ class Version { // Return a human readable string that describes this version's contents. std::string DebugString() const; + // Returns the version nuber of this version + uint64_t GetVersionNumber() { + return version_number_; + } + private: friend class Compaction; friend class VersionSet; @@ -128,13 +134,19 @@ class Version { // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize(). - double compaction_score_; - int compaction_level_; + // The most critical level to be compacted is listed first + // These are used to pick the best compaction level + std::vector compaction_score_; + std::vector compaction_level_; // The offset in the manifest file where this version is stored. uint64_t offset_manifest_file_; - explicit Version(VersionSet* vset); + // A version number that uniquely represents this version. This is + // used for debugging and logging purposes only. + uint64_t version_number_; + + explicit Version(VersionSet* vset, uint64_t version_number = 0); ~Version(); @@ -229,10 +241,20 @@ class VersionSet { // The caller should delete the iterator when no longer needed. Iterator* MakeInputIterator(Compaction* c); + // Returns true iff some level needs a compaction because it has + // exceeded its target size. + bool NeedsSizeCompaction() const { + for (int i = 0; i < NumberLevels()-1; i++) { + if (current_->compaction_score_[i] >= 1) { + return true; + } + } + return false; + } // Returns true iff some level needs a compaction. bool NeedsCompaction() const { - Version* v = current_; - return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL); + return ((current_->file_to_compact_ != NULL) || + NeedsSizeCompaction()); } // Add all files listed in any live version to *live. @@ -263,8 +285,22 @@ class VersionSet { // Return the size of the current manifest file const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } + // For the specfied level, pick a compaction. + // Returns NULL if there is no compaction to be done. + Compaction* PickCompactionBySize(int level); + + // Free up the files that were participated in a compaction + void ReleaseCompactionFiles(Compaction* c); + + // verify that the files that we started with for a compaction + // still exist in the current version and in the same original level. + // This ensures that a concurrent compaction did not erroneously + // pick the same files to compact. + bool VerifyCompactionFileConsistency(Compaction* c); + private: class Builder; + struct ManifestWriter; friend class Compaction; friend class Version; @@ -322,9 +358,34 @@ class VersionSet { // Per-level max bytes uint64_t* level_max_bytes_; + // record all the ongoing compactions for all levels + std::vector > compactions_in_progress_; + + // A lock that serialize writes to the manifest + port::Mutex manifest_lock_; + + // generates a increasing version number for every new version + uint64_t current_version_number_; + + // Queue of writers to the manifest file + std::deque manifest_writers_; + // No copying allowed VersionSet(const VersionSet&); void operator=(const VersionSet&); + + // Return the total amount of data that is undergoing + // compactions at this level + uint64_t SizeBeingCompacted(int level); + + // Returns true if any one of the parent files are being compacted + bool ParentFilesInCompaction(FileMetaData* f, int level); + + // Returns true if any one of the specified files are being compacted + bool FilesInCompaction(std::vector& files); + + void LogAndApplyHelper(Builder*b, Version* v, + VersionEdit* edit, port::Mutex* mu); }; // A Compaction encapsulates information about a compaction. @@ -403,6 +464,9 @@ class Compaction { // higher level than the ones involved in this compaction (i.e. for // all L >= level_ + 2). size_t* level_ptrs_; + + // mark (or clear) all files that are being compacted + void MarkFilesBeingCompacted(bool); }; } // namespace leveldb diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 944946692..4c07cc63c 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -139,6 +139,10 @@ class HdfsEnv : public Env { posixEnv->SetBackgroundThreads(number); } + virtual std::string TimeToString(uint64_t number) { + return posixEnv->TimeToString(number); + } + static uint64_t gettid() { assert(sizeof(pthread_t) <= sizeof(uint64_t)); return (uint64_t)pthread_self(); @@ -273,6 +277,8 @@ class HdfsEnv : public Env { std::string* outputpath) {return notsup;} virtual void SetBackgroundThreads(int number) {} + + virtual std::string TimeToString(uint64_t number) { return "";} }; } diff --git a/include/leveldb/env.h b/include/leveldb/env.h index a1a6c854b..f4284ace4 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -159,6 +159,9 @@ class Env { // default: 1 virtual void SetBackgroundThreads(int number) = 0; + // Converts seconds-since-Jan-01-1970 to a printable string + virtual std::string TimeToString(uint64_t time) = 0; + private: // No copying allowed Env(const Env&); @@ -358,6 +361,9 @@ class EnvWrapper : public Env { void SetBackgroundThreads(int num) { return target_->SetBackgroundThreads(num); } + std::string TimeToString(uint64_t time) { + return target_->TimeToString(time); + } private: Env* target_; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index da26cd440..c3b8aa479 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -78,7 +78,8 @@ struct Options { // on disk) before converting to a sorted on-disk file. // // Larger values increase performance, especially during bulk loads. - // Up to two write buffers may be held in memory at the same time, + // Up to max_write_buffer_number write buffers may be held in memory + // at the same time, // so you may wish to adjust this parameter to control memory usage. // Also, a larger write buffer will result in a longer recovery time // the next time the database is opened. @@ -86,6 +87,12 @@ struct Options { // Default: 4MB size_t write_buffer_size; + // The maximum number of write buffers that are built up in memory. + // The default is 2, so that when 1 write buffer is being flushed to + // storage, new writes can continue to the other write buffer. + // Default: 2 + int max_write_buffer_number; + // Number of open files that can be used by the DB. You may need to // increase this if your database has a large working set (budget // one open file per 2MB of working set). @@ -244,6 +251,10 @@ struct Options { // value is 0 which means that obsolete files get removed after // every compaction run. uint64_t delete_obsolete_files_period_micros; + + // Maximum number of concurrent background compactions. + // Default: 1 + int max_background_compactions; // Create an Options object with default values for all fields. Options(); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 84ab569e4..833ad2c38 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -46,6 +46,16 @@ static bool FLAGS_verbose = false; // (initialized to default value by "main") static int FLAGS_write_buffer_size = 0; +// The number of in-memory memtables. +// Each memtable is of size FLAGS_write_buffer_size. +// This is initialized to default value of 2 in "main" function. +static int FLAGS_max_write_buffer_number = 0; + +// The maximum number of concurrent background compactions +// that can occur in parallel. +// This is initialized to default value of 1 in "main" function. +static int FLAGS_max_background_compactions = 0; + // Number of bytes to use as a cache of uncompressed data. static long FLAGS_cache_size = 2 * KB * KB * KB; @@ -104,6 +114,11 @@ static int FLAGS_readwritepercent = 10; // Option to disable compation triggered by read. static int FLAGS_disable_seek_compaction = false; +// Option to delete obsolete files periodically +// Default: 0 which means that obsolete files are +// deleted after every compaction run. + static uint64_t FLAGS_delete_obsolete_files_period_micros = 0; + // Algorithm to use to compress the database static enum leveldb::CompressionType FLAGS_compression_type = leveldb::kSnappyCompression; @@ -626,6 +641,8 @@ class StressTest { Options options; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; + options.max_write_buffer_number = FLAGS_max_write_buffer_number; + options.max_background_compactions = FLAGS_max_background_compactions; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -644,6 +661,8 @@ class StressTest { options.compression = FLAGS_compression_type; options.create_if_missing = true; options.disable_seek_compaction = FLAGS_disable_seek_compaction; + options.delete_obsolete_files_period_micros = + FLAGS_delete_obsolete_files_period_micros; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -670,7 +689,10 @@ class StressTest { int main(int argc, char** argv) { FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; + FLAGS_max_write_buffer_number = leveldb::Options().max_write_buffer_number; FLAGS_open_files = leveldb::Options().max_open_files; + FLAGS_max_background_compactions = + leveldb::Options().max_background_compactions; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; std::string default_db_path; @@ -706,6 +728,10 @@ int main(int argc, char** argv) { FLAGS_value_size_mult = n; } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { FLAGS_write_buffer_size = n; + } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) { + FLAGS_max_write_buffer_number = n; + } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { + FLAGS_max_background_compactions = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { @@ -784,6 +810,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_disable_seek_compaction = n; + } else if (sscanf(argv[i], "--delete_obsolete_files_period_micros=%ld%c", + &l, &junk) == 1) { + FLAGS_delete_obsolete_files_period_micros = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/util/env_posix.cc b/util/env_posix.cc index 6b5feb284..f2b4a8474 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -749,6 +749,26 @@ class PosixEnv : public Env { } } + virtual std::string TimeToString(uint64_t secondsSince1970) { + const time_t seconds = (time_t)secondsSince1970; + struct tm t; + int maxsize = 64; + std::string dummy; + dummy.reserve(maxsize); + dummy.resize(maxsize); + char* p = &dummy[0]; + localtime_r(&seconds, &t); + snprintf(p, maxsize, + "%04d/%02d/%02d-%02d:%02d:%02d ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec); + return dummy; + } + private: void PthreadCall(const char* label, int result) { if (result != 0) { diff --git a/util/options.cc b/util/options.cc index 8078b4db1..5441112b8 100644 --- a/util/options.cc +++ b/util/options.cc @@ -19,6 +19,7 @@ Options::Options() env(Env::Default()), info_log(NULL), write_buffer_size(4<<20), + max_write_buffer_number(2), max_open_files(1000), block_cache(NULL), block_size(4096), @@ -42,32 +43,34 @@ Options::Options() db_stats_log_interval(1800), db_log_dir(""), disable_seek_compaction(false), - delete_obsolete_files_period_micros(0) { + delete_obsolete_files_period_micros(0), + max_background_compactions(1) { } void Options::Dump( Logger * log) const { - Log(log," Options.comparator: %s", comparator->Name()); - Log(log," Options.create_if_missing: %d", create_if_missing); - Log(log," Options.error_if_exists: %d", error_if_exists); - Log(log," Options.paranoid_checks: %d", paranoid_checks); - Log(log," Options.env: %p", env); - Log(log," Options.info_log: %p", info_log); - Log(log," Options.write_buffer_size: %zd", write_buffer_size); - Log(log," Options.max_open_files: %d", max_open_files); - Log(log," Options.block_cache: %p", block_cache); - Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); - Log(log," Options.block_size: %zd", block_size); - Log(log,"Options.block_restart_interval: %d", block_restart_interval); - Log(log," Options.compression: %d", compression); - Log(log," Options.filter_policy: %s", + Log(log," Options.comparator: %s", comparator->Name()); + Log(log," Options.create_if_missing: %d", create_if_missing); + Log(log," Options.error_if_exists: %d", error_if_exists); + Log(log," Options.paranoid_checks: %d", paranoid_checks); + Log(log," Options.env: %p", env); + Log(log," Options.info_log: %p", info_log); + Log(log," Options.write_buffer_size: %zd", write_buffer_size); + Log(log," Options.max_write_buffer_number: %zd", max_write_buffer_number); + Log(log," Options.max_open_files: %d", max_open_files); + Log(log," Options.block_cache: %p", block_cache); + Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); + Log(log," Options.block_size: %zd", block_size); + Log(log," Options.block_restart_interval: %d", block_restart_interval); + Log(log," Options.compression: %d", compression); + Log(log," Options.filter_policy: %s", filter_policy == NULL ? "NULL" : filter_policy->Name()); - Log(log," Options.num_levels: %d", num_levels); - Log(log," Options.disableDataSync: %d", disableDataSync); - Log(log," Options.use_fsync: %d", use_fsync); - Log(log," Options.db_stats_log_interval: %d", + Log(log," Options.num_levels: %d", num_levels); + Log(log," Options.disableDataSync: %d", disableDataSync); + Log(log," Options.use_fsync: %d", use_fsync); + Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); Log(log," Options.level0_file_num_compaction_trigger: %d", level0_file_num_compaction_trigger); @@ -89,10 +92,12 @@ Options::Dump( expanded_compaction_factor); Log(log," Options.max_grandparent_overlap_factor: %d", max_grandparent_overlap_factor); - Log(log," Options.db_log_dir: %s", + Log(log," Options.db_log_dir: %s", db_log_dir.c_str()); - Log(log," Options.disable_seek_compaction: %d", + Log(log," Options.disable_seek_compaction: %d", disable_seek_compaction); + Log(log," Options.max_background_compactions: %d", + max_background_compactions); } // Options::Dump