From e48348d1961d76f12f0eee7cf4a82dc26667f963 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 7 Apr 2014 13:46:51 -0700 Subject: [PATCH] Make flush part of compaction process This will enable user to use only 1 background thread. --- db/db_impl.cc | 77 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 11 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index ec99a4a72..ca5141c72 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -276,9 +276,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { if (result.max_open_files != -1) { ClipToRange(&result.max_open_files, 20, 1000000); } - if (result.max_background_flushes == 0) { - result.max_background_flushes = 1; - } if (result.info_log == nullptr) { Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env, @@ -1895,9 +1892,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } if (is_flush_pending) { // memtable flush needed - // max_background_compactions should not be 0, because that means - // flush will never get executed - assert(options_.max_background_flushes != 0); if (bg_flush_scheduled_ < options_.max_background_flushes) { bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); @@ -1914,10 +1908,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } - // Schedule BGWorkCompaction if there's a compaction pending + // Schedule BGWorkCompaction if there's a compaction pending (or a memtable + // flush, but the HIGH pool is not enabled) // Do it only if max_background_compactions hasn't been reached and, in case // bg_manual_only_ > 0, if it's a manual compaction. 
- if ((manual_compaction_ || is_compaction_needed) && + if ((manual_compaction_ || is_compaction_needed || + (is_flush_pending && options_.max_background_flushes == 0)) && (!bg_manual_only_ || manual_compaction_)) { if (bg_compaction_scheduled_ < options_.max_background_compactions) { bg_compaction_scheduled_++; @@ -2134,14 +2130,57 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, *madeProgress = false; mutex_.AssertHeld(); - unique_ptr c; bool is_manual = (manual_compaction_ != nullptr) && (manual_compaction_->in_progress == false); + + if (is_manual) { + // another thread cannot pick up the same work + manual_compaction_->in_progress = true; + } + + // FLUSH preempts compaction + autovector to_delete; + Status flush_stat; + for (auto cfd : *versions_->GetColumnFamilySet()) { + while (cfd->imm()->IsFlushPending()) { + LogToBuffer( + log_buffer, + "BackgroundCompaction doing FlushMemTableToOutputFile, " + "compaction slots available %d", + options_.max_background_compactions - bg_compaction_scheduled_); + cfd->Ref(); + flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state, + log_buffer); + if (cfd->Unref()) { + to_delete.push_back(cfd); + } + if (!flush_stat.ok()) { + if (is_manual) { + manual_compaction_->status = flush_stat; + manual_compaction_->done = true; + manual_compaction_->in_progress = false; + manual_compaction_ = nullptr; + } + break; + } + } + if (!flush_stat.ok()) { + break; + } + } + for (auto cfd : to_delete) { + delete cfd; + } + if (!flush_stat.ok()) { + return flush_stat; + } + + unique_ptr c; InternalKey manual_end_storage; InternalKey* manual_end = &manual_end_storage; if (is_manual) { ManualCompaction* m = manual_compaction_; - m->in_progress = true; + assert(m->in_progress); c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin, m->end, &manual_end)); if (!c) { @@ -2510,7 +2549,23 @@ Status DBImpl::ProcessKeyValueCompaction( compaction_filter = compaction_filter_from_factory.get(); } - for (; 
input->Valid() && !shutting_down_.Acquire_Load(); ) { + while (input->Valid() && !shutting_down_.Acquire_Load() && + !cfd->IsDropped()) { + // FLUSH preempts compaction + // TODO(icanadi) this currently only checks if a flush is necessary on + // the column family being compacted. We should also check if a flush is + // necessary on other column families. + if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) { + const uint64_t imm_start = env_->NowMicros(); + mutex_.Lock(); + if (cfd->imm()->IsFlushPending()) { + FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer); + bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + } + mutex_.Unlock(); + imm_micros += (env_->NowMicros() - imm_start); + } + Slice key; Slice value; // If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.