From f72fd5856585774063ac3fc8926f70626963d488 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 20 Oct 2021 21:33:32 -0700 Subject: [PATCH] Fix atomic flush waiting forever for MANIFEST write (#9034) Summary: In atomic flush, concurrent background flush threads will commit to the MANIFEST one by one, in the order of the IDs of their picked memtables for all included column families. Each time, a background flush thread decides whether to wait based on two criteria: - Is db stopped? If so, don't wait. - Am I the one to commit the currently earliest memtable? If so, don't wait and ready to go. When atomic flush was implemented, error writing to or syncing the MANIFEST would cause the db to be stopped. Therefore, this background thread does not have to check for the background error while waiting. If there has been such an error, `DBStopped()` would have been true, and this thread will **not** wait forever. After we improved error handling, RocksDB may map an IOError while writing to MANIFEST to a soft error, if there is no WAL. This requires the background threads to check for background error while waiting. Otherwise, a background flush thread may wait forever. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9034 Test Plan: make check Reviewed By: zhichao-cao Differential Revision: D31639225 Pulled By: riversand963 fbshipit-source-id: e9ab07c4d8f2eade238adeefe3e42dd9a5a3ebbd --- HISTORY.md | 1 + db/db_flush_test.cc | 116 +++++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 46 ++++++++-- 3 files changed, 155 insertions(+), 8 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 86d1829bf..e20260798 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * Fixed a possible race condition impacting users of `WriteBufferManager` who constructed it with `allow_stall == true`. The race condition led to undefined behavior (in our experience, typically a process crash). * Fixed a bug where stalled writes would remain stalled forever after the user calls `WriteBufferManager::SetBufferSize()` with `new_size == 0` to dynamically disable memory limiting. * Make `DB::close()` thread-safe. +* Fix a bug in atomic flush where one bg flush thread will wait forever for a preceding bg flush thread to commit its result to MANIFEST but encounters an error which is mapped to a soft error (DB not stopped). ### New Features * Print information about blob files when using "ldb list_live_files_metadata" diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index a5bda71c8..fc9725516 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -2440,6 +2440,122 @@ TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) { SyncPoint::GetInstance()->ClearAllCallBacks(); } +// In atomic flush, concurrent bg flush threads commit to the MANIFEST in +// serial, in the order of their picked memtables for each column family. +// Only when a bg flush thread finds out that its memtables are the earliest +// unflushed ones for all the included column families will this bg flush +// thread continue to commit to MANIFEST. +// This unit test uses sync point to coordinate the execution of two bg threads +// executing the same sequence of functions. The interleaving are as follows. +// time bg1 bg2 +// | pick memtables to flush +// | flush memtables cf1_m1, cf2_m1 +// | join MANIFEST write queue +// | pick memtabls to flush +// | flush memtables cf1_(m1+1) +// | join MANIFEST write queue +// | wait to write MANIFEST +// | write MANIFEST +// | IO error +// | detect IO error and stop waiting +// V +TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + auto fault_injection_env = std::make_shared(env_); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.atomic_flush = true; + options.env = fault_injection_env.get(); + // Set a larger value than default so that RocksDB can schedule concurrent + // background flush threads. + options.max_background_jobs = 8; + options.max_write_buffer_number = 8; + CreateAndReopenWithCF({"pikachu"}, options); + + assert(2 == handles_.size()); + + WriteOptions write_opts; + write_opts.disableWAL = true; + + ASSERT_OK(Put(0, "a", "v_0_a", write_opts)); + ASSERT_OK(Put(1, "a", "v_1_a", write_opts)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + SyncPoint::GetInstance()->LoadDependency({ + {"BgFlushThr2:WaitToCommit", "BgFlushThr1:BeforeWriteManifest"}, + }); + + std::thread::id bg_flush_thr1, bg_flush_thr2; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void*) { + if (bg_flush_thr1 == std::thread::id()) { + bg_flush_thr1 = std::this_thread::get_id(); + } else if (bg_flush_thr2 == std::thread::id()) { + bg_flush_thr2 = std::this_thread::get_id(); + } + }); + + int called = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) { + if (std::this_thread::get_id() == bg_flush_thr2) { + const auto* ptr = reinterpret_cast*>(arg); + assert(ptr); + if (0 == called) { + // When bg flush thread 2 reaches here for the first time. + ASSERT_OK(ptr->first); + ASSERT_TRUE(ptr->second); + } else if (1 == called) { + // When bg flush thread 2 reaches here for the second time. + ASSERT_TRUE(ptr->first.IsIOError()); + ASSERT_FALSE(ptr->second); + } + ++called; + TEST_SYNC_POINT("BgFlushThr2:WaitToCommit"); + } + }); + + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0", + [&](void*) { + if (std::this_thread::get_id() == bg_flush_thr1) { + TEST_SYNC_POINT("BgFlushThr1:BeforeWriteManifest"); + } + }); + + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", [&](void*) { + if (std::this_thread::get_id() != bg_flush_thr1) { + return; + } + ASSERT_OK(db_->Put(write_opts, "b", "v_1_b")); + + FlushOptions flush_opts; + flush_opts.wait = false; + std::vector cfhs(1, db_->DefaultColumnFamily()); + ASSERT_OK(dbfull()->Flush(flush_opts, cfhs)); + }); + + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) { + auto* ptr = reinterpret_cast(arg); + assert(ptr); + *ptr = IOStatus::IOError("Injected failure"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError()); + + Close(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 18226284b..bb56c64a1 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -559,7 +559,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } if (s.ok()) { - auto wait_to_install_func = [&]() { + const auto wait_to_install_func = + [&]() -> std::pair { + if (!versions_->io_status().ok()) { + // Something went wrong elsewhere, we cannot count on waiting for our + // turn to write/sync to MANIFEST or CURRENT. Just return. + return std::make_pair(versions_->io_status(), false); + } else if (shutting_down_.load(std::memory_order_acquire)) { + return std::make_pair(Status::ShutdownInProgress(), false); + } bool ready = true; for (size_t i = 0; i != cfds.size(); ++i) { const auto& mems = jobs[i]->GetMemTables(); @@ -583,18 +591,40 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( break; } } - return ready; + return std::make_pair(Status::OK(), !ready); }; bool resuming_from_bg_err = error_handler_.IsDBStopped(); - while ((!error_handler_.IsDBStopped() || - error_handler_.GetRecoveryError().ok()) && - !wait_to_install_func()) { + while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) { + std::pair res = wait_to_install_func(); + + TEST_SYNC_POINT_CALLBACK( + "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", &res); + + if (!res.first.ok()) { + s = res.first; + break; + } else if (!res.second) { + break; + } atomic_flush_install_cv_.Wait(); + + resuming_from_bg_err = error_handler_.IsDBStopped(); } - s = resuming_from_bg_err ? error_handler_.GetRecoveryError() - : error_handler_.GetBGError(); + if (!resuming_from_bg_err) { + // If not resuming from bg err, then we determine future action based on + // whether we hit background error. + if (s.ok()) { + s = error_handler_.GetBGError(); + } + } else if (s.ok()) { + // If resuming from bg err, we still rely on wait_to_install_func()'s + // result to determine future action. If wait_to_install_func() returns + // non-ok already, then we should not proceed to flush result + // installation. + s = error_handler_.GetRecoveryError(); + } } if (s.ok()) { @@ -2653,7 +2683,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { bool made_progress = false; JobContext job_context(next_job_id_.fetch_add(1), true); - TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start"); + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCallFlush:start", nullptr); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log.get());