From f4790bdd1b6fc909e7f01d3f7a72bafeb6eecc48 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 23 Mar 2022 19:41:31 -0700 Subject: [PATCH] Fix a race condition in WAL tracking causing DB open failure (#9715) Summary: There is a race condition if WAL tracking in the MANIFEST is enabled in a database that disables 2PC. The race condition is between two background flush threads trying to install flush results to the MANIFEST. Consider an example database with two column families: "default" (cfd0) and "cf1" (cfd1). Initially, both column families have one mutable (active) memtable whose data backed by 6.log. 1. Trigger a manual flush for "cf1", creating a 7.log 2. Insert another key to "default", and trigger flush for "default", creating 8.log 3. BgFlushThread1 finishes writing 9.sst 4. BgFlushThread2 finishes writing 10.sst ``` Time BgFlushThread1 BgFlushThread2 | mutex_.Lock() | precompute min_wal_to_keep as 6 | mutex_.Unlock() | mutex_.Lock() | precompute min_wal_to_keep as 6 | join MANIFEST write queue and mutex_.Unlock() | write to MANIFEST | mutex_.Lock() | cfd1->log_number = 7 | Signal bg_flush_2 and mutex_.Unlock() | wake up and mutex_.Lock() | cfd0->log_number = 8 | FindObsoleteFiles() with job_context->log_number == 7 | mutex_.Unlock() | PurgeObsoleteFiles() deletes 6.log V ``` As shown in the above, BgFlushThread2 thinks that the min wal to keep is 6.log because "cf1" has unflushed data in 6.log (cf1.log_number=6). Similarly, BgThread1 thinks that min wal to keep is also 6.log because "default" has unflushed data (default.log_number=6). No WAL deletion will be written to MANIFEST because 6 is equal to `versions_->wals_.min_wal_number_to_keep`, due to https://github.com/facebook/rocksdb/blob/7.1.fb/db/memtable_list.cc#L513:L514. The bg flush thread that finishes last will perform file purging. `job_context.log_number` will be evaluated as 7, i.e. the min wal that contains unflushed data, causing 6.log to be deleted. However, MANIFEST thinks 6.log should still exist. If you close the db at this point, you won't be able to re-open it if `track_and_verify_wal_in_manifest` is true. We must handle the case of multiple bg flush threads, and it is difficult for one bg flush thread to know the correct min wal number until the other bg flush threads have finished committing to the manifest and updated the `cfd::log_number`. To fix this issue, we rename an existing variable `min_log_number_to_keep_2pc` to `min_log_number_to_keep`, and use it to track WAL file deletion in non-2pc mode as well. This variable is updated only 1) during recovery with mutex held, or 2) in the MANIFEST write thread. `min_log_number_to_keep` means RocksDB will delete WALs below it, although there may be WALs above it which are also obsolete. Formally, we will have [min_wal_to_keep, max_obsolete_wal]. During recovery, we make sure that only WALs above max_obsolete_wal are checked and added back to `alive_log_files_`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9715 Test Plan: ``` make check ``` Also ran stress test below (with asan) to make sure it completes successfully. ``` TEST_TMPDIR=/dev/shm/rocksdb OPT=-g ASAN_OPTIONS=disable_coredump=0 \ CRASH_TEST_EXT_ARGS=--compression_type=zstd SKIP_FORMAT_BUCK_CHECKS=1 \ make J=52 -j52 blackbox_asan_crash_test ``` Reviewed By: ltamasi Differential Revision: D34984412 Pulled By: riversand963 fbshipit-source-id: c7b21a8d84751bb55ea79c9f387103d21b231005 --- HISTORY.md | 1 + db/db_impl/db_impl_files.cc | 8 +--- db/db_impl/db_impl_open.cc | 25 +++++++++-- db/db_wal_test.cc | 87 +++++++++++++++++++++++++++++++++++++ db/event_helpers.cc | 5 ++- db/memtable_list.cc | 26 ++++++----- db/version_edit_handler.cc | 7 ++- db/version_set.cc | 15 +++---- db/version_set.h | 15 ++++--- db/version_set_test.cc | 5 ++- 10 files changed, 150 insertions(+), 44 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 1bb490255..b90923585 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### Bug Fixes * Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled. * Fixed a race condition when mmaping a WritableFile on POSIX. +* Fixed a race condition when 2PC is disabled and WAL tracking in the MANIFEST is enabled. The race condition is between two background flush threads trying to install flush results, causing a WAL deletion not tracked in the MANIFEST. A future DB open may fail. ## 6.29.4 (03/22/2022) ### Bug Fixes diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index b50380d92..1790ed836 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -23,11 +23,7 @@ namespace ROCKSDB_NAMESPACE { uint64_t DBImpl::MinLogNumberToKeep() { - if (allow_2pc()) { - return versions_->min_log_number_to_keep_2pc(); - } else { - return versions_->MinLogNumberWithUnflushedData(); - } + return versions_->min_log_number_to_keep(); } uint64_t DBImpl::MinObsoleteSstNumberToKeep() { @@ -224,7 +220,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } // Add log files in wal_dir - if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) { std::vector log_files; Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files); @@ -234,6 +229,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, log_file, immutable_db_options_.wal_dir); } } + // Add info log files in db_log_dir if (!immutable_db_options_.db_log_dir.empty() && immutable_db_options_.db_log_dir != dbname_) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 98d53a7d8..ec5fa6796 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -864,6 +864,11 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, bool flushed = false; uint64_t corrupted_wal_number = kMaxSequenceNumber; uint64_t min_wal_number = MinLogNumberToKeep(); + if (!allow_2pc()) { + // In non-2pc mode, we skip WALs that do not back unflushed data. + min_wal_number = + std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData()); + } for (auto wal_number : wal_numbers) { if (wal_number < min_wal_number) { ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -1268,9 +1273,16 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } std::unique_ptr wal_deletion; - if (immutable_db_options_.track_and_verify_wals_in_manifest) { - wal_deletion.reset(new VersionEdit); - wal_deletion->DeleteWalsBefore(max_wal_number + 1); + if (flushed) { + wal_deletion = std::make_unique(); + if (immutable_db_options_.track_and_verify_wals_in_manifest) { + wal_deletion->DeleteWalsBefore(max_wal_number + 1); + } + if (!allow_2pc()) { + // In non-2pc mode, flushing the memtables of the column families + // means we can advance min_log_number_to_keep. + wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1); + } edit_lists.back().push_back(wal_deletion.get()); } @@ -1349,7 +1361,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { // FindObsoleteFiles() total_log_size_ = 0; log_empty_ = false; + uint64_t min_wal_with_unflushed_data = + versions_->MinLogNumberWithUnflushedData(); for (auto wal_number : wal_numbers) { + if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) { + // In non-2pc mode, the WAL files not backing unflushed data are not + // alive, thus should not be added to the alive_log_files_. + continue; + } // We preallocate space for wals, but then after a crash and restart, those // preallocated space are not needed anymore. It is likely only the last // log has such preallocated space, so we only truncate for the last log. diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index b24a5c633..cec2d4a3d 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1481,6 +1481,93 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options)); } +TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { + Options options = CurrentOptions(); + options.env = env_; + options.track_and_verify_wals_in_manifest = true; + // The following make sure there are two bg flush threads. + options.max_background_jobs = 8; + + const std::string cf1_name("cf1"); + CreateAndReopenWithCF({cf1_name}, options); + assert(handles_.size() == 2); + + { + dbfull()->TEST_LockMutex(); + ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes); + dbfull()->TEST_UnlockMutex(); + } + + ASSERT_OK(dbfull()->PauseBackgroundWork()); + + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value")); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + + ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[1])); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[0])); + + bool called = false; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + // This callback will be called when the first bg flush thread reaches the + // point before entering the MANIFEST write queue after flushing the SST + // file. + // The purpose of the sync points here is to ensure both bg flush threads + // finish computing `min_wal_number_to_keep` before any of them updates the + // `log_number` for the column family that's being flushed. + SyncPoint::GetInstance()->SetCallBack( + "MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep", + [&](void* /*arg*/) { + dbfull()->mutex()->AssertHeld(); + if (!called) { + // We are the first bg flush thread in the MANIFEST write queue. + // We set up the dependency between sync points for two threads that + // will be executing the same code. + // For the interleaving of events, see + // https://github.com/facebook/rocksdb/pull/9715. + // bg flush thread1 will release the db mutex while in the MANIFEST + // write queue. In the meantime, bg flush thread2 locks db mutex and + // computes the min_wal_number_to_keep (before thread1 writes to + // MANIFEST thus before cf1->log_number is updated). Bg thread2 joins + // the MANIFEST write queue afterwards and bg flush thread1 proceeds + // with writing to MANIFEST. + called = true; + SyncPoint::GetInstance()->LoadDependency({ + {"VersionSet::LogAndApply:WriteManifestStart", + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"}, + {"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2", + "VersionSet::LogAndApply:WriteManifest"}, + }); + } else { + // The other bg flush thread has already been in the MANIFEST write + // queue, and we are after. + TEST_SYNC_POINT( + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->ContinueBackgroundWork()); + + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0])); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); + + ASSERT_TRUE(called); + + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + DB* db1 = nullptr; + Status s = DB::OpenForReadOnly(options, dbname_, &db1); + ASSERT_OK(s); + assert(db1); + delete db1; +} + // Test scope: // - We expect to open data store under all circumstances // - We expect only data upto the point where the first error was encountered diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 88bf8cc69..3ec0e8da1 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -95,8 +95,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( jwriter << "cf_name" << cf_name << "job" << job_id << "event" << "table_file_creation" << "file_number" << fd.GetNumber() << "file_size" - << fd.GetFileSize() << "file_checksum" << file_checksum - << "file_checksum_func_name" << file_checksum_func_name; + << fd.GetFileSize() << "file_checksum" + << Slice(file_checksum).ToString(true) << "file_checksum_func_name" + << file_checksum_func_name; // table_properties { diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 0955be675..bd186441d 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -494,8 +494,8 @@ Status MemTableList::TryInstallMemtableFlushResults( // TODO(myabandeh): Not sure how batch_count could be 0 here. if (batch_count > 0) { uint64_t min_wal_number_to_keep = 0; + assert(edit_list.size() > 0); if (vset->db_options()->allow_2pc) { - assert(edit_list.size() > 0); // Note that if mempurge is successful, the edit_list will // not be applicable (contains info of new min_log number to keep, // and level 0 file path of SST file created during normal flush, @@ -504,23 +504,26 @@ Status MemTableList::TryInstallMemtableFlushResults( min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, *cfd, edit_list, memtables_to_flush, prep_tracker); - // We piggyback the information of earliest log file to keep in the + // We piggyback the information of earliest log file to keep in the // manifest entry for the last file flushed. - edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); } + edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); std::unique_ptr wal_deletion; if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (!vset->db_options()->allow_2pc) { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); - } if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { wal_deletion.reset(new VersionEdit); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); edit_list.push_back(wal_deletion.get()); } + TEST_SYNC_POINT_CALLBACK( + "MemTableList::TryInstallMemtableFlushResults:" + "AfterComputeMinWalToKeep", + nullptr); } const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, @@ -805,15 +808,14 @@ Status InstallMemtableAtomicFlushResults( if (vset->db_options()->allow_2pc) { min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, cfds, edit_lists, mems_list, prep_tracker); - edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); } + edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); std::unique_ptr wal_deletion; if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (!vset->db_options()->allow_2pc) { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); - } if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { wal_deletion.reset(new VersionEdit); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 4d44cb917..90ede07c2 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -394,7 +394,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, if (s->ok()) { version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily( version_edit_params_.max_column_family_); - version_set_->MarkMinLogNumberToKeep2PC( + version_set_->MarkMinLogNumberToKeep( version_edit_params_.min_log_number_to_keep_); version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_); version_set_->MarkFileNumberUsed(version_edit_params_.log_number_); @@ -970,12 +970,11 @@ void DumpManifestHandler::CheckIterationResult(const log::Reader& reader, fprintf(stdout, "next_file_number %" PRIu64 " last_sequence %" PRIu64 " prev_log_number %" PRIu64 " max_column_family %" PRIu32 - " min_log_number_to_keep " - "%" PRIu64 "\n", + " min_log_number_to_keep %" PRIu64 "\n", version_set_->current_next_file_number(), version_set_->LastSequence(), version_set_->prev_log_number(), version_set_->column_family_set_->GetMaxColumnFamily(), - version_set_->min_log_number_to_keep_2pc()); + version_set_->min_log_number_to_keep()); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 5e13c9532..6f0e2d55f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4113,7 +4113,7 @@ void VersionSet::Reset() { } db_id_.clear(); next_file_number_.store(2); - min_log_number_to_keep_2pc_.store(0); + min_log_number_to_keep_.store(0); manifest_file_number_ = 0; options_file_number_ = 0; pending_manifest_file_number_ = 0; @@ -4580,8 +4580,7 @@ Status VersionSet::ProcessManifestWrites( } if (last_min_log_number_to_keep != 0) { - // Should only be set in 2PC mode. - MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep); + MarkMinLogNumberToKeep(last_min_log_number_to_keep); } for (int i = 0; i < static_cast(versions.size()); ++i) { @@ -4935,7 +4934,7 @@ Status VersionSet::Recover( ",min_log_number_to_keep is %" PRIu64 "\n", manifest_path.c_str(), manifest_file_number_, next_file_number_.load(), last_sequence_.load(), log_number, prev_log_number_, - column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -5340,9 +5339,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } // Called only either from ::LogAndApply which is protected by mutex or during // recovery which is single-threaded. -void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { - if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) { - min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed); +void VersionSet::MarkMinLogNumberToKeep(uint64_t number) { + if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) { + min_log_number_to_keep_.store(number, std::memory_order_relaxed); } } @@ -5464,7 +5463,7 @@ Status VersionSet::WriteCurrentStateToManifest( // min_log_number_to_keep is for the whole db, not for specific column family. // So it does not need to be set for every column family, just need to be set once. // Since default CF can never be dropped, we set the min_log to the default CF here. - uint64_t min_log = min_log_number_to_keep_2pc(); + uint64_t min_log = min_log_number_to_keep(); if (min_log != 0) { edit.SetMinLogNumberToKeep(min_log); } diff --git a/db/version_set.h b/db/version_set.h index 481f65f01..618c78f3c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1101,8 +1101,8 @@ class VersionSet { uint64_t current_next_file_number() const { return next_file_number_.load(); } - uint64_t min_log_number_to_keep_2pc() const { - return min_log_number_to_keep_2pc_.load(); + uint64_t min_log_number_to_keep() const { + return min_log_number_to_keep_.load(); } // Allocate and return a new file number @@ -1160,7 +1160,7 @@ class VersionSet { // Mark the specified log number as deleted // REQUIRED: this is only called during single-threaded recovery or repair, or // from ::LogAndApply where the global mutex is held. - void MarkMinLogNumberToKeep2PC(uint64_t number); + void MarkMinLogNumberToKeep(uint64_t number); // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. @@ -1169,10 +1169,12 @@ class VersionSet { // Returns the minimum log number which still has data not flushed to any SST // file. // In non-2PC mode, all the log numbers smaller than this number can be safely - // deleted. + // deleted, although we still use `min_log_number_to_keep_` to determine when + // to delete a WAL file. uint64_t MinLogNumberWithUnflushedData() const { return PreComputeMinLogNumberWithUnflushedData(nullptr); } + // Returns the minimum log number which still has data not flushed to any SST // file. // Empty column families' log number is considered to be @@ -1372,9 +1374,8 @@ class VersionSet { const ImmutableDBOptions* const db_options_; std::atomic next_file_number_; // Any WAL number smaller than this should be ignored during recovery, - // and is qualified for being deleted in 2PC mode. In non-2PC mode, this - // number is ignored. - std::atomic min_log_number_to_keep_2pc_ = {0}; + // and is qualified for being deleted. + std::atomic min_log_number_to_keep_ = {0}; uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t options_file_size_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index bb3df039b..892dbc9fd 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -3204,6 +3204,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) { } TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { + db_options_.allow_2pc = true; NewDB(); SstInfo sst(100, kDefaultColumnFamilyName, "a"); @@ -3215,12 +3216,12 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { edit.AddFile(0, file_metas[0]); edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC); ASSERT_OK(LogAndApplyToDefaultCF(edit)); - ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); + ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC); for (int i = 0; i < 3; i++) { CreateNewManifest(); ReopenDB(); - ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); + ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC); } }