From e5451b30dbe0150040ee60a24d57194c29e6fb71 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 16 Feb 2022 23:07:48 -0800 Subject: [PATCH] Fix a silent data loss for write-committed txn (#9571) Summary: The following sequence of events can cause silent data loss for write-committed transactions. ``` Time thread 1 bg flush | db->Put("a") | txn = NewTxn() | txn->Put("b", "v") | txn->Prepare() // writes only to 5.log | db->SwitchMemtable() // memtable 1 has "a" | // close 5.log, | // creates 8.log | trigger flush | pick memtable 1 | unlock db mutex | write new sst | txn->ctwb->Put("gtid", "1") // writes 8.log | txn->Commit() // writes to 8.log | // writes to memtable 2 | compute min_log_number_to_keep_2pc, this | will be 8 (incorrect). | | Purge obsolete wals, including 5.log | V ``` At this point, writes of txn exists only in memtable. Close db without flush because db thinks the data in memtable are backed by log. Then reopen, the writes are lost except key-value pair {"gtid"->"1"}, only the commit marker of txn is in 8.log The reason lies in `PrecomputeMinLogNumberToKeep2PC()` which calls `FindMinPrepLogReferencedByMemTable()`. In the above example, when bg flush thread tries to find obsolete wals, it uses the information computed by `PrecomputeMinLogNumberToKeep2PC()`. The return value of `PrecomputeMinLogNumberToKeep2PC()` depends on three components - `PrecomputeMinLogNumberToKeepNon2PC()`. This represents the WAL that has unflushed data. As the name of this method suggests, it does not account for 2PC. Although the keys reside in the prepare section of a previous WAL, the column family references the current WAL when they are actually inserted into the memtable during txn commit. - `prep_tracker->FindMinLogContainingOutstandingPrep()`. This represents the WAL with a prepare section but the txn hasn't committed. - `FindMinPrepLogReferencedByMemTable()`. This represents the WAL on which some memtables (mutable and immutable) depend for their unflushed data. The bug lies in `FindMinPrepLogReferencedByMemTable()`. Originally, this function skips checking the column families that are being flushed, but the unit test added in this PR shows that they should not be. In this unit test, there is only the default column family, and one of its memtables has unflushed data backed by a prepare section in 5.log. We should return this information via `FindMinPrepLogReferencedByMemTable()`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9571 Test Plan: ``` ./transaction_test --gtest_filter=*/TransactionTest.SwitchMemtableDuringPrepareAndCommit_WC/* make check ``` Reviewed By: siying Differential Revision: D34235236 Pulled By: riversand963 fbshipit-source-id: 120eb21a666728a38dda77b96276c6af72b008b1 --- HISTORY.md | 4 ++ db/db_impl/db_impl.h | 5 +- db/db_impl/db_impl_debug.cc | 3 +- db/db_impl/db_impl_files.cc | 19 +++--- db/db_impl/db_impl_open.cc | 6 ++ utilities/transactions/transaction_test.cc | 72 ++++++++++++++++++++++ 6 files changed, 93 insertions(+), 16 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 2d81854e9..3a811e8f0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## 6.29.3 (02/17/2022) +### Bug Fixes +* Fix a data loss bug for 2PC write-committed transaction caused by concurrent transaction commit and memtable switch (#9571). + ## 6.29.2 (02/15/2022) ### Performance Improvements * DisableManualCompaction() doesn't have to wait scheduled manual compaction to be executed in thread-pool to cancel the job. diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 121fe7cef..bd6c909e8 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2386,11 +2386,10 @@ extern uint64_t PrecomputeMinLogNumberToKeepNon2PC( // will not depend on any WAL file. nullptr means no memtable is being flushed. // The function is only applicable to 2pc mode. extern uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const ColumnFamilyData* cfd_to_flush, - const autovector& memtables_to_flush); + VersionSet* vset, const autovector& memtables_to_flush); // For atomic flush. extern uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const autovector& cfds_to_flush, + VersionSet* vset, const autovector*>& memtables_to_flush); // Fix user-supplied options to be reasonable diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index c93b55c1e..324c372be 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -262,8 +262,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() { uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { autovector empty_list; - return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr, - empty_list); + return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list); } Status DBImpl::TEST_GetLatestMutableCFOptions( diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 7ce1a0ae4..b50380d92 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -670,8 +670,7 @@ void DBImpl::DeleteObsoleteFiles() { } uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const ColumnFamilyData* cfd_to_flush, - const autovector& memtables_to_flush) { + VersionSet* vset, const autovector& memtables_to_flush) { uint64_t min_log = 0; // we must look through the memtables for two phase transactions @@ -679,7 +678,7 @@ uint64_t FindMinPrepLogReferencedByMemTable( std::unordered_set memtables_to_flush_set( memtables_to_flush.begin(), memtables_to_flush.end()); for (auto loop_cfd : *vset->GetColumnFamilySet()) { - if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) { + if (loop_cfd->IsDropped()) { continue; } @@ -701,18 +700,16 @@ uint64_t FindMinPrepLogReferencedByMemTable( } uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const autovector& cfds_to_flush, + VersionSet* vset, const autovector*>& memtables_to_flush) { uint64_t min_log = 0; - std::unordered_set cfds_to_flush_set(cfds_to_flush.begin(), - cfds_to_flush.end()); std::unordered_set memtables_to_flush_set; for (const autovector* memtables : memtables_to_flush) { memtables_to_flush_set.insert(memtables->begin(), memtables->end()); } for (auto loop_cfd : *vset->GetColumnFamilySet()) { - if (loop_cfd->IsDropped() || cfds_to_flush_set.count(loop_cfd)) { + if (loop_cfd->IsDropped()) { continue; } @@ -828,8 +825,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( min_log_number_to_keep = min_log_in_prep_heap; } - uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( - vset, &cfd_to_flush, memtables_to_flush); + uint64_t min_log_refed_by_mem = + FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush); if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < min_log_number_to_keep) { @@ -859,8 +856,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( min_log_number_to_keep = min_log_in_prep_heap; } - uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( - vset, cfds_to_flush, memtables_to_flush); + uint64_t min_log_refed_by_mem = + FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush); if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < min_log_number_to_keep) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index f73060a9e..2fe4000dd 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1371,6 +1371,12 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit) { mutex_.AssertHeld(); + assert(cfd); + assert(cfd->imm()); + // The immutable memtable list must be empty. + assert(std::numeric_limits::max() == + cfd->imm()->GetEarliestMemTableID()); + const uint64_t start_micros = immutable_db_options_.clock->NowMicros(); FileMetaData meta; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index d8d12c92a..82d5073c9 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -148,6 +148,78 @@ TEST_P(TransactionTest, SuccessTest) { delete txn; } +TEST_P(TransactionTest, SwitchMemtableDuringPrepareAndCommit_WC) { + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + + if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { + ROCKSDB_GTEST_BYPASS("Test applies to write-committed only"); + return; + } + + ASSERT_OK(db->Put(WriteOptions(), "key0", "value")); + + TransactionOptions txn_opts; + txn_opts.use_only_the_last_commit_time_batch_for_recovery = true; + Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opts); + assert(txn); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table", [&](void* arg) { + // db mutex not held. + auto* mems = reinterpret_cast*>(arg); + assert(mems); + ASSERT_EQ(1, mems->size()); + auto* ctwb = txn->GetCommitTimeWriteBatch(); + ASSERT_OK(ctwb->Put("gtid", "123")); + ASSERT_OK(txn->Commit()); + delete txn; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(txn->Put("key1", "value")); + ASSERT_OK(txn->SetName("txn1")); + + ASSERT_OK(txn->Prepare()); + + auto dbimpl = static_cast_with_check(db->GetRootDB()); + ASSERT_OK(dbimpl->TEST_SwitchMemtable(nullptr)); + ASSERT_OK(dbimpl->TEST_FlushMemTable( + /*wait=*/false, /*allow_write_stall=*/true, /*cfh=*/nullptr)); + + ASSERT_OK(dbimpl->TEST_WaitForFlushMemTable()); + + { + std::string value; + ASSERT_OK(db->Get(ReadOptions(), "key1", &value)); + ASSERT_EQ("value", value); + } + + delete db; + db = nullptr; + Status s; + if (use_stackable_db_ == false) { + s = TransactionDB::Open(options, txn_db_options, dbname, &db); + } else { + s = OpenWithStackableDB(); + } + ASSERT_OK(s); + assert(db); + + { + std::string value; + ASSERT_OK(db->Get(ReadOptions(), "gtid", &value)); + ASSERT_EQ("123", value); + + ASSERT_OK(db->Get(ReadOptions(), "key1", &value)); + ASSERT_EQ("value", value); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + // The test clarifies the contract of do_validate and assume_tracked // in GetForUpdate and Put/Merge/Delete TEST_P(TransactionTest, AssumeExclusiveTracked) {