diff --git a/HISTORY.md b/HISTORY.md index 739d0a25e..3af78fb16 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## Unreleased +### Bug Fixes +* Fixed a race condition when 2PC is disabled and WAL tracking in the MANIFEST is enabled. The race condition is between two background flush threads trying to install flush results, causing a WAL deletion not tracked in the MANIFEST. A future DB open may fail. + ## 7.1.1 (04/07/2022) ### Bug Fixes * Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction. diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index b50380d92..1790ed836 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -23,11 +23,7 @@ namespace ROCKSDB_NAMESPACE { uint64_t DBImpl::MinLogNumberToKeep() { - if (allow_2pc()) { - return versions_->min_log_number_to_keep_2pc(); - } else { - return versions_->MinLogNumberWithUnflushedData(); - } + return versions_->min_log_number_to_keep(); } uint64_t DBImpl::MinObsoleteSstNumberToKeep() { @@ -224,7 +220,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } // Add log files in wal_dir - if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) { std::vector log_files; Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files); @@ -234,6 +229,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, log_file, immutable_db_options_.wal_dir); } } + // Add info log files in db_log_dir if (!immutable_db_options_.db_log_dir.empty() && immutable_db_options_.db_log_dir != dbname_) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 94ad8bf85..419c2e9af 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -866,6 +866,11 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, bool flushed = false; uint64_t corrupted_wal_number = kMaxSequenceNumber; uint64_t min_wal_number = MinLogNumberToKeep(); + if (!allow_2pc()) { + // In non-2pc mode, we skip WALs that do not back unflushed data. + min_wal_number = + std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData()); + } for (auto wal_number : wal_numbers) { if (wal_number < min_wal_number) { ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -1270,9 +1275,16 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } std::unique_ptr wal_deletion; - if (immutable_db_options_.track_and_verify_wals_in_manifest) { - wal_deletion.reset(new VersionEdit); - wal_deletion->DeleteWalsBefore(max_wal_number + 1); + if (flushed) { + wal_deletion = std::make_unique(); + if (immutable_db_options_.track_and_verify_wals_in_manifest) { + wal_deletion->DeleteWalsBefore(max_wal_number + 1); + } + if (!allow_2pc()) { + // In non-2pc mode, flushing the memtables of the column families + // means we can advance min_log_number_to_keep. + wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1); + } edit_lists.back().push_back(wal_deletion.get()); } @@ -1351,7 +1363,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { // FindObsoleteFiles() total_log_size_ = 0; log_empty_ = false; + uint64_t min_wal_with_unflushed_data = + versions_->MinLogNumberWithUnflushedData(); for (auto wal_number : wal_numbers) { + if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) { + // In non-2pc mode, the WAL files not backing unflushed data are not + // alive, thus should not be added to the alive_log_files_. + continue; + } // We preallocate space for wals, but then after a crash and restart, those // preallocated space are not needed anymore. It is likely only the last // log has such preallocated space, so we only truncate for the last log. diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 8a61dadaf..f2b94ed07 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1491,6 +1491,93 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options)); } +TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { + Options options = CurrentOptions(); + options.env = env_; + options.track_and_verify_wals_in_manifest = true; + // The following make sure there are two bg flush threads. + options.max_background_jobs = 8; + + const std::string cf1_name("cf1"); + CreateAndReopenWithCF({cf1_name}, options); + assert(handles_.size() == 2); + + { + dbfull()->TEST_LockMutex(); + ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes); + dbfull()->TEST_UnlockMutex(); + } + + ASSERT_OK(dbfull()->PauseBackgroundWork()); + + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value")); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + + ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[1])); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[0])); + + bool called = false; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + // This callback will be called when the first bg flush thread reaches the + // point before entering the MANIFEST write queue after flushing the SST + // file. + // The purpose of the sync points here is to ensure both bg flush threads + // finish computing `min_wal_number_to_keep` before any of them updates the + // `log_number` for the column family that's being flushed. + SyncPoint::GetInstance()->SetCallBack( + "MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep", + [&](void* /*arg*/) { + dbfull()->mutex()->AssertHeld(); + if (!called) { + // We are the first bg flush thread in the MANIFEST write queue. + // We set up the dependency between sync points for two threads that + // will be executing the same code. + // For the interleaving of events, see + // https://github.com/facebook/rocksdb/pull/9715. + // bg flush thread1 will release the db mutex while in the MANIFEST + // write queue. In the meantime, bg flush thread2 locks db mutex and + // computes the min_wal_number_to_keep (before thread1 writes to + // MANIFEST thus before cf1->log_number is updated). Bg thread2 joins + // the MANIFEST write queue afterwards and bg flush thread1 proceeds + // with writing to MANIFEST. + called = true; + SyncPoint::GetInstance()->LoadDependency({ + {"VersionSet::LogAndApply:WriteManifestStart", + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"}, + {"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2", + "VersionSet::LogAndApply:WriteManifest"}, + }); + } else { + // The other bg flush thread has already been in the MANIFEST write + // queue, and we are after. + TEST_SYNC_POINT( + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->ContinueBackgroundWork()); + + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0])); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); + + ASSERT_TRUE(called); + + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + DB* db1 = nullptr; + Status s = DB::OpenForReadOnly(options, dbname_, &db1); + ASSERT_OK(s); + assert(db1); + delete db1; +} + // Test scope: // - We expect to open data store under all circumstances // - We expect only data upto the point where the first error was encountered diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 88bf8cc69..3ec0e8da1 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -95,8 +95,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( jwriter << "cf_name" << cf_name << "job" << job_id << "event" << "table_file_creation" << "file_number" << fd.GetNumber() << "file_size" - << fd.GetFileSize() << "file_checksum" << file_checksum - << "file_checksum_func_name" << file_checksum_func_name; + << fd.GetFileSize() << "file_checksum" + << Slice(file_checksum).ToString(true) << "file_checksum_func_name" + << file_checksum_func_name; // table_properties { diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 133562b7b..b0d29bcd2 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -489,8 +489,8 @@ Status MemTableList::TryInstallMemtableFlushResults( // TODO(myabandeh): Not sure how batch_count could be 0 here. if (batch_count > 0) { uint64_t min_wal_number_to_keep = 0; + assert(edit_list.size() > 0); if (vset->db_options()->allow_2pc) { - assert(edit_list.size() > 0); // Note that if mempurge is successful, the edit_list will // not be applicable (contains info of new min_log number to keep, // and level 0 file path of SST file created during normal flush, @@ -499,23 +499,26 @@ Status MemTableList::TryInstallMemtableFlushResults( min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, *cfd, edit_list, memtables_to_flush, prep_tracker); - // We piggyback the information of earliest log file to keep in the + // We piggyback the information of earliest log file to keep in the // manifest entry for the last file flushed. - edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); } + edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); std::unique_ptr wal_deletion; if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (!vset->db_options()->allow_2pc) { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); - } if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { wal_deletion.reset(new VersionEdit); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); edit_list.push_back(wal_deletion.get()); } + TEST_SYNC_POINT_CALLBACK( + "MemTableList::TryInstallMemtableFlushResults:" + "AfterComputeMinWalToKeep", + nullptr); } const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, @@ -798,15 +801,14 @@ Status InstallMemtableAtomicFlushResults( if (vset->db_options()->allow_2pc) { min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, cfds, edit_lists, mems_list, prep_tracker); - edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); } + edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); std::unique_ptr wal_deletion; if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (!vset->db_options()->allow_2pc) { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); - } if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { wal_deletion.reset(new VersionEdit); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 208527fb8..f3e24016b 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -394,7 +394,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, if (s->ok()) { version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily( version_edit_params_.max_column_family_); - version_set_->MarkMinLogNumberToKeep2PC( + version_set_->MarkMinLogNumberToKeep( version_edit_params_.min_log_number_to_keep_); version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_); version_set_->MarkFileNumberUsed(version_edit_params_.log_number_); @@ -970,12 +970,11 @@ void DumpManifestHandler::CheckIterationResult(const log::Reader& reader, fprintf(stdout, "next_file_number %" PRIu64 " last_sequence %" PRIu64 " prev_log_number %" PRIu64 " max_column_family %" PRIu32 - " min_log_number_to_keep " - "%" PRIu64 "\n", + " min_log_number_to_keep %" PRIu64 "\n", version_set_->current_next_file_number(), version_set_->LastSequence(), version_set_->prev_log_number(), version_set_->column_family_set_->GetMaxColumnFamily(), - version_set_->min_log_number_to_keep_2pc()); + version_set_->min_log_number_to_keep()); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index daf1d9145..8c6c9fce1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4141,7 +4141,7 @@ void VersionSet::Reset() { } db_id_.clear(); next_file_number_.store(2); - min_log_number_to_keep_2pc_.store(0); + min_log_number_to_keep_.store(0); manifest_file_number_ = 0; options_file_number_ = 0; pending_manifest_file_number_ = 0; @@ -4610,8 +4610,7 @@ Status VersionSet::ProcessManifestWrites( } if (last_min_log_number_to_keep != 0) { - // Should only be set in 2PC mode. - MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep); + MarkMinLogNumberToKeep(last_min_log_number_to_keep); } for (int i = 0; i < static_cast(versions.size()); ++i) { @@ -4965,7 +4964,7 @@ Status VersionSet::Recover( ",min_log_number_to_keep is %" PRIu64 "\n", manifest_path.c_str(), manifest_file_number_, next_file_number_.load(), last_sequence_.load(), log_number, prev_log_number_, - column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -5378,9 +5377,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } // Called only either from ::LogAndApply which is protected by mutex or during // recovery which is single-threaded. -void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { - if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) { - min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed); +void VersionSet::MarkMinLogNumberToKeep(uint64_t number) { + if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) { + min_log_number_to_keep_.store(number, std::memory_order_relaxed); } } @@ -5506,7 +5505,7 @@ Status VersionSet::WriteCurrentStateToManifest( // min_log_number_to_keep is for the whole db, not for specific column family. // So it does not need to be set for every column family, just need to be set once. // Since default CF can never be dropped, we set the min_log to the default CF here. - uint64_t min_log = min_log_number_to_keep_2pc(); + uint64_t min_log = min_log_number_to_keep(); if (min_log != 0) { edit.SetMinLogNumberToKeep(min_log); } diff --git a/db/version_set.h b/db/version_set.h index 38e6616cf..f30cc0b76 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1128,8 +1128,8 @@ class VersionSet { uint64_t current_next_file_number() const { return next_file_number_.load(); } - uint64_t min_log_number_to_keep_2pc() const { - return min_log_number_to_keep_2pc_.load(); + uint64_t min_log_number_to_keep() const { + return min_log_number_to_keep_.load(); } // Allocate and return a new file number @@ -1187,7 +1187,7 @@ class VersionSet { // Mark the specified log number as deleted // REQUIRED: this is only called during single-threaded recovery or repair, or // from ::LogAndApply where the global mutex is held. - void MarkMinLogNumberToKeep2PC(uint64_t number); + void MarkMinLogNumberToKeep(uint64_t number); // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. @@ -1196,10 +1196,12 @@ class VersionSet { // Returns the minimum log number which still has data not flushed to any SST // file. // In non-2PC mode, all the log numbers smaller than this number can be safely - // deleted. + // deleted, although we still use `min_log_number_to_keep_` to determine when + // to delete a WAL file. uint64_t MinLogNumberWithUnflushedData() const { return PreComputeMinLogNumberWithUnflushedData(nullptr); } + // Returns the minimum log number which still has data not flushed to any SST // file. // Empty column families' log number is considered to be @@ -1399,9 +1401,8 @@ class VersionSet { const ImmutableDBOptions* const db_options_; std::atomic next_file_number_; // Any WAL number smaller than this should be ignored during recovery, - // and is qualified for being deleted in 2PC mode. In non-2PC mode, this - // number is ignored. - std::atomic min_log_number_to_keep_2pc_ = {0}; + // and is qualified for being deleted. + std::atomic min_log_number_to_keep_ = {0}; uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t options_file_size_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 422afaf49..c48547508 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -3424,6 +3424,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) { } TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { + db_options_.allow_2pc = true; NewDB(); SstInfo sst(100, kDefaultColumnFamilyName, "a"); @@ -3435,12 +3436,12 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { edit.AddFile(0, file_metas[0]); edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC); ASSERT_OK(LogAndApplyToDefaultCF(edit)); - ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); + ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC); for (int i = 0; i < 3; i++) { CreateNewManifest(); ReopenDB(); - ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); + ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC); } }