diff --git a/HISTORY.md b/HISTORY.md
index 73a309d7e..27b4ae002 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -21,6 +21,7 @@
 * In some cases outside of the DB read and compaction paths, SST block checksums are now checked where they were not before.
 * Explicitly check for and disallow the `BlockBasedTableOptions` if insertion into one of {`block_cache`, `block_cache_compressed`, `persistent_cache`} can show up in another of these. (RocksDB expects to be able to use the same key for different physical data among tiers.)
 * Users who configured a dedicated thread pool for bottommost compactions by explicitly adding threads to the `Env::Priority::BOTTOM` pool will no longer see RocksDB schedule automatic compactions exceeding the DB's compaction concurrency limit. For details on per-DB compaction concurrency limit, see API docs of `max_background_compactions` and `max_background_jobs`.
+* Fixed a bug where the background flush thread could pick more memtables to flush than intended and prematurely advance the column family's log_number.
 
 ### Behavior Changes
 * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files.
diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index 76d066a7f..0326a2cc7 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -1199,7 +1199,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
   ASSERT_EQ(Get(KEY5), p_v5);
 }
 
-TEST_F(DBFlushTest, MemPurgeWALSupport) {
+TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
   Options options = CurrentOptions();
 
   options.statistics = CreateDBStatistics();
@@ -1858,6 +1858,50 @@ TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) {
   Destroy(options);
 }
 
+TEST_F(DBFlushTest, PickRightMemtables) {
+  Options options = CurrentOptions();
+  DestroyAndReopen(options);
+  options.create_if_missing = true;
+
+  const std::string test_cf_name = "test_cf";
+  options.max_write_buffer_number = 128;
+  CreateColumnFamilies({test_cf_name}, options);
+
+  Close();
+
+  ReopenWithColumnFamilies({kDefaultColumnFamilyName, test_cf_name}, options);
+
+  ASSERT_OK(db_->Put(WriteOptions(), "key", "value"));
+
+  ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "key", "value"));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) {
+        ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
+        auto* cfhi =
+            static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);
+        assert(cfhi);
+        ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi->cfd()));
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) {
+        auto* job = reinterpret_cast<FlushJob*>(arg);
+        assert(job);
+        const auto& mems = job->GetMemTables();
+        assert(mems.size() == 1);
+        assert(mems[0]);
+        ASSERT_EQ(1, mems[0]->GetID());
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 class DBFlushTestBlobError : public DBFlushTest,
                              public testing::WithParamInterface<std::string> {
  public:
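
Note on the new test above: it relies on RocksDB's SyncPoint facility to provoke the race deterministically. The "DBImpl::SyncClosedLogs:BeforeReLock" callback runs while the flush thread has released the db mutex to sync closed WALs; it writes to the second column family and forces a memtable switch. The "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables" callback then receives the FlushJob and checks that only the first memtable (ID 1) was picked, not the one created during that window. The sketch below illustrates this sync-point style of test hook in isolation; it is not RocksDB's SyncPoint implementation, and TestHooks, Notify, and the point names are invented for the example.

// Stripped-down sketch of the sync-point pattern the test relies on (this is
// not RocksDB's SyncPoint; TestHooks and the point names are invented).
// Code under test announces named points and may pass a pointer to internal
// state; a test registers callbacks to inject work or inspect that state.
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

class TestHooks {
 public:
  using Callback = std::function<void(void*)>;
  void SetCallBack(const std::string& point, Callback cb) {
    callbacks_[point] = std::move(cb);
  }
  void Notify(const std::string& point, void* arg = nullptr) {
    auto it = callbacks_.find(point);
    if (it != callbacks_.end()) {
      it->second(arg);
    }
  }

 private:
  std::map<std::string, Callback> callbacks_;
};

int main() {
  TestHooks hooks;
  std::vector<int> immutable_memtable_ids = {1};

  // "Test" side: inject a memtable switch while the flush thread has
  // (conceptually) released the mutex, mirroring BeforeReLock in the patch.
  hooks.SetCallBack("BeforeReLock",
                    [&](void*) { immutable_memtable_ids.push_back(2); });
  // "Test" side: inspect the state handed out at a later point, mirroring
  // AfterPickMemtables receiving &flush_job.
  hooks.SetCallBack("AfterPickMemtables", [](void* arg) {
    auto* ids = static_cast<std::vector<int>*>(arg);
    assert(!ids->empty());
  });

  // "Code under test" side: hit the two points in order.
  hooks.Notify("BeforeReLock");
  hooks.Notify("AfterPickMemtables", &immutable_memtable_ids);
  return 0;
}

RocksDB's real SyncPoint additionally supports cross-thread ordering dependencies and is active only in debug builds; the sketch keeps just the callback-injection part that this test uses.
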
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 4a5f4a646..b1ffe632f 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -103,6 +103,8 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
   if (!logs_to_sync.empty()) {
     mutex_.Unlock();
 
+    assert(job_context);
+
     for (log::Writer* log : logs_to_sync) {
       ROCKS_LOG_INFO(immutable_db_options_.info_log,
                      "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
@@ -125,6 +127,8 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
           DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
     }
 
+    TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock",
+                             /*arg=*/nullptr);
     mutex_.Lock();
 
     // "number <= current_log_number - 1" is equivalent to
@@ -153,14 +157,34 @@ Status DBImpl::FlushMemTableToOutputFile(
     Env::Priority thread_pri) {
   mutex_.AssertHeld();
   assert(cfd);
+  assert(cfd->imm());
   assert(cfd->imm()->NumNotFlushed() != 0);
   assert(cfd->imm()->IsFlushPending());
+  assert(versions_);
+  assert(versions_->GetColumnFamilySet());
+  // If there are more than one column families, we need to make sure that
+  // all the log files except the most recent one are synced. Otherwise if
+  // the host crashes after flushing and before WAL is persistent, the
+  // flushed SST may contain data from write batches whose updates to
+  // other (unflushed) column families are missing.
+  const bool needs_to_sync_closed_wals =
+      logfile_number_ > 0 &&
+      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1;
+  // If needs_to_sync_closed_wals is true, we need to record the current
+  // maximum memtable ID of this column family so that a later PickMemtables()
+  // call will not pick memtables whose IDs are higher. This is due to the fact
+  // that SyncClosedLogs() may release the db mutex, and memtable switch can
+  // happen for this column family in the meantime. The newly created memtables
+  // have their data backed by unsynced WALs, thus they cannot be included in
+  // this flush job.
+  uint64_t max_memtable_id = needs_to_sync_closed_wals
+                                 ? cfd->imm()->GetLatestMemTableID()
+                                 : port::kMaxUint64;
   FlushJob flush_job(
-      dbname_, cfd, immutable_db_options_, mutable_cf_options,
-      port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
-      versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
-      earliest_write_conflict_snapshot, snapshot_checker, job_context,
-      log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
+      dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,
+      file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
+      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
+      job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
       &event_logger_, mutable_cf_options.report_bg_io_stats,
       true /* sync_output_directory */, true /* write_manifest */, thread_pri,
@@ -176,13 +200,7 @@ Status DBImpl::FlushMemTableToOutputFile(
   Status s;
   bool need_cancel = false;
   IOStatus log_io_s = IOStatus::OK();
-  if (logfile_number_ > 0 &&
-      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
-    // If there are more than one column families, we need to make sure that
-    // all the log files except the most recent one are synced. Otherwise if
-    // the host crashes after flushing and before WAL is persistent, the
-    // flushed SST may contain data from write batches whose updates to
-    // other column families are missing.
+  if (needs_to_sync_closed_wals) {
     // SyncClosedLogs() may unlock and re-lock the db_mutex.
    log_io_s = SyncClosedLogs(job_context);
    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
@@ -201,7 +219,8 @@ Status DBImpl::FlushMemTableToOutputFile(
     flush_job.PickMemTable();
     need_cancel = true;
   }
-  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
+  TEST_SYNC_POINT_CALLBACK(
+      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job);
   bool switched_to_mempurge = false;
   // Within flush_job.Run, rocksdb may call event listener to notify
   // file creation and deletion.
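
To make the effect of the new max_memtable_id argument concrete: the cutoff is captured while the db mutex is still held, so a memtable switched in while SyncClosedLogs() has temporarily released the mutex carries a higher ID and is excluded from the flush. With the old port::kMaxUint64 argument the picker would have taken it even though its data lives in a WAL that has not been synced, which is what allowed the column family's log_number to advance prematurely. The sketch below is a minimal, self-contained model of that guard, not RocksDB code; MemTableModel, PickMemtables, and the hard-coded IDs are invented for illustration.

// Minimal model of the guard added in this patch (not RocksDB code): capture
// the latest immutable memtable ID before the mutex is released, then pick
// only memtables whose ID does not exceed that cutoff.
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct MemTableModel {
  uint64_t id;  // monotonically increasing per column family
};

// Returns the flush candidates, skipping memtables newer than max_memtable_id.
std::vector<MemTableModel> PickMemtables(const std::vector<MemTableModel>& imm,
                                         uint64_t max_memtable_id) {
  std::vector<MemTableModel> picked;
  for (const auto& m : imm) {
    if (m.id <= max_memtable_id) {
      picked.push_back(m);
    }
  }
  return picked;
}

int main() {
  std::vector<MemTableModel> imm = {{1}};  // one immutable memtable, ID 1

  // Cutoff recorded while the db mutex is (conceptually) still held.
  const uint64_t cutoff = imm.back().id;

  // While the mutex is released to sync closed WALs, a concurrent write
  // switches another memtable whose data is backed by an unsynced WAL.
  imm.push_back({2});

  // Old behavior: no cutoff, so the unsynced memtable is flushed as well.
  auto old_pick = PickMemtables(imm, std::numeric_limits<uint64_t>::max());
  // Fixed behavior: respect the cutoff, so only memtable 1 is flushed.
  auto new_pick = PickMemtables(imm, cutoff);

  std::cout << "without cutoff: " << old_pick.size() << " memtable(s)\n";  // 2
  std::cout << "with cutoff:    " << new_pick.size() << " memtable(s)\n";  // 1
  return 0;
}

In the patch itself the cutoff is cfd->imm()->GetLatestMemTableID(), and it is passed through FlushJob's existing memtable_id parameter (previously hard-coded to port::kMaxUint64), so no new picking logic is introduced in FlushJob.
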