Fix a silent data loss for write-committed txn (#9571)

Summary:
The following sequence of events can cause silent data loss for write-committed
transactions.
```
Time    thread 1                                       bg flush
 |   db->Put("a")
 |   txn = NewTxn()
 |   txn->Put("b", "v")
 |   txn->Prepare()       // writes only to 5.log
 |   db->SwitchMemtable() // memtable 1 has "a"
 |                        // close 5.log,
 |                        // creates 8.log
 |   trigger flush
 |                                                  pick memtable 1
 |                                                  unlock db mutex
 |                                                  write new sst
 |   txn->ctwb->Put("gtid", "1") // writes 8.log
 |   txn->Commit() // writes to 8.log
 |                 // writes to memtable 2
 |                                               compute min_log_number_to_keep_2pc, this
 |                                               will be 8 (incorrect).
 |
 |                                             Purge obsolete wals, including 5.log
 |
 V
```

At this point, writes of txn exists only in memtable. Close db without flush because db thinks the data in
memtable are backed by log. Then reopen, the writes are lost except key-value pair {"gtid"->"1"},
only the commit marker of txn is in 8.log

The reason lies in `PrecomputeMinLogNumberToKeep2PC()` which calls `FindMinPrepLogReferencedByMemTable()`.
In the above example, when bg flush thread tries to find obsolete wals, it uses the information
computed by `PrecomputeMinLogNumberToKeep2PC()`. The return value of `PrecomputeMinLogNumberToKeep2PC()`
depends on three components
- `PrecomputeMinLogNumberToKeepNon2PC()`. This represents the WAL that has unflushed data. As the name of this method suggests, it does not account for 2PC. Although the keys reside in the prepare section of a previous WAL, the column family references the current WAL when they are actually inserted into the memtable during txn commit.
- `prep_tracker->FindMinLogContainingOutstandingPrep()`. This represents the WAL with a prepare section but the txn hasn't committed.
- `FindMinPrepLogReferencedByMemTable()`. This represents the WAL on which some memtables (mutable and immutable) depend for their unflushed data.

The bug lies in `FindMinPrepLogReferencedByMemTable()`. Originally, this function skips checking the column families
that are being flushed, but the unit test added in this PR shows that they should not be. In this unit test, there is
only the default column family, and one of its memtables has unflushed data backed by a prepare section in 5.log.
We should return this information via `FindMinPrepLogReferencedByMemTable()`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9571

Test Plan:
```
./transaction_test --gtest_filter=*/TransactionTest.SwitchMemtableDuringPrepareAndCommit_WC/*
make check
```

Reviewed By: siying

Differential Revision: D34235236

Pulled By: riversand963

fbshipit-source-id: 120eb21a666728a38dda77b96276c6af72b008b1
This commit is contained in:
Yanqin Jin 2022-02-16 23:07:48 -08:00
parent 877f8b43df
commit e5451b30db
6 changed files with 93 additions and 16 deletions

View File

@ -1,4 +1,8 @@
# Rocksdb Change Log
## 6.29.3 (02/17/2022)
### Bug Fixes
* Fix a data loss bug for 2PC write-committed transaction caused by concurrent transaction commit and memtable switch (#9571).
## 6.29.2 (02/15/2022)
### Performance Improvements
* DisableManualCompaction() doesn't have to wait scheduled manual compaction to be executed in thread-pool to cancel the job.

View File

@ -2386,11 +2386,10 @@ extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
// will not depend on any WAL file. nullptr means no memtable is being flushed.
// The function is only applicable to 2pc mode.
extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
const autovector<MemTable*>& memtables_to_flush);
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush);
// For atomic flush.
extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush);
// Fix user-supplied options to be reasonable

View File

@ -262,8 +262,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() {
uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
autovector<MemTable*> empty_list;
return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr,
empty_list);
return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list);
}
Status DBImpl::TEST_GetLatestMutableCFOptions(

View File

@ -670,8 +670,7 @@ void DBImpl::DeleteObsoleteFiles() {
}
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
const autovector<MemTable*>& memtables_to_flush) {
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
@ -679,7 +678,7 @@ uint64_t FindMinPrepLogReferencedByMemTable(
std::unordered_set<MemTable*> memtables_to_flush_set(
memtables_to_flush.begin(), memtables_to_flush.end());
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
if (loop_cfd->IsDropped()) {
continue;
}
@ -701,18 +700,16 @@ uint64_t FindMinPrepLogReferencedByMemTable(
}
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush) {
uint64_t min_log = 0;
std::unordered_set<ColumnFamilyData*> cfds_to_flush_set(cfds_to_flush.begin(),
cfds_to_flush.end());
std::unordered_set<MemTable*> memtables_to_flush_set;
for (const autovector<MemTable*>* memtables : memtables_to_flush) {
memtables_to_flush_set.insert(memtables->begin(), memtables->end());
}
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped() || cfds_to_flush_set.count(loop_cfd)) {
if (loop_cfd->IsDropped()) {
continue;
}
@ -828,8 +825,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
min_log_number_to_keep = min_log_in_prep_heap;
}
uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
vset, &cfd_to_flush, memtables_to_flush);
uint64_t min_log_refed_by_mem =
FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < min_log_number_to_keep) {
@ -859,8 +856,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
min_log_number_to_keep = min_log_in_prep_heap;
}
uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
vset, cfds_to_flush, memtables_to_flush);
uint64_t min_log_refed_by_mem =
FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < min_log_number_to_keep) {

View File

@ -1371,6 +1371,12 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
assert(cfd);
assert(cfd->imm());
// The immutable memtable list must be empty.
assert(std::numeric_limits<uint64_t>::max() ==
cfd->imm()->GetEarliestMemTableID());
const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
FileMetaData meta;

View File

@ -148,6 +148,78 @@ TEST_P(TransactionTest, SuccessTest) {
delete txn;
}
TEST_P(TransactionTest, SwitchMemtableDuringPrepareAndCommit_WC) {
const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) {
ROCKSDB_GTEST_BYPASS("Test applies to write-committed only");
return;
}
ASSERT_OK(db->Put(WriteOptions(), "key0", "value"));
TransactionOptions txn_opts;
txn_opts.use_only_the_last_commit_time_batch_for_recovery = true;
Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opts);
assert(txn);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&](void* arg) {
// db mutex not held.
auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
assert(mems);
ASSERT_EQ(1, mems->size());
auto* ctwb = txn->GetCommitTimeWriteBatch();
ASSERT_OK(ctwb->Put("gtid", "123"));
ASSERT_OK(txn->Commit());
delete txn;
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(txn->Put("key1", "value"));
ASSERT_OK(txn->SetName("txn1"));
ASSERT_OK(txn->Prepare());
auto dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
ASSERT_OK(dbimpl->TEST_SwitchMemtable(nullptr));
ASSERT_OK(dbimpl->TEST_FlushMemTable(
/*wait=*/false, /*allow_write_stall=*/true, /*cfh=*/nullptr));
ASSERT_OK(dbimpl->TEST_WaitForFlushMemTable());
{
std::string value;
ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
ASSERT_EQ("value", value);
}
delete db;
db = nullptr;
Status s;
if (use_stackable_db_ == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
ASSERT_OK(s);
assert(db);
{
std::string value;
ASSERT_OK(db->Get(ReadOptions(), "gtid", &value));
ASSERT_EQ("123", value);
ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
ASSERT_EQ("value", value);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// The test clarifies the contract of do_validate and assume_tracked
// in GetForUpdate and Put/Merge/Delete
TEST_P(TransactionTest, AssumeExclusiveTracked) {