From fdb6be4e2476943d36b94078bb6b29cd4133731a Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 19 Dec 2014 20:38:12 +0100 Subject: [PATCH] Rewritten system for scheduling background work Summary: When scaling to higher number of column families, the worst bottleneck was MaybeScheduleFlushOrCompaction(), which did a for loop over all column families while holding a mutex. This patch addresses the issue. The approach is similar to our earlier efforts: instead of a pull-model, where we do something for every column family, we can do a push-based model -- when we detect that column family is ready to be flushed/compacted, we add it to the flush_queue_/compaction_queue_. That way we don't need to loop over every column family in MaybeScheduleFlushOrCompaction. Here are the performance results: Command: ./db_bench --write_buffer_size=268435456 --db_write_buffer_size=268435456 --db=/fast-rocksdb-tmp/rocks_lots_of_cf --use_existing_db=0 --open_files=55000 --statistics=1 --histogram=1 --disable_data_sync=1 --max_write_buffer_number=2 --sync=0 --benchmarks=fillrandom --threads=16 --num_column_families=5000 --disable_wal=1 --max_background_flushes=16 --max_background_compactions=16 --level0_file_num_compaction_trigger=2 --level0_slowdown_writes_trigger=2 --level0_stop_writes_trigger=3 --hard_rate_limit=1 --num=33333333 --writes=33333333 Before the patch: fillrandom : 26.950 micros/op 37105 ops/sec; 4.1 MB/s After the patch: fillrandom : 17.404 micros/op 57456 ops/sec; 6.4 MB/s Next bottleneck is VersionSet::AddLiveFiles, which is painfully slow when we have a lot of files. This is coming in the next patch, but when I removed that code, here's what I got: fillrandom : 7.590 micros/op 131758 ops/sec; 14.6 MB/s Test Plan: make check two stress tests: Big number of compactions and flushes: ./db_stress --threads=30 --ops_per_thread=20000000 --max_key=10000 --column_families=20 --clear_column_family_one_in=10000000 --verify_before_write=0 --reopen=15 --max_background_compactions=10 --max_background_flushes=10 --db=/fast-rocksdb-tmp/db_stress --prefixpercent=0 --iterpercent=0 --writepercent=75 --db_write_buffer_size=2000000 max_background_flushes=0, to verify that this case also works correctly ./db_stress --threads=30 --ops_per_thread=2000000 --max_key=10000 --column_families=20 --clear_column_family_one_in=10000000 --verify_before_write=0 --reopen=3 --max_background_compactions=3 --max_background_flushes=0 --db=/fast-rocksdb-tmp/db_stress --prefixpercent=0 --iterpercent=0 --writepercent=75 --db_write_buffer_size=2000000 Reviewers: ljin, rven, yhchiang, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D30123 --- db/column_family.cc | 38 ++-- db/column_family.h | 17 ++ db/compaction_picker.cc | 24 ++- db/db_impl.cc | 375 ++++++++++++++++++++++++---------------- db/db_impl.h | 51 +++++- 5 files changed, 326 insertions(+), 179 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index f07c741a4..8a5c4a01f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -223,14 +223,11 @@ void SuperVersionUnrefHandle(void* ptr) { } } // anonymous namespace -ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, - Version* _dummy_versions, - Cache* _table_cache, - WriteBuffer* write_buffer, - const ColumnFamilyOptions& cf_options, - const DBOptions* db_options, - const EnvOptions& env_options, - ColumnFamilySet* column_family_set) +ColumnFamilyData::ColumnFamilyData( + uint32_t id, const std::string& name, Version* _dummy_versions, + Cache* _table_cache, WriteBuffer* write_buffer, + const ColumnFamilyOptions& cf_options, const DBOptions* db_options, + const EnvOptions& env_options, ColumnFamilySet* column_family_set) : id_(id), name_(name), dummy_versions_(_dummy_versions), @@ -250,7 +247,9 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, next_(nullptr), prev_(nullptr), log_number_(0), - column_family_set_(column_family_set) { + column_family_set_(column_family_set), + pending_flush_(false), + pending_compaction_(false) { Ref(); // if _dummy_versions is nullptr, then this is a dummy column family. @@ -285,10 +284,14 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, new LevelCompactionPicker(ioptions_, &internal_comparator_)); } - Log(InfoLogLevel::INFO_LEVEL, - ioptions_.info_log, "Options for column family \"%s\":\n", - name.c_str()); - options_.Dump(ioptions_.info_log); + if (column_family_set_->NumberOfColumnFamilies() < 10) { + Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log, + "--------------- Options for column family [%s]:\n", name.c_str()); + options_.Dump(ioptions_.info_log); + } else { + Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log, + "\t(skipping printing options)\n"); + } } RecalculateWriteStallConditions(mutable_cf_options_); @@ -313,6 +316,11 @@ ColumnFamilyData::~ColumnFamilyData() { current_->Unref(); } + // It would be wrong if this ColumnFamilyData is in flush_queue_ or + // compaction_queue_ and we destroyed it + assert(!pending_flush_); + assert(!pending_compaction_); + if (super_version_ != nullptr) { // Release SuperVersion reference kept in ThreadLocalPtr. // This must be done outside of mutex_ since unref handler can lock mutex. @@ -434,6 +442,10 @@ void ColumnFamilyData::CreateNewMemtable( mem_->Ref(); } +bool ColumnFamilyData::NeedsCompaction() const { + return compaction_picker_->NeedsCompaction(current_->storage_info()); +} + Compaction* ColumnFamilyData::PickCompaction( const MutableCFOptions& mutable_options, LogBuffer* log_buffer) { auto* result = compaction_picker_->PickCompaction( diff --git a/db/column_family.h b/db/column_family.h index 51ccd99ac..8cf66a0c0 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -210,8 +210,11 @@ class ColumnFamilyData { // See documentation in compaction_picker.h // REQUIRES: DB mutex held + bool NeedsCompaction() const; + // REQUIRES: DB mutex held Compaction* PickCompaction(const MutableCFOptions& mutable_options, LogBuffer* log_buffer); + // REQUIRES: DB mutex held Compaction* CompactRange( const MutableCFOptions& mutable_cf_options, int input_level, int output_level, uint32_t output_path_id, @@ -248,6 +251,7 @@ class ColumnFamilyData { // if its reference count is zero and needs deletion or nullptr if not // As argument takes a pointer to allocated SuperVersion to enable // the clients to allocate SuperVersion outside of mutex. + // IMPORTANT: Only call this from DBImpl::InstallSuperVersion() SuperVersion* InstallSuperVersion(SuperVersion* new_superversion, port::Mutex* db_mutex, const MutableCFOptions& mutable_cf_options); @@ -261,6 +265,12 @@ class ColumnFamilyData { bool triggered_flush_slowdown, bool triggered_flush_stop); + // Protected by DB mutex + void set_pending_flush(bool value) { pending_flush_ = value; } + void set_pending_compaction(bool value) { pending_compaction_ = value; } + bool pending_flush() { return pending_flush_; } + bool pending_compaction() { return pending_compaction_; } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -328,6 +338,13 @@ class ColumnFamilyData { ColumnFamilySet* column_family_set_; std::unique_ptr write_controller_token_; + + // If true --> this ColumnFamily is currently present in DBImpl::flush_queue_ + bool pending_flush_; + + // If true --> this ColumnFamily is currently present in + // DBImpl::compaction_queue_ + bool pending_compaction_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 82653ff70..70be388c9 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -695,15 +695,6 @@ Compaction* LevelCompactionPicker::PickCompaction( Compaction* c = nullptr; int level = -1; - // Compute the compactions needed. It is better to do it here - // and also in LogAndApply(), otherwise the values could be stale. - std::vector size_being_compacted(NumberLevels() - 1); - SizeBeingCompacted(size_being_compacted); - - CompactionOptionsFIFO dummy_compaction_options_fifo; - vstorage->ComputeCompactionScore( - mutable_cf_options, dummy_compaction_options_fifo, size_being_compacted); - // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. // @@ -766,6 +757,21 @@ Compaction* LevelCompactionPicker::PickCompaction( compactions_in_progress_[level].insert(c); c->mutable_cf_options_ = mutable_cf_options; + + // Creating a compaction influences the compaction score because the score + // takes running compactions into account (by skipping files that are already + // being compacted). Since we just changed compaction score, we recalculate it + // here + { // this piece of code recomputes compaction score + std::vector size_being_compacted(NumberLevels() - 1); + SizeBeingCompacted(size_being_compacted); + + CompactionOptionsFIFO dummy_compaction_options_fifo; + vstorage->ComputeCompactionScore(mutable_cf_options, + dummy_compaction_options_fifo, + size_being_compacted); + } + return c; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 3275165e8..cb5dcc59c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -89,6 +89,7 @@ void DumpRocksDBBuildVersion(Logger * log); struct DBImpl::WriteContext { autovector superversions_to_free_; autovector logs_to_free_; + bool schedule_bg_work_ = false; ~WriteContext() { for (auto& sv : superversions_to_free_) { @@ -205,8 +206,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) max_total_in_memory_state_(0), is_snapshot_supported_(true), write_buffer_(options.db_write_buffer_size), - tmp_batch_(), - bg_schedule_needed_(false), + unscheduled_flushes_(0), + unscheduled_compactions_(0), bg_compaction_scheduled_(0), bg_manual_only_(0), bg_flush_scheduled_(0), @@ -272,6 +273,19 @@ DBImpl::~DBImpl() { listeners_.clear(); flush_scheduler_.Clear(); + while (!flush_queue_.empty()) { + auto cfd = PopFirstFromFlushQueue(); + if (cfd->Unref()) { + delete cfd; + } + } + while (!compaction_queue_.empty()) { + auto cfd = PopFirstFromCompactionQueue(); + if (cfd->Unref()) { + delete cfd; + } + } + if (default_cf_handle_ != nullptr) { // we need to delete handle outside of lock because it does its own locking mutex_.Unlock(); @@ -1643,10 +1657,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // SetNewMemtableAndNewLogFile() will release and reacquire mutex // during execution s = SetNewMemtableAndNewLogFile(cfd, &context); - cfd->imm()->FlushRequested(); - MaybeScheduleFlushOrCompaction(); - write_thread_.ExitWriteThread(&w, &w, s); + + cfd->imm()->FlushRequested(); + + // schedule flush + SchedulePendingFlush(cfd); + MaybeScheduleFlushOrCompaction(); } if (s.ok() && flush_options.wait) { @@ -1671,52 +1688,90 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); - bg_schedule_needed_ = false; if (bg_work_gate_closed_) { - // gate closed for backgrond work + // gate closed for background work + return; } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions - } else { - bool is_flush_pending = false; - // no need to refcount since we're under a mutex - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->imm()->IsFlushPending()) { - is_flush_pending = true; - } - } - if (is_flush_pending) { - // memtable flush needed - if (bg_flush_scheduled_ < db_options_.max_background_flushes) { - bg_flush_scheduled_++; - env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); - } else if (db_options_.max_background_flushes > 0) { - bg_schedule_needed_ = true; - } - } - bool is_compaction_needed = false; - // no need to refcount since we're under a mutex - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->compaction_picker()->NeedsCompaction( - cfd->current()->storage_info())) { - is_compaction_needed = true; - break; - } - } + return; + } else if (bg_manual_only_) { + // manual only + return; + } - // Schedule BGWorkCompaction if there's a compaction pending (or a memtable - // flush, but the HIGH pool is not enabled) - // Do it only if max_background_compactions hasn't been reached and - // bg_manual_only_ == 0 - if (!bg_manual_only_ && - (is_compaction_needed || - (is_flush_pending && db_options_.max_background_flushes == 0))) { - if (bg_compaction_scheduled_ < db_options_.max_background_compactions) { - bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); - } else { - bg_schedule_needed_ = true; - } + while (unscheduled_flushes_ > 0 && + bg_flush_scheduled_ < db_options_.max_background_flushes) { + unscheduled_flushes_--; + bg_flush_scheduled_++; + env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); + } + + if (db_options_.max_background_flushes == 0 && + bg_compaction_scheduled_ < db_options_.max_background_compactions && + unscheduled_flushes_ > 0) { + // special case where flush is executed by compaction thread + // (if max_background_flushes == 0). + // Compaction thread will execute all the flushes + unscheduled_flushes_ = 0; + if (unscheduled_compactions_ > 0) { + // bg compaction will execute one compaction + unscheduled_compactions_--; } + bg_compaction_scheduled_++; + env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + } + + while (bg_compaction_scheduled_ < db_options_.max_background_compactions && + unscheduled_compactions_ > 0) { + bg_compaction_scheduled_++; + unscheduled_compactions_--; + env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + } +} + +void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { + assert(!cfd->pending_compaction()); + cfd->Ref(); + compaction_queue_.push_back(cfd); + cfd->set_pending_compaction(true); +} + +ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { + assert(!compaction_queue_.empty()); + auto cfd = *compaction_queue_.begin(); + compaction_queue_.pop_front(); + assert(cfd->pending_compaction()); + cfd->set_pending_compaction(false); + return cfd; +} + +void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) { + assert(!cfd->pending_flush()); + cfd->Ref(); + flush_queue_.push_back(cfd); + cfd->set_pending_flush(true); +} + +ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() { + assert(!flush_queue_.empty()); + auto cfd = *flush_queue_.begin(); + flush_queue_.pop_front(); + assert(cfd->pending_flush()); + cfd->set_pending_flush(false); + return cfd; +} + +void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) { + if (!cfd->pending_flush() && cfd->imm()->IsFlushPending()) { + AddToFlushQueue(cfd); + ++unscheduled_flushes_; + } +} + +void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { + if (!cfd->pending_compaction() && cfd->NeedsCompaction()) { + AddToCompactionQueue(cfd); + ++unscheduled_compactions_; } } @@ -1743,33 +1798,41 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, return bg_error_; } - // call_status is failure if at least one flush was a failure. even if - // flushing one column family reports a failure, we will continue flushing - // other column families. however, call_status will be a failure in that case. - Status call_status; - // refcounting in iteration - for (auto cfd : *versions_->GetColumnFamilySet()) { - cfd->Ref(); - Status flush_status; - const MutableCFOptions mutable_cf_options = - *cfd->GetLatestMutableCFOptions(); - while (flush_status.ok() && cfd->imm()->IsFlushPending()) { - LogToBuffer( - log_buffer, - "BackgroundCallFlush doing FlushMemTableToOutputFile with column " - "family [%s], flush slots available %d", - cfd->GetName().c_str(), - db_options_.max_background_flushes - bg_flush_scheduled_); - flush_status = FlushMemTableToOutputFile( - cfd, mutable_cf_options, madeProgress, job_context, log_buffer); + ColumnFamilyData* cfd = nullptr; + while (!flush_queue_.empty()) { + // This cfd is already referenced + cfd = PopFirstFromFlushQueue(); + + if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { + // can't flush this CF, try next one + if (cfd->Unref()) { + delete cfd; + } + continue; } - if (call_status.ok() && !flush_status.ok()) { - call_status = flush_status; - } - cfd->Unref(); + + // found a flush! + break; } - versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); - return call_status; + + Status status; + if (cfd != nullptr) { + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + LogToBuffer( + log_buffer, + "Calling FlushMemTableToOutputFile with column " + "family [%s], flush slots available %d, compaction slots available %d", + cfd->GetName().c_str(), + db_options_.max_background_flushes - bg_flush_scheduled_, + db_options_.max_background_compactions - bg_compaction_scheduled_); + status = FlushMemTableToOutputFile(cfd, mutable_cf_options, madeProgress, + job_context, log_buffer); + if (cfd->Unref()) { + delete cfd; + } + } + return status; } void DBImpl::BackgroundCallFlush() { @@ -1829,13 +1892,8 @@ void DBImpl::BackgroundCallFlush() { } bg_flush_scheduled_--; - // Any time the mutex is released After finding the work to do, another - // thread might execute MaybeScheduleFlushOrCompaction(). It is possible - // that there is a pending job but it is not scheduled because of the - // max thread limit. - if (madeProgress || bg_schedule_needed_) { - MaybeScheduleFlushOrCompaction(); - } + // See if there's more work to be done + MaybeScheduleFlushOrCompaction(); RecordFlushIOStats(); bg_cv_.SignalAll(); // IMPORTANT: there should be no code after calling SignalAll. This call may @@ -1909,17 +1967,8 @@ void DBImpl::BackgroundCallCompaction() { versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); - // Previous compaction may have produced too many files in a level, - // So reschedule another compaction if we made progress in the - // last compaction. - // - // Also, any time the mutex is released After finding the work to do, - // another thread might execute MaybeScheduleFlushOrCompaction(). It is - // possible that there is a pending job but it is not scheduled because of - // the max thread limit. - if (madeProgress || bg_schedule_needed_) { - MaybeScheduleFlushOrCompaction(); - } + // See if there's more work to be done + MaybeScheduleFlushOrCompaction(); if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { // signal if // * madeProgress -- need to wakeup DelayWrite @@ -1964,35 +2013,28 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, } // FLUSH preempts compaction - Status flush_stat; - for (auto cfd : *versions_->GetColumnFamilySet()) { - const MutableCFOptions mutable_cf_options = - *cfd->GetLatestMutableCFOptions(); - while (cfd->imm()->IsFlushPending()) { - LogToBuffer( - log_buffer, - "BackgroundCompaction doing FlushMemTableToOutputFile, " - "compaction slots available %d", - db_options_.max_background_compactions - bg_compaction_scheduled_); - cfd->Ref(); - flush_stat = FlushMemTableToOutputFile( - cfd, mutable_cf_options, madeProgress, job_context, log_buffer); - cfd->Unref(); - if (!flush_stat.ok()) { - if (is_manual) { - manual_compaction_->status = flush_stat; - manual_compaction_->done = true; - manual_compaction_->in_progress = false; - manual_compaction_ = nullptr; - } - return flush_stat; + // TODO(icanadi) we should only do this if max_background_flushes == 0 + // BackgroundFlush() will only execute a single flush. We keep calling it as + // long as there's more flushes to be done + while (!flush_queue_.empty()) { + LogToBuffer( + log_buffer, + "BackgroundCompaction calling BackgroundFlush. flush slots available " + "%d, compaction slots available %d", + db_options_.max_background_flushes - bg_flush_scheduled_, + db_options_.max_background_compactions - bg_compaction_scheduled_); + auto flush_status = BackgroundFlush(madeProgress, job_context, log_buffer); + if (!flush_status.ok()) { + if (is_manual) { + manual_compaction_->status = flush_status; + manual_compaction_->done = true; + manual_compaction_->in_progress = false; + manual_compaction_ = nullptr; } + return flush_status; } } - // Compaction makes a copy of the latest MutableCFOptions. It should be used - // throughout the compaction procedure to make sure consistency. It will - // eventually be installed into SuperVersion unique_ptr c; InternalKey manual_end_storage; InternalKey* manual_end = &manual_end_storage; @@ -2014,22 +2056,53 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, ((m->done || manual_end == nullptr) ? "(end)" : manual_end->DebugString().c_str())); - } else { - // no need to refcount in iteration since it's always under a mutex - for (auto cfd : *versions_->GetColumnFamilySet()) { - // Pick up latest mutable CF Options and use it throughout the - // compaction job - auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); - if (!mutable_cf_options->disable_auto_compactions) { - // NOTE: try to avoid unnecessary copy of MutableCFOptions if - // compaction is not necessary. Need to make sure mutex is held - // until we make a copy in the following code - c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); - if (c != nullptr) { - // update statistics - MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, - c->inputs(0)->size()); - break; + } else if (!compaction_queue_.empty()) { + // cfd is referenced here + auto cfd = PopFirstFromCompactionQueue(); + // We unreference here because the following code will take a Ref() on + // this cfd if it is going to use it (Compaction class holds a + // reference). + // This will all happen under a mutex so we don't have to be afraid of + // somebody else deleting it. + if (cfd->Unref()) { + delete cfd; + // This was the last reference of the column family, so no need to + // compact. + return Status::OK(); + } + + // Pick up latest mutable CF Options and use it throughout the + // compaction job + // Compaction makes a copy of the latest MutableCFOptions. It should be used + // throughout the compaction procedure to make sure consistency. It will + // eventually be installed into SuperVersion + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) { + // NOTE: try to avoid unnecessary copy of MutableCFOptions if + // compaction is not necessary. Need to make sure mutex is held + // until we make a copy in the following code + c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); + if (c != nullptr) { + // update statistics + MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, + c->inputs(0)->size()); + // There are three things that can change compaction score: + // 1) When flush or compaction finish. This case is covered by + // InstallSuperVersion() + // 2) When MutableCFOptions changes. This case is also covered by + // InstallSuperVersion(), because this is when the new options take + // effect. + // 3) When we Pick a new compaction, we "remove" those files being + // compacted from the calculation, which then influences compaction + // score. Here we check if we need the new compaction even without the + // files that are currently being compacted. If we need another + // compaction, we might be able to execute it in parallel, so we add it + // to the queue and schedule a new thread. + if (cfd->NeedsCompaction()) { + // Yes, we need more compactions! + AddToCompactionQueue(cfd); + ++unscheduled_compactions_; + MaybeScheduleFlushOrCompaction(); } } } @@ -2085,8 +2158,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->ReleaseCompactionFiles(status); *madeProgress = true; } else { - MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. - auto yield_callback = [&]() { return CallFlushDuringCompaction(c->column_family_data(), *c->mutable_cf_options(), job_context, @@ -2275,7 +2346,7 @@ void DBImpl::InstallSuperVersionBackground( SuperVersion* DBImpl::InstallSuperVersion( ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) { mutex_.AssertHeld(); // Update max_total_in_memory_state_ @@ -2289,10 +2360,15 @@ SuperVersion* DBImpl::InstallSuperVersion( auto* old = cfd->InstallSuperVersion( new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); - // We want to schedule potential flush or compactions since new options may - // have been picked up in this new version. New options may cause flush - // compaction trigger condition to change. - MaybeScheduleFlushOrCompaction(); + // Whenever we install new SuperVersion, we might need to issue new flushes or + // compactions. dont_schedule_bg_work is true when scheduling from write + // thread and we don't want to add additional overhead. Callers promise to + // call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually + if (!dont_schedule_bg_work) { + SchedulePendingFlush(cfd); + SchedulePendingCompaction(cfd); + MaybeScheduleFlushOrCompaction(); + } // Update max_total_in_memory_state_ max_total_in_memory_state_ = @@ -2848,9 +2924,10 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { break; } cfd->imm()->FlushRequested(); + SchedulePendingFlush(cfd); + context.schedule_bg_work_ = true; } } - MaybeScheduleFlushOrCompaction(); } else if (UNLIKELY(write_buffer_.ShouldFlush())) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing all column families. Write buffer is using %" PRIu64 @@ -2865,6 +2942,8 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { break; } cfd->imm()->FlushRequested(); + SchedulePendingFlush(cfd); + context.schedule_bg_work_ = true; } } MaybeScheduleFlushOrCompaction(); @@ -2986,6 +3065,10 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { } write_thread_.ExitWriteThread(&w, last_writer, status); + + if (context.schedule_bg_work_) { + MaybeScheduleFlushOrCompaction(); + } mutex_.Unlock(); if (status.IsTimedOut()) { @@ -3023,11 +3106,11 @@ Status DBImpl::DelayWrite(uint64_t expiration_time) { } Status DBImpl::ScheduleFlushes(WriteContext* context) { - bool schedule_bg_work = false; ColumnFamilyData* cfd; while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { - schedule_bg_work = true; auto status = SetNewMemtableAndNewLogFile(cfd, context); + SchedulePendingFlush(cfd); + context->schedule_bg_work_ = true; if (cfd->Unref()) { delete cfd; } @@ -3035,9 +3118,6 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { return status; } } - if (schedule_bg_work) { - MaybeScheduleFlushOrCompaction(); - } return Status::OK(); } @@ -3113,7 +3193,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, new_mem->Ref(); cfd->SetMemtable(new_mem); context->superversions_to_free_.push_back( - InstallSuperVersion(cfd, new_superversion, mutable_cf_options)); + InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true)); return s; } @@ -3380,12 +3460,6 @@ Status DBImpl::DeleteFile(std::string name) { PurgeObsoleteFiles(job_context); } job_context.Clean(); - { - MutexLock l(&mutex_); - // schedule flush if file deletion means we freed the space for flushes to - // continue - MaybeScheduleFlushOrCompaction(); - } return status; } @@ -3620,7 +3694,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); impl->DeleteObsoleteFiles(); - impl->MaybeScheduleFlushOrCompaction(); s = impl->db_directory_->Fsync(); } } diff --git a/db/db_impl.h b/db/db_impl.h index 5e27df2c6..7a3a7984d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -362,6 +362,8 @@ class DBImpl : public DB { ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); void MaybeScheduleFlushOrCompaction(); + void SchedulePendingFlush(ColumnFamilyData* cfd); + void SchedulePendingCompaction(ColumnFamilyData* cfd); static void BGWorkCompaction(void* db); static void BGWorkFlush(void* db); void BackgroundCallCompaction(); @@ -393,6 +395,12 @@ class DBImpl : public DB { // hold the data set. Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1); + // helper functions for adding and removing from flush & compaction queues + void AddToCompactionQueue(ColumnFamilyData* cfd); + ColumnFamilyData* PopFirstFromCompactionQueue(); + void AddToFlushQueue(ColumnFamilyData* cfd); + ColumnFamilyData* PopFirstFromFlushQueue(); + // table_cache_ provides its own synchronization std::shared_ptr table_cache_; @@ -460,9 +468,32 @@ class DBImpl : public DB { // State is protected with db mutex. std::list pending_outputs_; - // At least one compaction or flush job is pending but not yet scheduled - // because of the max background thread limit. - bool bg_schedule_needed_; + // flush_queue_ and compaction_queue_ hold column families that we need to + // flush and compact, respectively. + // A column family is inserted into flush_queue_ when it satisfies condition + // cfd->imm()->IsFlushPending() + // A column family is inserted into compaction_queue_ when it satisfied + // condition cfd->NeedsCompaction() + // Column families in this list are all Ref()-erenced + // TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will + // do RAII on ColumnFamilyData + // Column families are in this queue when they need to be flushed or + // compacted. Consumers of these queues are flush and compaction threads. When + // column family is put on this queue, we increase unscheduled_flushes_ and + // unscheduled_compactions_. When these variables are bigger than zero, that + // means we need to schedule background threads for compaction and thread. + // Once the background threads are scheduled, we decrease unscheduled_flushes_ + // and unscheduled_compactions_. That way we keep track of number of + // compaction and flush threads we need to schedule. This scheduling is done + // in MaybeScheduleFlushOrCompaction() + // invariant(column family present in flush_queue_ <==> + // ColumnFamilyData::pending_flush_ == true) + std::deque flush_queue_; + // invariant(column family present in compaction_queue_ <==> + // ColumnFamilyData::pending_compaction_ == true) + std::deque compaction_queue_; + int unscheduled_flushes_; + int unscheduled_compactions_; // count how many background compactions are running or have been scheduled int bg_compaction_scheduled_; @@ -553,9 +584,17 @@ class DBImpl : public DB { ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options); - SuperVersion* InstallSuperVersion( - ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options); + // All ColumnFamily state changes go through this function. Here we analyze + // the new state and we schedule background work if we detect that the new + // state needs flush or compaction. + // If dont_schedule_bg_work == true, then caller asks us to not schedule flush + // or compaction here, but it also promises to schedule needed background + // work. We use this to scheduling background compactions when we are in the + // write thread, which is very performance critical. Caller schedules + // background work as soon as it exits the write thread + SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options, + bool dont_schedule_bg_work = false); // Find Super version and reference it. Based on options, it might return // the thread local cached one.