From a07175af6500275e8a79c4edcf493d9769838da5 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 3 Jan 2019 20:53:52 -0800 Subject: [PATCH] Refactor atomic flush result installation to MANIFEST (#4791) Summary: as titled. Since different bg flush threads can flush different sets of column families (due to column family creation and drop), we decide not to let one thread perform atomic flush result installation for other threads. Bg flush threads will install their atomic flush results sequentially to MANIFEST, using a conditional variable, i.e. atomic_flush_install_cv_ to coordinate. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4791 Differential Revision: D13498930 Pulled By: riversand963 fbshipit-source-id: dd7482fc41f4bd22dad1e1ef7d4764ef424688d7 --- db/db_impl.cc | 2 +- db/db_impl.h | 19 +- db/db_impl_compaction_flush.cc | 74 +++++-- db/flush_job_test.cc | 9 +- db/memtable.h | 14 +- db/memtable_list.cc | 323 +++++++++-------------------- db/memtable_list.h | 51 +++-- db/memtable_list_test.cc | 363 +++++++++------------------------ db/version_set_test.cc | 4 +- 9 files changed, 314 insertions(+), 545 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 9c485bd53..cdc66701b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -220,7 +220,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, preserve_deletes_(options.preserve_deletes), closed_(false), error_handler_(this, immutable_db_options_, &mutex_), - atomic_flush_commit_in_progress_(false) { + atomic_flush_install_cv_(&mutex_) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); diff --git a/db/db_impl.h b/db/db_impl.h index d6e62bcd7..760c5a672 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1630,15 +1630,16 @@ class DBImpl : public DB { ErrorHandler error_handler_; - // True if the DB is committing atomic flush. - // TODO (yanqin) the current impl assumes that the entire DB belongs to - // a single atomic flush group. In the future we need to add a new class - // (struct) similar to the following to make it more general. - // struct AtomicFlushGroup { - // bool commit_in_progress_; - // std::vector imm_lists; - // }; - bool atomic_flush_commit_in_progress_; + // Conditional variable to coordinate installation of atomic flush results. + // With atomic flush, each bg thread installs the result of flushing multiple + // column families, and different threads can flush different column + // families. It's difficult to rely on one thread to perform batch + // installation for all threads. This is different from the non-atomic flush + // case. + // atomic_flush_install_cv_ makes sure that threads install atomic flush + // results sequentially. Flush results of memtables with lower IDs get + // installed to MANIFEST first. + InstrumentedCondVar atomic_flush_install_cv_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 95f24d7f5..feac06461 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -404,34 +404,65 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } } } + } - if (s.ok()) { - autovector*> mems_list; - for (int i = 0; i != num_cfs; ++i) { - if (cfds[i]->IsDropped()) { - continue; - } + if (s.ok()) { + auto wait_to_install_func = [&]() { + bool ready = true; + for (size_t i = 0; i != cfds.size(); ++i) { const auto& mems = jobs[i].GetMemTables(); - mems_list.emplace_back(&mems); - } - autovector all_cfds; - autovector imm_lists; - autovector mutable_cf_options_list; - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { + if (cfds[i]->IsDropped()) { + // If the column family is dropped, then do not wait. continue; + } else if (!mems.empty() && + cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) { + // If a flush job needs to install the flush result for mems and + // mems[0] is not the earliest memtable, it means another thread must + // be installing flush results for the same column family, then the + // current thread needs to wait. + ready = false; + break; + } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <= + bg_flush_args[i].max_memtable_id_) { + // If a flush job does not need to install flush results, then it has + // to wait until all memtables up to max_memtable_id_ (inclusive) are + // installed. + ready = false; + break; } - all_cfds.emplace_back(cfd); - imm_lists.emplace_back(cfd->imm()); - mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions()); } + return ready; + }; - s = MemTableList::TryInstallMemtableFlushResults( - imm_lists, all_cfds, mutable_cf_options_list, mems_list, - &atomic_flush_commit_in_progress_, &logs_with_prep_tracker_, - versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free, - directories_.GetDbDir(), log_buffer); + bool resuming_from_bg_err = error_handler_.IsDBStopped(); + while ((!error_handler_.IsDBStopped() || + error_handler_.GetRecoveryError().ok()) && + !wait_to_install_func()) { + atomic_flush_install_cv_.Wait(); } + + s = resuming_from_bg_err ? error_handler_.GetRecoveryError() + : error_handler_.GetBGError(); + } + + if (s.ok()) { + autovector tmp_cfds; + autovector*> mems_list; + autovector mutable_cf_options_list; + for (int i = 0; i != num_cfs; ++i) { + const auto& mems = jobs[i].GetMemTables(); + if (!cfds[i]->IsDropped() && !mems.empty()) { + tmp_cfds.emplace_back(cfds[i]); + mems_list.emplace_back(&mems); + mutable_cf_options_list.emplace_back( + cfds[i]->GetLatestMutableCFOptions()); + } + } + + s = InstallMemtableAtomicFlushResults( + nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list, + versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free, + directories_.GetDbDir(), log_buffer); } if (s.ok() || s.IsShutdownInProgress()) { @@ -2104,6 +2135,7 @@ void DBImpl::BackgroundCallFlush() { bg_flush_scheduled_--; // See if there's more work to be done MaybeScheduleFlushOrCompaction(); + atomic_flush_install_cv_.SignalAll(); bg_cv_.SignalAll(); // IMPORTANT: there should be no code after calling SignalAll. This call may // signal the DB destructor that it's OK to proceed with destruction. In diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 519e01f2a..5ac5f2f93 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -279,7 +279,6 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); mem->SetID(i); mem->Ref(); - mem->TEST_AtomicFlushSequenceNumber() = 123; for (size_t j = 0; j != num_keys_per_memtable; ++j) { std::string key(ToString(j + i * num_keys_per_memtable)); @@ -325,17 +324,13 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { const auto& mems = flush_jobs[i].GetMemTables(); mems_list.push_back(&mems); } - autovector imm_lists; autovector mutable_cf_options_list; for (auto cfd : all_cfds) { - imm_lists.push_back(cfd->imm()); mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); } - bool atomic_flush_commit_in_progress = false; - Status s = MemTableList::TryInstallMemtableFlushResults( - imm_lists, all_cfds, mutable_cf_options_list, mems_list, - &atomic_flush_commit_in_progress, nullptr /* logs_prep_tracker */, + Status s = InstallMemtableAtomicFlushResults( + nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, versions_.get(), &mutex_, file_metas, &job_context.memtables_to_free, nullptr /* db_directory */, nullptr /* log_buffer */); ASSERT_OK(s); diff --git a/db/memtable.h b/db/memtable.h index 93a4a3ea5..46a746ad6 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -386,14 +386,16 @@ class MemTable { uint64_t GetID() const { return id_; } - SequenceNumber& TEST_AtomicFlushSequenceNumber() { - return atomic_flush_seqno_; + void SetFlushCompleted(bool completed) { flush_completed_ = completed; } + + uint64_t GetFileNumber() const { return file_number_; } + + void SetFileNumber(uint64_t file_num) { file_number_ = file_num; } + + void SetFlushInProgress(bool in_progress) { + flush_in_progress_ = in_progress; } - void TEST_SetFlushCompleted(bool completed) { flush_completed_ = completed; } - - void TEST_SetFileNumber(uint64_t file_num) { file_number_ = file_num; } - private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 36c0a8f1d..459d392d5 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -260,228 +260,6 @@ void MemTableListVersion::TrimHistory(autovector* to_delete) { } } -// Try to record multiple successful flush to the MANIFEST as an atomic unit. -// This function may just return Status::OK if there has already been -// a concurrent thread performing actual recording. -Status MemTableList::TryInstallMemtableFlushResults( - autovector& imm_lists, - const autovector& cfds, - const autovector& mutable_cf_options_list, - const autovector*>& mems_list, - bool* atomic_flush_commit_in_progress, LogsWithPrepTracker* prep_tracker, - VersionSet* vset, InstrumentedMutex* mu, - const autovector& file_metas, - autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer) { - AutoThreadOperationStageUpdater stage_updater( - ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); - mu->AssertHeld(); - - for (size_t k = 0; k != mems_list.size(); ++k) { - for (size_t i = 0; i != mems_list[k]->size(); ++i) { - assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0); - (*mems_list[k])[i]->flush_completed_ = true; - (*mems_list[k])[i]->file_number_ = file_metas[k].fd.GetNumber(); - } - } - - assert(atomic_flush_commit_in_progress != nullptr); - Status s; - if (*atomic_flush_commit_in_progress) { - // If the function reaches here, there must be a concurrent thread that - // have already started recording to MANIFEST. Therefore we should just - // return Status::OK and let the othe thread finish writing to MANIFEST on - // our behalf. - return s; - } - - // If the function reaches here, the current thread will start writing to - // MANIFEST. It may record to MANIFEST the flush results of other flushes. - *atomic_flush_commit_in_progress = true; - - auto comp = [&imm_lists](size_t lh, size_t rh) { - const auto& memlist1 = imm_lists[lh]->current_->memlist_; - const auto& memlist2 = imm_lists[rh]->current_->memlist_; - auto it1 = memlist1.rbegin(); - auto it2 = memlist2.rbegin(); - return (*it1)->atomic_flush_seqno_ > (*it2)->atomic_flush_seqno_; - }; - // The top of the heap is the memtable with smallest atomic_flush_seqno_. - std::priority_queue, decltype(comp)> heap(comp); - // Sequence number of the oldest unfinished atomic flush. - SequenceNumber min_unfinished_seqno = kMaxSequenceNumber; - // Populate the heap with first element of each imm iff. it has been - // flushed to storage, i.e. flush_completed_ is true. - size_t num = imm_lists.size(); - assert(num == cfds.size()); - for (size_t i = 0; i != num; ++i) { - std::list& memlist = imm_lists[i]->current_->memlist_; - if (memlist.empty()) { - continue; - } - auto it = memlist.rbegin(); - if ((*it)->flush_completed_) { - heap.emplace(i); - } else if (min_unfinished_seqno > (*it)->atomic_flush_seqno_) { - min_unfinished_seqno = (*it)->atomic_flush_seqno_; - } - } - - while (s.ok() && !heap.empty()) { - autovector batch; - SequenceNumber seqno = kMaxSequenceNumber; - // Pop from the heap the memtables that belong to the same atomic flush, - // namely their atomic_flush_seqno_ are equal. - do { - size_t pos = heap.top(); - const auto& memlist = imm_lists[pos]->current_->memlist_; - MemTable* mem = *(memlist.rbegin()); - if (seqno == kMaxSequenceNumber) { - // First mem in this batch. - seqno = mem->atomic_flush_seqno_; - batch.emplace_back(pos); - heap.pop(); - } else if (mem->atomic_flush_seqno_ == seqno) { - // mem has the same atomic_flush_seqno_, thus in the same atomic flush. - batch.emplace_back(pos); - heap.pop(); - } else if (mem->atomic_flush_seqno_ > seqno) { - // mem belongs to another atomic flush with higher seqno, break the - // loop. - break; - } - } while (!heap.empty()); - if (seqno >= min_unfinished_seqno) { - // If there is an older, unfinished atomic flush, then we should not - // proceed. - TEST_SYNC_POINT_CALLBACK( - "MemTableList::TryInstallMemtableFlushResults:" - "HasOlderUnfinishedAtomicFlush:0", - nullptr); - break; - } - - // Found the earliest, complete atomic flush. No earlier atomic flush is - // pending. Therefore ready to record it to the MANIFEST. - uint32_t num_entries = 0; - autovector tmp_cfds; - autovector tmp_mutable_cf_options_list; - std::vector> memtables_to_flush; - autovector> edit_lists; - for (auto pos : batch) { - tmp_cfds.emplace_back(cfds[pos]); - tmp_mutable_cf_options_list.emplace_back(mutable_cf_options_list[pos]); - const auto& memlist = imm_lists[pos]->current_->memlist_; - uint64_t batch_file_number = 0; - autovector tmp_mems; - autovector edits; - for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { - MemTable* m = *it; - if (!m->flush_completed_ || - (it != memlist.rbegin() && m->file_number_ != batch_file_number)) { - break; - } - if (it == memlist.rbegin()) { - batch_file_number = m->file_number_; - edits.push_back(m->GetEdits()); - ++num_entries; - } - tmp_mems.push_back(m); - } - edit_lists.push_back(edits); - memtables_to_flush.push_back(tmp_mems); - } - TEST_SYNC_POINT_CALLBACK( - "MemTableList::TryInstallMemtableFlushResults:FoundBatchToCommit:0", - &num_entries); - - // Mark the version edits as an atomic group - uint32_t remaining = num_entries; - for (auto& edit_list : edit_lists) { - assert(edit_list.size() == 1); - edit_list[0]->MarkAtomicGroup(--remaining); - } - assert(remaining == 0); - - size_t batch_sz = batch.size(); - assert(batch_sz > 0); - assert(batch_sz == memtables_to_flush.size()); - assert(batch_sz == tmp_cfds.size()); - assert(batch_sz == edit_lists.size()); - - if (vset->db_options()->allow_2pc) { - for (size_t i = 0; i != batch_sz; ++i) { - auto& edit_list = edit_lists[i]; - assert(!edit_list.empty()); - edit_list.back()->SetMinLogNumberToKeep( - PrecomputeMinLogNumberToKeep(vset, *tmp_cfds[i], edit_list, - memtables_to_flush[i], prep_tracker)); - } - } - // this can release and reacquire the mutex. - s = vset->LogAndApply(tmp_cfds, tmp_mutable_cf_options_list, edit_lists, mu, - db_directory); - - for (const auto pos : batch) { - imm_lists[pos]->InstallNewVersion(); - } - - if (s.ok() || s.IsShutdownInProgress()) { - for (size_t i = 0; i != batch_sz; ++i) { - if (tmp_cfds[i]->IsDropped()) { - continue; - } - size_t pos = batch[i]; - for (auto m : memtables_to_flush[i]) { - assert(m->file_number_ > 0); - uint64_t mem_id = m->GetID(); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " done", - tmp_cfds[i]->GetName().c_str(), m->file_number_, - mem_id); - imm_lists[pos]->current_->Remove(m, to_delete); - } - } - } else { - for (size_t i = 0; i != batch_sz; ++i) { - size_t pos = batch[i]; - for (auto m : memtables_to_flush[i]) { - uint64_t mem_id = m->GetID(); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " failed", - tmp_cfds[i]->GetName().c_str(), m->file_number_, - mem_id); - m->flush_completed_ = false; - m->flush_in_progress_ = false; - m->edit_.Clear(); - m->file_number_ = 0; - imm_lists[pos]->num_flush_not_started_++; - } - imm_lists[pos]->imm_flush_needed.store(true, std::memory_order_release); - } - } - // Adjust the heap AFTER installing new MemTableListVersions because the - // compare function 'comp' needs to capture the most up-to-date state of - // imm_lists. - for (auto pos : batch) { - const auto& memlist = imm_lists[pos]->current_->memlist_; - if (!memlist.empty()) { - MemTable* mem = *(memlist.rbegin()); - if (mem->flush_completed_) { - heap.emplace(pos); - } else if (min_unfinished_seqno > mem->atomic_flush_seqno_) { - min_unfinished_seqno = mem->atomic_flush_seqno_; - } - } - } - } - - *atomic_flush_commit_in_progress = false; - return s; -} - // Returns true if there is at least one memtable on which flush has // not yet started. bool MemTableList::IsFlushPending() const { @@ -749,4 +527,105 @@ uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( return min_log; } +// Commit a successful atomic flush in the manifest file. +Status InstallMemtableAtomicFlushResults( + const autovector* imm_lists, + const autovector& cfds, + const autovector& mutable_cf_options_list, + const autovector*>& mems_list, VersionSet* vset, + InstrumentedMutex* mu, const autovector& file_metas, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); + mu->AssertHeld(); + + size_t num = mems_list.size(); + assert(cfds.size() == num); + if (imm_lists != nullptr) { + assert(imm_lists->size() == num); + } + for (size_t k = 0; k != num; ++k) { +#ifndef NDEBUG + const auto* imm = + (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k); + if (!mems_list[k]->empty()) { + assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID()); + } +#endif + for (size_t i = 0; i != mems_list[k]->size(); ++i) { + assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0); + (*mems_list[k])[i]->SetFlushCompleted(true); + (*mems_list[k])[i]->SetFileNumber(file_metas[k].fd.GetNumber()); + } + } + + Status s; + + autovector> edit_lists; + uint32_t num_entries = 0; + for (const auto mems : mems_list) { + assert(mems != nullptr); + autovector edits; + assert(!mems->empty()); + edits.emplace_back((*mems)[0]->GetEdits()); + ++num_entries; + edit_lists.emplace_back(edits); + } + // Mark the version edits as an atomic group + for (auto& edits : edit_lists) { + assert(edits.size() == 1); + edits[0]->MarkAtomicGroup(--num_entries); + } + assert(0 == num_entries); + + // this can release and reacquire the mutex. + s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, + db_directory); + + for (size_t k = 0; k != cfds.size(); ++k) { + auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k); + imm->InstallNewVersion(); + } + + if (s.ok() || s.IsShutdownInProgress()) { + for (size_t i = 0; i != cfds.size(); ++i) { + if (cfds[i]->IsDropped()) { + continue; + } + auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); + for (auto m : *mems_list[i]) { + assert(m->GetFileNumber() > 0); + uint64_t mem_id = m->GetID(); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + imm->current_->Remove(m, to_delete); + } + } + } else { + for (size_t i = 0; i != cfds.size(); ++i) { + auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); + for (auto m : *mems_list[i]) { + uint64_t mem_id = m->GetID(); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + m->SetFlushCompleted(false); + m->SetFlushInProgress(false); + m->GetEdits()->Clear(); + m->SetFileNumber(0); + imm->num_flush_not_started_++; + } + imm->imm_flush_needed.store(true, std::memory_order_release); + } + } + + return s; +} + } // namespace rocksdb diff --git a/db/memtable_list.h b/db/memtable_list.h index 6315167a1..be3f93562 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -31,6 +31,7 @@ class ColumnFamilyData; class InternalKeyComparator; class InstrumentedMutex; class MergeIteratorBuilder; +class MemTableList; // keeps a list of immutable memtables in a vector. the list is immutable // if refcount is bigger than one. It is used as a state for Get() and @@ -114,6 +115,18 @@ class MemTableListVersion { SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const; private: + friend class MemTableList; + + friend Status InstallMemtableAtomicFlushResults( + const autovector* imm_lists, + const autovector& cfds, + const autovector& mutable_cf_options_list, + const autovector*>& mems_list, + VersionSet* vset, InstrumentedMutex* mu, + const autovector& file_meta, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer); + // REQUIRE: m is an immutable memtable void Add(MemTable* m, autovector* to_delete); // REQUIRE: m is an immutable memtable @@ -132,8 +145,6 @@ class MemTableListVersion { void UnrefMemTable(autovector* to_delete, MemTable* m); - friend class MemTableList; - // Immutable MemTables that have not yet been flushed. std::list memlist_; @@ -163,18 +174,6 @@ class MemTableListVersion { // write thread.) class MemTableList { public: - // Commit a successful atomic flush in the manifest file - static Status TryInstallMemtableFlushResults( - autovector& imm_lists, - const autovector& cfds, - const autovector& mutable_cf_options_list, - const autovector*>& mems_list, - bool* atomic_flush_commit_in_progress, LogsWithPrepTracker* prep_tracker, - VersionSet* vset, InstrumentedMutex* mu, - const autovector& file_meta, - autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer); - // A list of memtables. explicit MemTableList(int min_write_buffer_number_to_merge, int max_write_buffer_number_to_maintain) @@ -296,6 +295,16 @@ class MemTableList { } private: + friend Status InstallMemtableAtomicFlushResults( + const autovector* imm_lists, + const autovector& cfds, + const autovector& mutable_cf_options_list, + const autovector*>& mems_list, + VersionSet* vset, InstrumentedMutex* mu, + const autovector& file_meta, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer); + // DB mutex held void InstallNewVersion(); @@ -317,4 +326,18 @@ class MemTableList { size_t current_memory_usage_; }; +// Installs memtable atomic flush results. +// In most cases, imm_lists is nullptr, and the function simply uses the +// immutable memtable lists associated with the cfds. There are unit tests that +// installs flush results for external immutable memtable lists other than the +// cfds' own immutable memtable lists, e.g. MemTableLIstTest. In this case, +// imm_lists parameter is not nullptr. +extern Status InstallMemtableAtomicFlushResults( + const autovector* imm_lists, + const autovector& cfds, + const autovector& mutable_cf_options_list, + const autovector*>& mems_list, VersionSet* vset, + InstrumentedMutex* mu, const autovector& file_meta, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer); } // namespace rocksdb diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 96032a465..d67eed9fa 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -85,17 +85,46 @@ class MemTableListTest : public testing::Test { Status Mock_InstallMemtableFlushResults( MemTableList* list, const MutableCFOptions& mutable_cf_options, const autovector& m, autovector* to_delete) { - autovector lists; - lists.emplace_back(list); - autovector*> mems_list; - mems_list.emplace_back(&m); - return Mock_InstallMemtableFlushResults( - lists, {0} /* cf_ids */, {&mutable_cf_options}, mems_list, to_delete); + // Create a mock Logger + test::NullLogger logger; + LogBuffer log_buffer(DEBUG_LEVEL, &logger); + + CreateDB(); + // Create a mock VersionSet + DBOptions db_options; + ImmutableDBOptions immutable_db_options(db_options); + EnvOptions env_options; + std::shared_ptr table_cache(NewLRUCache(50000, 16)); + WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); + WriteController write_controller(10000000u); + + VersionSet versions(dbname, &immutable_db_options, env_options, + table_cache.get(), &write_buffer_manager, + &write_controller); + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); + cf_descs.emplace_back("one", ColumnFamilyOptions()); + cf_descs.emplace_back("two", ColumnFamilyOptions()); + + EXPECT_OK(versions.Recover(cf_descs, false)); + + // Create mock default ColumnFamilyData + auto column_family_set = versions.GetColumnFamilySet(); + LogsWithPrepTracker dummy_prep_tracker; + auto cfd = column_family_set->GetDefault(); + EXPECT_TRUE(nullptr != cfd); + uint64_t file_num = file_number.fetch_add(1); + // Create dummy mutex. + InstrumentedMutex mutex; + InstrumentedMutexLock l(&mutex); + return list->TryInstallMemtableFlushResults( + cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, + file_num, to_delete, nullptr, &log_buffer); } // Calls MemTableList::InstallMemtableFlushResults() and sets up all // structures needed to call this function. - Status Mock_InstallMemtableFlushResults( + Status Mock_InstallMemtableAtomicFlushResults( autovector& lists, const autovector& cf_ids, const autovector& mutable_cf_options_list, const autovector*>& mems_list, @@ -127,25 +156,6 @@ class MemTableListTest : public testing::Test { auto column_family_set = versions.GetColumnFamilySet(); LogsWithPrepTracker dummy_prep_tracker; - if (1 == cf_ids.size()) { - auto cfd = column_family_set->GetColumnFamily(cf_ids[0]); - EXPECT_TRUE(nullptr != cfd); - EXPECT_EQ(1, lists.size()); - MemTableList* list = lists[0]; - EXPECT_EQ(1, mutable_cf_options_list.size()); - const MutableCFOptions& mutable_cf_options = - *(mutable_cf_options_list.at(0)); - const autovector* mems = mems_list.at(0); - EXPECT_TRUE(nullptr != mems); - - uint64_t file_num = file_number.fetch_add(1); - // Create dummy mutex. - InstrumentedMutex mutex; - InstrumentedMutexLock l(&mutex); - return list->TryInstallMemtableFlushResults( - cfd, mutable_cf_options, *mems, &dummy_prep_tracker, &versions, - &mutex, file_num, to_delete, nullptr, &log_buffer); - } autovector cfds; for (int i = 0; i != static_cast(cf_ids.size()); ++i) { cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i])); @@ -158,13 +168,11 @@ class MemTableListTest : public testing::Test { meta.fd = FileDescriptor(file_num, 0, 0); file_metas.emplace_back(meta); } - bool atomic_flush_commit_in_progress = false; InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - return MemTableList::TryInstallMemtableFlushResults( - lists, cfds, mutable_cf_options_list, mems_list, - &atomic_flush_commit_in_progress, &dummy_prep_tracker, &versions, - &mutex, file_metas, to_delete, nullptr, &log_buffer); + return InstallMemtableAtomicFlushResults( + &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex, + file_metas, to_delete, nullptr, &log_buffer); } }; @@ -730,18 +738,28 @@ TEST_F(MemTableListTest, FlushPendingTest) { to_delete.clear(); } -TEST_F(MemTableListTest, FlushMultipleCFsTest) { +TEST_F(MemTableListTest, EmptyAtomicFlusTest) { + autovector lists; + autovector cf_ids; + autovector options_list; + autovector*> to_flush; + autovector to_delete; + Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list, + to_flush, &to_delete); + ASSERT_OK(s); + ASSERT_TRUE(to_delete.empty()); +} + +TEST_F(MemTableListTest, AtomicFlusTest) { const int num_cfs = 3; - const int num_tables_per_cf = 5; + const int num_tables_per_cf = 2; SequenceNumber seq = 1; - Status s; auto factory = std::make_shared(); options.memtable_factory = factory; ImmutableCFOptions ioptions(options); InternalKeyComparator cmp(BytewiseComparator()); WriteBufferManager wb(options.db_write_buffer_size); - autovector to_delete; // Create MemTableLists int min_write_buffer_number_to_merge = 3; @@ -782,135 +800,72 @@ TEST_F(MemTableListTest, FlushMultipleCFsTest) { std::vector> flush_candidates(num_cfs); // Nothing to flush - for (int i = 0; i != num_cfs; ++i) { - auto list = lists[i]; + for (auto i = 0; i != num_cfs; ++i) { + auto* list = lists[i]; ASSERT_FALSE(list->IsFlushPending()); ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire)); list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]); - ASSERT_EQ(0, static_cast(flush_candidates[i].size())); + ASSERT_EQ(0, flush_candidates[i].size()); } - // Request flush even though there is nothing to flush - for (int i = 0; i != num_cfs; ++i) { - auto list = lists[i]; + for (auto i = 0; i != num_cfs; ++i) { + auto* list = lists[i]; list->FlushRequested(); ASSERT_FALSE(list->IsFlushPending()); ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire)); } - - // Add tables to column families - for (int i = 0; i != num_cfs; ++i) { - for (int j = 0; j != num_tables_per_cf; ++j) { + autovector to_delete; + // Add tables to the immutable memtalbe lists associated with column families + for (auto i = 0; i != num_cfs; ++i) { + for (auto j = 0; j != num_tables_per_cf; ++j) { lists[i]->Add(tables[i][j], &to_delete); } ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed()); ASSERT_TRUE(lists[i]->IsFlushPending()); ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire)); } - + std::vector flush_memtable_ids = {1, 1, 0}; + // +----+ + // list[0]: |0 1| + // list[1]: |0 1| + // | +--+ + // list[2]: |0| 1 + // +-+ + // Pick memtables to flush + for (auto i = 0; i != num_cfs; ++i) { + flush_candidates[i].clear(); + lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i], + &flush_candidates[i]); + ASSERT_EQ(flush_memtable_ids[i] - 0 + 1, + static_cast(flush_candidates[i].size())); + } + autovector tmp_lists; + autovector tmp_cf_ids; + autovector tmp_options_list; autovector*> to_flush; - std::vector prev_memtable_ids; - // For each column family, determine the memtables to flush - for (int k = 0; k != 4; ++k) { - std::vector flush_memtable_ids; - if (0 == k) { - // +----+ - // list[0]: |0 1| 2 3 4 - // list[1]: |0 1| 2 3 4 - // | +--+ - // list[2]: |0| 1 2 3 4 - // +-+ - flush_memtable_ids = {1, 1, 0}; - } else if (1 == k) { - // +----+ +---+ - // list[0]: |0 1| |2 3| 4 - // list[1]: |0 1| |2 3| 4 - // | +--+ +---+ - // list[2]: |0| 1 2 3 4 - // +-+ - flush_memtable_ids = {3, 3, 0}; - } else if (2 == k) { - // +-----+ +---+ - // list[0]: |0 1| |2 3| 4 - // list[1]: |0 1| |2 3| 4 - // | +---+ +---+ - // | | +-------+ - // list[2]: |0| |1 2 3| 4 - // +-+ +-------+ - flush_memtable_ids = {3, 3, 3}; - } else { - // +-----+ +---+ +-+ - // list[0]: |0 1| |2 3| |4| - // list[1]: |0 1| |2 3| |4| - // | +---+ +---+ | | - // | | +-------+ | | - // list[2]: |0| |1 2 3| |4| - // +-+ +-------+ +-+ - flush_memtable_ids = {4, 4, 4}; - } - assert(num_cfs == static_cast(flush_memtable_ids.size())); - - // Pick memtables to flush - for (int i = 0; i != num_cfs; ++i) { - flush_candidates[i].clear(); - lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i], - &flush_candidates[i]); - for (auto mem : flush_candidates[i]) { - mem->TEST_AtomicFlushSequenceNumber() = SequenceNumber(k); - } - if (prev_memtable_ids.empty()) { - ASSERT_EQ(flush_memtable_ids[i] - 0 + 1, flush_candidates[i].size()); - } else { - ASSERT_EQ(flush_memtable_ids[i] - prev_memtable_ids[i], - flush_candidates[i].size()); - } - ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed()); - ASSERT_FALSE(lists[i]->HasFlushRequested()); - if (flush_memtable_ids[i] == num_tables_per_cf - 1) { - ASSERT_FALSE( - lists[i]->imm_flush_needed.load(std::memory_order_acquire)); - } else { - ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire)); - } - } - prev_memtable_ids = flush_memtable_ids; - - if (k < 3) { - for (const auto& mems : flush_candidates) { - uint64_t file_num = file_number.fetch_add(1); - for (auto m : mems) { - m->TEST_SetFlushCompleted(true); - m->TEST_SetFileNumber(file_num); - } - } - } - - if (k == 0) { - // Rollback first pick of tables - for (int i = 0; i != num_cfs; ++i) { - auto list = lists[i]; - const auto& mems = flush_candidates[i]; - for (auto m : mems) { - m->TEST_SetFileNumber(0); - } - list->RollbackMemtableFlush(flush_candidates[i], 0); - ASSERT_TRUE(list->IsFlushPending()); - ASSERT_TRUE(list->imm_flush_needed.load(std::memory_order_acquire)); - } - prev_memtable_ids.clear(); - } - - if (k == 3) { - for (int i = 0; i != num_cfs; ++i) { - to_flush.emplace_back(&flush_candidates[i]); - } + for (auto i = 0; i != num_cfs; ++i) { + if (!flush_candidates[i].empty()) { + to_flush.push_back(&flush_candidates[i]); + tmp_lists.push_back(lists[i]); + tmp_cf_ids.push_back(i); + tmp_options_list.push_back(mutable_cf_options_list[i]); } } - - s = Mock_InstallMemtableFlushResults(lists, cf_ids, mutable_cf_options_list, - to_flush, &to_delete); + Status s = Mock_InstallMemtableAtomicFlushResults( + tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete); ASSERT_OK(s); + for (auto i = 0; i != num_cfs; ++i) { + for (auto j = 0; j != num_tables_per_cf; ++j) { + if (static_cast(j) <= flush_memtable_ids[i]) { + ASSERT_LT(0, tables[i][j]->GetFileNumber()); + } + } + ASSERT_EQ( + static_cast(num_tables_per_cf) - flush_candidates[i].size(), + lists[i]->NumNotFlushed()); + } + to_delete.clear(); for (auto list : lists) { list->current()->Unref(&to_delete); @@ -932,126 +887,6 @@ TEST_F(MemTableListTest, FlushMultipleCFsTest) { ASSERT_EQ(m, m->Unref()); delete m; } - to_delete.clear(); -} - -TEST_F(MemTableListTest, HasOlderAtomicFlush) { - const size_t num_cfs = 3; - const size_t num_memtables_per_cf = 2; - SequenceNumber seq = 1; - Status s; - - auto factory = std::make_shared(); - options.memtable_factory = factory; - ImmutableCFOptions ioptions(options); - InternalKeyComparator cmp(BytewiseComparator()); - WriteBufferManager wb(options.db_write_buffer_size); - autovector to_delete; - - // Create MemTableLists - int min_write_buffer_number_to_merge = 3; - int max_write_buffer_number_to_maintain = 7; - autovector lists; - for (size_t i = 0; i != num_cfs; ++i) { - lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge, - max_write_buffer_number_to_maintain)); - } - - autovector cf_ids; - std::vector> tables; - autovector mutable_cf_options_list; - uint32_t cf_id = 0; - for (size_t k = 0; k != num_cfs; ++k) { - std::vector elem; - mutable_cf_options_list.emplace_back(new MutableCFOptions(options)); - uint64_t memtable_id = 0; - for (int i = 0; i != num_memtables_per_cf; ++i) { - MemTable* mem = - new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb, - kMaxSequenceNumber, cf_id); - mem->SetID(memtable_id++); - mem->Ref(); - - std::string value; - - mem->Add(++seq, kTypeValue, "key1", ToString(i)); - mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"); - mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"); - mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"); - mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""); - - elem.push_back(mem); - } - tables.emplace_back(elem); - cf_ids.push_back(cf_id++); - } - - // Add tables to column families' immutable memtable lists - for (size_t i = 0; i != num_cfs; ++i) { - for (size_t j = 0; j != num_memtables_per_cf; ++j) { - lists[i]->Add(tables[i][j], &to_delete); - } - lists[i]->FlushRequested(); - ASSERT_EQ(num_memtables_per_cf, lists[i]->NumNotFlushed()); - ASSERT_TRUE(lists[i]->IsFlushPending()); - ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire)); - } - std::vector> flush_candidates(num_cfs); - for (size_t i = 0; i != num_cfs; ++i) { - lists[i]->PickMemtablesToFlush(nullptr, &flush_candidates[i]); - for (auto m : flush_candidates[i]) { - m->TEST_AtomicFlushSequenceNumber() = 123; - } - lists[i]->RollbackMemtableFlush(flush_candidates[i], 0); - } - uint64_t memtable_id = num_memtables_per_cf - 1; - autovector other_flush_candidates; - lists[0]->PickMemtablesToFlush(&memtable_id, &other_flush_candidates); - for (auto m : other_flush_candidates) { - m->TEST_AtomicFlushSequenceNumber() = 124; - m->TEST_SetFlushCompleted(true); - m->TEST_SetFileNumber(1); - } - autovector*> to_flush; - to_flush.emplace_back(&other_flush_candidates); - bool has_older_unfinished_atomic_flush = false; - bool found_batch_to_commit = false; - - SyncPoint::GetInstance()->SetCallBack( - "MemTableList::TryInstallMemtableFlushResults:" - "HasOlderUnfinishedAtomicFlush:0", - [&](void* /*arg*/) { has_older_unfinished_atomic_flush = true; }); - SyncPoint::GetInstance()->SetCallBack( - "MemTableList::TryInstallMemtableFlushResults:FoundBatchToCommit:0", - [&](void* /*arg*/) { found_batch_to_commit = true; }); - SyncPoint::GetInstance()->EnableProcessing(); - - s = Mock_InstallMemtableFlushResults(lists, cf_ids, mutable_cf_options_list, - to_flush, &to_delete); - ASSERT_OK(s); - ASSERT_TRUE(has_older_unfinished_atomic_flush); - ASSERT_FALSE(found_batch_to_commit); - - SyncPoint::GetInstance()->ClearAllCallBacks(); - - ASSERT_TRUE(to_delete.empty()); - for (auto list : lists) { - list->current()->Unref(&to_delete); - delete list; - } - lists.clear(); - ASSERT_EQ(num_cfs * num_memtables_per_cf, to_delete.size()); - for (auto m : to_delete) { - m->Ref(); - ASSERT_EQ(m, m->Unref()); - delete m; - } - to_delete.clear(); - for (auto& opts : mutable_cf_options_list) { - delete opts; - opts = nullptr; - } - mutable_cf_options_list.clear(); } } // namespace rocksdb diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 8b478ceb0..0379bd58a 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1019,7 +1019,9 @@ TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) { auto cfd_to_drop = versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name); ASSERT_NE(nullptr, cfd_to_drop); - cfd_to_drop->Ref(); // Increase its refcount because cfd_to_drop is used later + // Increase its refcount because cfd_to_drop is used later, and we need to + // prevent it from being deleted. + cfd_to_drop->Ref(); drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID()); mutex_.Lock(); s = versions_->LogAndApply(cfd_to_drop,