From 1e8322c0f5e33f610d77495de81af68f9a45231a Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 19 Nov 2021 09:55:10 -0800 Subject: [PATCH] Fix a bug in FlushJob picking more memtables beyond synced WALs (#9142) Summary: After RocksDB 6.19 and before this PR, RocksDB FlushJob may pick more memtables to flush beyond synced WALs. This can be problematic if there are multiple column families, since it can prematurely advance the flushed column family's log_number. Should subsequent attempts fail to sync the latest WALs and the database goes through a recovery, it may detect corrupted WAL number below the flushed column family's log number and complain about column family inconsistency. To fix, we record the maximum memtable ID of the column family being flushed. Then we call SyncClosedLogs() so that all closed WALs at the time when memtable ID is recorded will be synced. I also disabled a unit test temporarily due to reasons described in https://github.com/facebook/rocksdb/issues/9151 Pull Request resolved: https://github.com/facebook/rocksdb/pull/9142 Test Plan: make check Reviewed By: ajkr Differential Revision: D32299956 Pulled By: riversand963 fbshipit-source-id: 0da75888177d91905cf8c9d00605b73afb5970a7 --- HISTORY.md | 1 + db/db_flush_test.cc | 46 +++++++++++++++++++++++++- db/db_impl/db_impl_compaction_flush.cc | 45 +++++++++++++++++-------- 3 files changed, 78 insertions(+), 14 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 73a309d7e..27b4ae002 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -21,6 +21,7 @@ * In some cases outside of the DB read and compaction paths, SST block checksums are now checked where they were not before. * Explicitly check for and disallow the `BlockBasedTableOptions` if insertion into one of {`block_cache`, `block_cache_compressed`, `persistent_cache`} can show up in another of these. (RocksDB expects to be able to use the same key for different physical data among tiers.) * Users who configured a dedicated thread pool for bottommost compactions by explicitly adding threads to the `Env::Priority::BOTTOM` pool will no longer see RocksDB schedule automatic compactions exceeding the DB's compaction concurrency limit. For details on per-DB compaction concurrency limit, see API docs of `max_background_compactions` and `max_background_jobs`. +* Fixed a bug of background flush thread picking more memtables to flush and prematurely advancing column family's log_number. ### Behavior Changes * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 76d066a7f..0326a2cc7 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -1199,7 +1199,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { ASSERT_EQ(Get(KEY5), p_v5); } -TEST_F(DBFlushTest, MemPurgeWALSupport) { +TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) { Options options = CurrentOptions(); options.statistics = CreateDBStatistics(); @@ -1858,6 +1858,50 @@ TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) { Destroy(options); } +TEST_F(DBFlushTest, PickRightMemtables) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + options.create_if_missing = true; + + const std::string test_cf_name = "test_cf"; + options.max_write_buffer_number = 128; + CreateColumnFamilies({test_cf_name}, options); + + Close(); + + ReopenWithColumnFamilies({kDefaultColumnFamilyName, test_cf_name}, options); + + ASSERT_OK(db_->Put(WriteOptions(), "key", "value")); + + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "key", "value")); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) { + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v")); + auto* cfhi = + static_cast_with_check(handles_[1]); + assert(cfhi); + ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi->cfd())); + }); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) { + auto* job = reinterpret_cast(arg); + assert(job); + const auto& mems = job->GetMemTables(); + assert(mems.size() == 1); + assert(mems[0]); + ASSERT_EQ(1, mems[0]->GetID()); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db_->Flush(FlushOptions(), handles_[1])); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + class DBFlushTestBlobError : public DBFlushTest, public testing::WithParamInterface { public: diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4a5f4a646..b1ffe632f 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -103,6 +103,8 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { if (!logs_to_sync.empty()) { mutex_.Unlock(); + assert(job_context); + for (log::Writer* log : logs_to_sync) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, @@ -125,6 +127,8 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); } + TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock", + /*arg=*/nullptr); mutex_.Lock(); // "number <= current_log_number - 1" is equivalent to @@ -153,14 +157,34 @@ Status DBImpl::FlushMemTableToOutputFile( Env::Priority thread_pri) { mutex_.AssertHeld(); assert(cfd); + assert(cfd->imm()); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); + assert(versions_); + assert(versions_->GetColumnFamilySet()); + // If there are more than one column families, we need to make sure that + // all the log files except the most recent one are synced. Otherwise if + // the host crashes after flushing and before WAL is persistent, the + // flushed SST may contain data from write batches whose updates to + // other (unflushed) column families are missing. + const bool needs_to_sync_closed_wals = + logfile_number_ > 0 && + versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1; + // If needs_to_sync_closed_wals is true, we need to record the current + // maximum memtable ID of this column family so that a later PickMemtables() + // call will not pick memtables whose IDs are higher. This is due to the fact + // that SyncClosedLogs() may release the db mutex, and memtable switch can + // happen for this column family in the meantime. The newly created memtables + // have their data backed by unsynced WALs, thus they cannot be included in + // this flush job. + uint64_t max_memtable_id = needs_to_sync_closed_wals + ? cfd->imm()->GetLatestMemTableID() + : port::kMaxUint64; FlushJob flush_job( - dbname_, cfd, immutable_db_options_, mutable_cf_options, - port::kMaxUint64 /* memtable_id */, file_options_for_compaction_, - versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, - earliest_write_conflict_snapshot, snapshot_checker, job_context, - log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U), + dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, + file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, + job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, @@ -176,13 +200,7 @@ Status DBImpl::FlushMemTableToOutputFile( Status s; bool need_cancel = false; IOStatus log_io_s = IOStatus::OK(); - if (logfile_number_ > 0 && - versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) { - // If there are more than one column families, we need to make sure that - // all the log files except the most recent one are synced. Otherwise if - // the host crashes after flushing and before WAL is persistent, the - // flushed SST may contain data from write batches whose updates to - // other column families are missing. + if (needs_to_sync_closed_wals) { // SyncClosedLogs() may unlock and re-lock the db_mutex. log_io_s = SyncClosedLogs(job_context); if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() && @@ -201,7 +219,8 @@ Status DBImpl::FlushMemTableToOutputFile( flush_job.PickMemTable(); need_cancel = true; } - TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables"); + TEST_SYNC_POINT_CALLBACK( + "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job); bool switched_to_mempurge = false; // Within flush_job.Run, rocksdb may call event listener to notify // file creation and deletion.