From 199fabc197e980aeae6e11e7271b98c8b2a5d4a9 Mon Sep 17 00:00:00 2001
From: Maysam Yabandeh
Date: Wed, 6 Feb 2019 08:43:16 -0800
Subject: [PATCH] WritePrepared: non-atomic commit of delayed prepared (#4947)

Summary:
Committing a delayed prepared transaction has two non-atomic steps: adding it
to the commit cache and removing it from delayed_prepared_. Similarly, in
::IsInSnapshot we read from the commit cache first and then look into
delayed_prepared_. Due to this non-atomicity, the reader might find a
just-committed prep_seq neither in the commit cache nor in delayed_prepared_.
To fix that, i) we check whether there were any delayed prepared entries
BEFORE looking into the commit cache, and ii) if there were, we extend the
search steps to: i) commit cache, ii) delayed prepared, iii) commit cache
again. This way, if the first query to the commit cache missed the commit,
the second will catch it. The cost of the redundant read from the commit
cache is paid only when delayed_prepared_ is nonempty, which should be a very
rare scenario.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4947

Differential Revision: D13952754

Pulled By: maysamyabandeh

fbshipit-source-id: 8f47826b13f8ce154398d842028342423f4ca2b2
---
 .../write_prepared_transaction_test.cc     | 74 +++++++++++++++++++
 .../transactions/write_prepared_txn_db.cc  |  7 +-
 .../transactions/write_prepared_txn_db.h   | 28 ++++++-
 3 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index 6af9ac085..8e05f7aa5 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -2696,6 +2696,80 @@ TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
   delete iter;
 }
 
+// Committing a delayed prepared has two non-atomic steps: update the commit
+// cache, remove the seq from delayed_prepared_. The read in IsInSnapshot
+// also checks these two structures in two non-atomic steps. This test breaks
+// each in the middle to ensure correctness despite non-atomic execution.
+// Note: This test is limited to the case where the snapshot is larger than
+// max_evicted_seq_.
+TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 3;    // 8 entries
+  for (auto split_read : {true, false}) {
+    DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
+    // Fill up the commit cache
+    std::string init_value("value1");
+    for (int i = 0; i < 10; i++) {
+      db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
+    }
+    // Prepare a transaction but do not commit it
+    Transaction* txn =
+        db->BeginTransaction(WriteOptions(), TransactionOptions());
+    ASSERT_OK(txn->SetName("xid"));
+    ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
+    ASSERT_OK(txn->Prepare());
+    // Commit a bunch of entries to advance max evicted seq and make the
+    // prepared a delayed prepared
+    for (int i = 0; i < 10; i++) {
+      db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
+    }
+    // The snapshot should not see the delayed prepared entry
+    auto snap = db->GetSnapshot();
+
+    if (split_read) {
+      // split right after reading from the commit cache
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
+            "AtomicCommitOfOldPrepared:Commit:before"},
+           {"AtomicCommitOfOldPrepared:Commit:after",
+            "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
+    } else {  // split commit
+      // split right before removing from delayed_prepared_
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"WritePreparedTxnDB::RemovePrepared:pause",
+            "AtomicCommitOfOldPrepared:Read:before"},
+           {"AtomicCommitOfOldPrepared:Read:after",
+            "WritePreparedTxnDB::RemovePrepared:resume"}});
+    }
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    rocksdb::port::Thread commit_thread([&]() {
+      TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:before");
+      ASSERT_OK(txn->Commit());
+      TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:after");
+      delete txn;
+    });
+
+    rocksdb::port::Thread read_thread([&]() {
+      TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:before");
+      ReadOptions roptions;
+      roptions.snapshot = snap;
+      PinnableSlice value;
+      auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
+      ASSERT_OK(s);
+      // It should not see the commit of the delayed prepared
+      ASSERT_TRUE(value == init_value);
+      TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:after");
+      db->ReleaseSnapshot(snap);
+    });
+
+    read_thread.join();
+    commit_thread.join();
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+}
+
 // When an old prepared entry gets committed, there is a gap between the time
 // that it is published and when it is cleaned up from old_prepared_. This test
 // stresses such cases.
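
To see the race outside the test harness, here is a minimal single-threaded
model of the interleaving that the split_read schedule above forces with sync
points. It is an illustrative sketch only: Model and its members are
stand-ins, not RocksDB types, and the problematic schedule is replayed by
hand rather than with racing threads.

#include <cstdint>
#include <iostream>
#include <set>
#include <unordered_map>

// Toy stand-ins for the commit cache and delayed_prepared_.
struct Model {
  std::unordered_map<uint64_t, uint64_t> commit_cache;  // prep_seq -> commit_seq
  std::set<uint64_t> delayed_prepared;
};

int main() {
  Model m;
  const uint64_t prep_seq = 5;
  const uint64_t commit_seq = 20;
  m.delayed_prepared.insert(prep_seq);  // an old (delayed) prepared txn

  // Reader step 1: query the commit cache -- miss, nothing published yet.
  bool cache_hit = m.commit_cache.count(prep_seq) > 0;

  // The committer runs both of its steps between the reader's two steps.
  m.commit_cache[prep_seq] = commit_seq;  // commit step 1: publish
  m.delayed_prepared.erase(prep_seq);     // commit step 2: clean up

  // Reader step 2: query delayed_prepared -- also a miss. A reader that
  // stops here finds prep_seq in neither structure and draws the wrong
  // conclusion about its visibility; that is the bug being fixed.
  bool delayed_hit = m.delayed_prepared.count(prep_seq) > 0;
  std::cout << "cache_hit=" << cache_hit << " delayed_hit=" << delayed_hit
            << " (naive read order misses the commit)\n";

  // The fix: delayed_prepared was non-empty before the reader's first query,
  // so the reader queries the commit cache once more and catches the commit.
  bool second_hit = m.commit_cache.count(prep_seq) > 0;
  std::cout << "second cache query hit=" << second_hit << "\n";
  return 0;
}

Running it prints the miss/miss pair followed by the successful second query,
mirroring what the split_read schedule of the test exercises.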
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index a86e3b004..ab1c3490e 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -489,6 +489,8 @@ void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
   TEST_SYNC_POINT_CALLBACK(
       "RemovePrepared:Start",
       const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
+  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
+  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
   ROCKS_LOG_DETAILS(info_log_,
                     "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
                     prepare_seq, batch_cnt);
@@ -544,9 +546,8 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
 void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                               const SequenceNumber& new_max) {
   ROCKS_LOG_DETAILS(info_log_,
-                    "AdvanceMaxEvictedSeq overhead %" PRIu64
-                    " => %" PRIu64 prev_max,
-                    new_max);
+                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
+                    prev_max, new_max);
   // Declare the intention before getting snapshot from the DB. This helps a
   // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
   // it has not already. Otherwise the new snapshot is when we ask DB for
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index 60fa28f62..b2ca89ae8 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -153,10 +153,21 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
           prep_seq, snapshot_seq, 1, min_uncommitted);
       return true;
     }
+    // Commit of a delayed prepared has two non-atomic steps: add to the
+    // commit cache, remove from delayed prepared. Our reads of these two are
+    // also non-atomic, so by reading the commit cache first we might find
+    // the prep_seq neither in the commit cache nor in delayed_prepared_. To
+    // fix that, i) we check whether there were any delayed prepared entries
+    // BEFORE reading the commit cache, and ii) if there were, we extend the
+    // search to: i) commit cache, ii) delayed prepared, iii) commit cache
+    // again, so a commit missed by the first cache query is caught later.
+    bool was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
     auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
     CommitEntry64b dont_care;
     CommitEntry cached;
     bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+    TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
+    TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
     if (exist && prep_seq == cached.prep_seq) {
       // It is committed and also not evicted from commit cache
       ROCKS_LOG_DETAILS(
@@ -178,12 +189,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
           prep_seq, snapshot_seq, 0);
       return false;
     }
-    if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
+    if (!was_empty) {
       // We should not normally reach here
      WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
       ReadLock rl(&prepared_mutex_);
-      ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
-                     static_cast<uint64_t>(delayed_prepared_.size()));
+      ROCKS_LOG_WARN(info_log_,
+                     "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
+                     static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
       if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
         // This is the order: 1) delayed_prepared_commits_ update, 2) publish 3)
         // delayed_prepared_ clean up. So check if it is the case of a late
@@ -204,6 +216,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                           snapshot_seq <= it->second);
           return it->second <= snapshot_seq;
         }
+      } else {
+        // 2nd query to the commit cache. Refer to the was_empty comment above.
+        exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+        if (exist && prep_seq == cached.prep_seq) {
+          ROCKS_LOG_DETAILS(
+              info_log_,
+              "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+              prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
+          return cached.commit_seq <= snapshot_seq;
+        }
       }
     }
     // When advancing max_evicted_seq_, we move older entries from prepared to
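
For reference, the read protocol that the write_prepared_txn_db.h changes
establish can be summarized in one self-contained sketch. TxnState and
IsCommittedInSnapshot are hypothetical simplifications, not RocksDB's API:
the real commit cache is a fixed-size array indexed by prep_seq %
COMMIT_CACHE_SIZE, delayed_prepared_ is read under prepared_mutex_, and the
delayed_prepared_commits_ and eviction paths are omitted. The reason the
third query suffices is that a committer publishes to the commit cache before
erasing from delayed_prepared_, so a reader that misses in query 2 must hit
in query 3 (barring eviction, which is handled elsewhere).

#include <cstdint>
#include <set>
#include <unordered_map>

struct TxnState {
  std::unordered_map<uint64_t, uint64_t> commit_cache;  // prep_seq -> commit_seq
  std::set<uint64_t> delayed_prepared;
  bool delayed_prepared_empty = true;
};

bool IsCommittedInSnapshot(const TxnState& s, uint64_t prep_seq,
                           uint64_t snapshot_seq) {
  // Sample the emptiness flag BEFORE the first commit-cache query; loading
  // it only when needed later would reintroduce the race this patch fixes.
  const bool was_empty = s.delayed_prepared_empty;
  // Query 1: commit cache.
  auto it = s.commit_cache.find(prep_seq);
  if (it != s.commit_cache.end()) {
    return it->second <= snapshot_seq;
  }
  if (!was_empty) {
    // Query 2: delayed prepared set.
    if (s.delayed_prepared.count(prep_seq) > 0) {
      return false;  // still prepared, not committed (simplified)
    }
    // Query 3: commit cache again. A commit that landed between queries 1
    // and 2 was published before the delayed_prepared erase, so it shows
    // up here.
    it = s.commit_cache.find(prep_seq);
    if (it != s.commit_cache.end()) {
      return it->second <= snapshot_seq;
    }
  }
  // Found nowhere: treated as committed long ago, with commit_seq at or
  // below max_evicted_seq_, hence visible to this snapshot (simplified).
  return true;
}

The early sampling of was_empty is the essence of the patch: it ties the
three queries into a protocol that does not miss a concurrently committing
delayed prepared transaction (modulo commit-cache eviction, which other code
paths handle).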