From 00b33c2474ef5fc07f10b7bf4c5684eebe4436c6 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Tue, 9 Jan 2018 08:47:46 -0800 Subject: [PATCH] WritePrepared Txn: address some pending TODOs Summary: This patch addresses a couple of minor TODOs for WritePrepared Txn such as double checking some assert statements at runtime as well, skip extra AddPrepared in non-2pc transactions, and safety check for infinite loops. Closes https://github.com/facebook/rocksdb/pull/3302 Differential Revision: D6617002 Pulled By: maysamyabandeh fbshipit-source-id: ef6673c139cb49f64c0879508d2f573b78609aca --- tools/ldb_cmd.cc | 6 ++ .../pessimistic_transaction_db.cc | 3 + utilities/transactions/transaction_test.cc | 63 ++++++++++++++++++ .../write_prepared_transaction_test.cc | 66 ------------------- utilities/transactions/write_prepared_txn.cc | 60 +++++++++++++---- .../transactions/write_prepared_txn_db.cc | 66 ++++++++++--------- .../transactions/write_prepared_txn_db.h | 34 +++++++--- 7 files changed, 181 insertions(+), 117 deletions(-) diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 56590161a..8da148b44 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1883,6 +1883,12 @@ class InMemoryHandler : public WriteBatch::Handler { return Status::OK(); } + virtual Status MarkNoop(bool) + override { + row_ << "NOOP "; + return Status::OK(); + } + virtual Status DeleteCF(uint32_t cf, const Slice& key) override { row_ << "DELETE(" << cf << ") : "; row_ << LDBCommand::StringToHex(key.ToString()) << " "; diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 7edeb9a81..9fff27e77 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -11,6 +11,7 @@ #include "utilities/transactions/pessimistic_transaction_db.h" +#include #include #include #include @@ -186,6 +187,8 @@ Status TransactionDB::Open( Status s; DB* db; + ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is " PRId32, + static_cast(txn_db_options.write_policy)); std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; DBOptions db_options_2pc = db_options; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index bea0a9224..129e9f576 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4977,6 +4977,69 @@ TEST_P(TransactionTest, SeqAdvanceTest) { } } +// Test that the transactional db can handle duplicate keys in the write batch +TEST_P(TransactionTest, DuplicateKeyTest) { + for (bool do_prepare : {true, false}) { + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + auto s = txn0->SetName("xid"); + ASSERT_OK(s); + s = txn0->Put(Slice("foo0"), Slice("bar0a")); + ASSERT_OK(s); + s = txn0->Put(Slice("foo0"), Slice("bar0b")); + ASSERT_OK(s); + s = txn0->Put(Slice("foo1"), Slice("bar1")); + ASSERT_OK(s); + s = txn0->Merge(Slice("foo2"), Slice("bar2a")); + ASSERT_OK(s); + // TODO(myabandeh): enable this after duplicatae merge keys are supported + // s = txn0->Merge(Slice("foo2"), Slice("bar2a")); + // ASSERT_OK(s); + s = txn0->Put(Slice("foo2"), Slice("bar2b")); + ASSERT_OK(s); + s = txn0->Put(Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + // TODO(myabandeh): enable this after duplicatae merge keys are supported + // s = txn0->Merge(Slice("foo3"), Slice("bar3")); + // ASSERT_OK(s); + s = txn0->Put(Slice("foo4"), Slice("bar4")); + ASSERT_OK(s); + s = txn0->Delete(Slice("foo4")); + ASSERT_OK(s); + s = txn0->SingleDelete(Slice("foo4")); + ASSERT_OK(s); + if (do_prepare) { + s = txn0->Prepare(); + ASSERT_OK(s); + } + s = txn0->Commit(); + ASSERT_OK(s); + if (!do_prepare) { + auto pdb = reinterpret_cast(db); + pdb->UnregisterTransaction(txn0); + } + delete txn0; + ReadOptions ropt; + PinnableSlice pinnable_val; + + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0b")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar2b")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar3")); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index ca1ec8da5..b41b076dc 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1283,72 +1283,6 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) { } } -// TODO(myabandeh): move it to transaction_test when it is extended to -// WROTE_PREPARED. - -// Test that the transactional db can handle duplicate keys in the write batch -TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) { - for (bool do_prepare : {true, false}) { - TransactionOptions txn_options; - WriteOptions write_options; - Transaction* txn0 = db->BeginTransaction(write_options, txn_options); - auto s = txn0->SetName("xid"); - ASSERT_OK(s); - s = txn0->Put(Slice("foo0"), Slice("bar0a")); - ASSERT_OK(s); - s = txn0->Put(Slice("foo0"), Slice("bar0b")); - ASSERT_OK(s); - s = txn0->Put(Slice("foo1"), Slice("bar1")); - ASSERT_OK(s); - s = txn0->Merge(Slice("foo2"), Slice("bar2a")); - ASSERT_OK(s); - // TODO(myabandeh): enable this after duplicatae merge keys are supported - // s = txn0->Merge(Slice("foo2"), Slice("bar2a")); - // ASSERT_OK(s); - s = txn0->Put(Slice("foo2"), Slice("bar2b")); - ASSERT_OK(s); - s = txn0->Put(Slice("foo3"), Slice("bar3")); - ASSERT_OK(s); - // TODO(myabandeh): enable this after duplicatae merge keys are supported - // s = txn0->Merge(Slice("foo3"), Slice("bar3")); - // ASSERT_OK(s); - s = txn0->Put(Slice("foo4"), Slice("bar4")); - ASSERT_OK(s); - s = txn0->Delete(Slice("foo4")); - ASSERT_OK(s); - s = txn0->SingleDelete(Slice("foo4")); - ASSERT_OK(s); - if (do_prepare) { - s = txn0->Prepare(); - ASSERT_OK(s); - } - s = txn0->Commit(); - ASSERT_OK(s); - if (!do_prepare) { - auto pdb = reinterpret_cast(db); - pdb->UnregisterTransaction(txn0); - } - delete txn0; - ReadOptions ropt; - PinnableSlice pinnable_val; - - s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); - ASSERT_OK(s); - ASSERT_TRUE(pinnable_val == ("bar0b")); - s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); - ASSERT_OK(s); - ASSERT_TRUE(pinnable_val == ("bar1")); - s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val); - ASSERT_OK(s); - ASSERT_TRUE(pinnable_val == ("bar2b")); - s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val); - ASSERT_OK(s); - ASSERT_TRUE(pinnable_val == ("bar3")); - s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val); - ASSERT_TRUE(s.IsNotFound()); - } -} - TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { // Use large buffer to avoid memtable flush after 1024 insertions options.write_buffer_size = 1024 * 1024; diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index b4caaa4c7..7fea5a9df 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -7,6 +7,11 @@ #include "utilities/transactions/write_prepared_txn.h" +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include #include #include "db/column_family.h" @@ -55,7 +60,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, Iterator* db_iter = wpt_db_->NewIterator(options, column_family); assert(db_iter); - return write_batch_.NewIteratorWithBase(db_iter); + return write_batch_.NewIteratorWithBase(column_family, db_iter); } Status WritePreparedTxn::PrepareInternal() { @@ -75,10 +80,15 @@ Status WritePreparedTxn::PrepareInternal() { db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used); - assert(seq_used != kMaxSequenceNumber); + assert(!s.ok() || seq_used != kMaxSequenceNumber); auto prepare_seq = seq_used; SetId(prepare_seq); - wpt_db_->AddPrepared(prepare_seq); + // TODO(myabandeh): AddPrepared better to be called in the pre-release + // callback otherwise there is a non-zero chance of max dvancing prepare_seq + // and readers assume the data as committed. + if (s.ok()) { + wpt_db_->AddPrepared(prepare_seq); + } return s; } @@ -92,8 +102,15 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { } Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { + ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, + "CommitBatchInternal"); // TODO(myabandeh): handle the duplicate keys in the batch bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; + bool sync = write_options_.sync; + if (!do_one_write) { + // No need to sync on the first write + write_options_.sync = false; + } // In the absence of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(batch); const bool DISABLE_MEMTABLE = true; @@ -105,14 +122,25 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE, &seq_used, do_one_write ? &update_commit_map : nullptr); - assert(seq_used != kMaxSequenceNumber); + assert(!s.ok() || seq_used != kMaxSequenceNumber); + uint64_t& prepare_seq = seq_used; + SetId(prepare_seq); if (!s.ok()) { return s; } if (do_one_write) { return s; } // else do the 2nd write for commit - uint64_t& prepare_seq = seq_used; + // Set the original value of sync + write_options_.sync = sync; + ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, + "CommitBatchInternal 2nd write prepare_seq: %" PRIu64, + prepare_seq); + // Note: we skip AddPrepared here. This could be further optimized by skip + // erasing prepare_seq from prepared_txn_ in the following callback. + // TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and + // readers assume the prepared data as committed? Almost zero probability. + // Commit the batch by writing an empty batch to the 2nd queue that will // release the commit sequence number to readers. WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( @@ -124,11 +152,13 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used, &update_commit_map_with_prepare); - assert(seq_used != kMaxSequenceNumber); + assert(!s.ok() || seq_used != kMaxSequenceNumber); return s; } Status WritePreparedTxn::CommitInternal() { + ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, + "CommitInternal prepare_seq: %" PRIu64, GetID()); // We take the commit-time batch and append the Commit marker. // The Memtable will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); @@ -143,8 +173,6 @@ Status WritePreparedTxn::CommitInternal() { WriteBatchInternal::SetAsLastestPersistentState(working_batch); } - // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode - // commit_seq. This happens if prep_seq <<< commit_seq. auto prepare_seq = GetId(); const bool includes_data = !empty && !for_recovery; WritePreparedCommitEntryPreReleaseCallback update_commit_map( @@ -158,11 +186,13 @@ Status WritePreparedTxn::CommitInternal() { auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, zero_log_number, disable_memtable, &seq_used, &update_commit_map); - assert(seq_used != kMaxSequenceNumber); + assert(!s.ok() || seq_used != kMaxSequenceNumber); return s; } Status WritePreparedTxn::RollbackInternal() { + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "RollbackInternal prepare_seq: %" PRIu64, GetId()); WriteBatch rollback_batch; assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); @@ -242,18 +272,20 @@ Status WritePreparedTxn::RollbackInternal() { s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE, &seq_used, do_one_write ? &update_commit_map : nullptr); - assert(seq_used != kMaxSequenceNumber); + assert(!s.ok() || seq_used != kMaxSequenceNumber); if (!s.ok()) { return s; } if (do_one_write) { - // TODO(myabandeh): what if max has already advanced rollback_seq? // Mark the txn as rolled back uint64_t& rollback_seq = seq_used; wpt_db_->RollbackPrepared(GetId(), rollback_seq); return s; } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; + ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, + "RollbackInternal 2nd write prepare_seq: %" PRIu64, + prepare_seq); // Commit the batch by writing an empty batch to the queue that will release // the commit sequence number to readers. WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( @@ -265,10 +297,12 @@ Status WritePreparedTxn::RollbackInternal() { s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used, &update_commit_map_with_prepare); - assert(seq_used != kMaxSequenceNumber); + assert(!s.ok() || seq_used != kMaxSequenceNumber); // Mark the txn as rolled back uint64_t& rollback_seq = seq_used; - wpt_db_->RollbackPrepared(GetId(), rollback_seq); + if (s.ok()) { + wpt_db_->RollbackPrepared(GetId(), rollback_seq); + } return s; } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 5334a4543..867b757da 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -22,6 +22,7 @@ #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" #include "util/mutexlock.h" +#include "util/string_util.h" #include "util/sync_point.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_db_mutex_impl.h" @@ -168,36 +169,31 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps // around. INC_STEP_FOR_MAX_EVICTED = - std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); + std::max(COMMIT_CACHE_SIZE / 100, static_cast(1)); snapshot_cache_ = unique_ptr[]>( new std::atomic[SNAPSHOT_CACHE_SIZE] {}); commit_cache_ = unique_ptr[]>( new std::atomic[COMMIT_CACHE_SIZE] {}); } -#define ROCKSDB_LOG_DETAILS(LGR, FMT, ...) \ - ; // due to overhead by default skip such lines -// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__) - // Returns true if commit_seq <= snapshot_seq bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq) const { // Here we try to infer the return value without looking into prepare list. // This would help avoiding synchronization over a shared map. - // TODO(myabandeh): read your own writes // TODO(myabandeh): optimize this. This sequence of checks must be correct but // not necessary efficient if (prep_seq == 0) { // Compaction will output keys to bottom-level with sequence number 0 if // it is visible to the earliest snapshot. - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, 1); return true; } if (snapshot_seq < prep_seq) { // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, 0); return false; @@ -207,7 +203,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, ReadLock rl(&prepared_mutex_); if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { // Then it is not committed yet - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, 0); return false; @@ -219,7 +215,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); if (exist && prep_seq == cached.prep_seq) { // It is committed and also not evicted from commit cache - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); return cached.commit_seq <= snapshot_seq; @@ -230,9 +226,10 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, // At this point we dont know if it was committed or it is still prepared auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); + // max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now if (max_evicted_seq < prep_seq) { // Not evicted from cache and also not present, so must be still prepared - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, 0); return false; @@ -247,7 +244,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, // only (iii) is the case: committed // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < // snapshot_seq - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, 1); return true; @@ -256,7 +253,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, // If there was no overlapping commit entry, then it is committed with a // commit_seq lower than any live snapshot, including snapshot_seq. if (old_commit_map_empty_.load(std::memory_order_acquire)) { - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, 1); return true; @@ -273,30 +270,34 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, found = std::binary_search(vec.begin(), vec.end(), prep_seq); } if (!found) { - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, prep_seq, snapshot_seq, 1); return true; } } // (ii) it the case: it is committed but after the snapshot_seq - ROCKSDB_LOG_DETAILS( - info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, - prep_seq, snapshot_seq, 0); + ROCKS_LOG_DETAILS(info_log_, + "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); return false; } void WritePreparedTxnDB::AddPrepared(uint64_t seq) { - ROCKSDB_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq); - // TODO(myabandeh): Add a runtime check to ensure the following assert. + ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq); assert(seq > max_evicted_seq_); + if (seq <= max_evicted_seq_) { + throw std::runtime_error( + "Added prepare_seq is larger than max_evicted_seq_: " + ToString(seq) + + " <= " + ToString(max_evicted_seq_.load())); + } WriteLock wl(&prepared_mutex_); prepared_txns_.push(seq); } void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq) { - ROCKSDB_LOG_DETAILS( + ROCKS_LOG_DETAILS( info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "", prep_seq, rollback_seq); std::vector snapshots = @@ -322,10 +323,10 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, } } -void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, - uint64_t commit_seq) { - ROCKSDB_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, - prepare_seq, commit_seq); +void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, + bool prepare_skipped, uint8_t loop_cnt) { + ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, + prepare_seq, commit_seq); TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start"); TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause"); auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; @@ -334,9 +335,9 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted); if (to_be_evicted) { auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); - ROCKSDB_LOG_DETAILS(info_log_, - "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64, - evicted.prep_seq, evicted.commit_seq, prev_max); + ROCKS_LOG_DETAILS(info_log_, + "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64, + evicted.prep_seq, evicted.commit_seq, prev_max); if (prev_max < evicted.commit_seq) { // Inc max in larger steps to avoid frequent updates auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED; @@ -351,11 +352,13 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, if (!succ) { // A very rare event, in which the commit entry is updated before we do. // Here we apply a very simple solution of retrying. - // TODO(myabandeh): do precautions to detect bugs that cause infinite loops - AddCommitted(prepare_seq, commit_seq); + if (loop_cnt > 100) { + throw std::runtime_error("Infinite loop in AddCommitted!"); + } + AddCommitted(prepare_seq, commit_seq, prepare_skipped, ++loop_cnt); return; } - { + if (!prepare_skipped) { WriteLock wl(&prepared_mutex_); prepared_txns_.erase(prepare_seq); bool was_empty = delayed_prepared_.empty(); @@ -442,6 +445,7 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( SequenceNumber max) { + ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max); InstrumentedMutex(db_impl_->mutex()); return db_impl_->snapshots().GetAll(nullptr, max); } @@ -479,6 +483,8 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal( void WritePreparedTxnDB::UpdateSnapshots( const std::vector& snapshots, const SequenceNumber& version) { + ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64, + version); TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start"); TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start"); #ifndef NDEBUG diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index b2a1af8f6..fd92a04e2 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -20,6 +20,7 @@ #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" +#include "util/string_util.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_lock_mgr.h" @@ -27,6 +28,10 @@ namespace rocksdb { +#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \ + ; // due to overhead by default skip such lines +// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__) + // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC. // In this way some data in the DB might not be committed. The DB provides // mechanisms to tell such data apart from committed data. @@ -102,8 +107,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // be used to idenitfy the snapshots that overlap with the rolled back txn. void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq); // Add the transaction with prepare sequence prepare_seq and commit sequence - // commit_seq to the commit map - void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); + // commit_seq to the commit map. prepare_skipped is set if the prpeare phase + // is skipped for this commit. loop_cnt is to detect infinite loops. + void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, + bool prepare_skipped = false, uint8_t loop_cnt = 0); struct CommitEntry { uint64_t prep_seq; @@ -120,7 +127,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { : INDEX_BITS(index_bits), PREP_BITS(static_cast(64 - PAD_BITS - INDEX_BITS)), COMMIT_BITS(static_cast(64 - PREP_BITS)), - COMMIT_FILTER(static_cast((1ull << COMMIT_BITS) - 1)) {} + COMMIT_FILTER(static_cast((1ull << COMMIT_BITS) - 1)), + DELTA_UPPERBOUND(static_cast((1ull << COMMIT_BITS))) {} // Number of higher bits of a sequence number that is not used. They are // used to encode the value type, ... const size_t PAD_BITS = static_cast(8); @@ -133,6 +141,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { const size_t COMMIT_BITS; // Filter to encode/decode commit seq const uint64_t COMMIT_FILTER; + // The value of commit_seq - prepare_seq + 1 must be less than this bound + const uint64_t DELTA_UPPERBOUND; }; // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... @@ -157,7 +167,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { uint64_t delta = cs - ps + 1; // make initialized delta always >= 1 // zero is reserved for uninitialized entries assert(0 < delta); - assert(delta < static_cast((1ull << format.COMMIT_BITS))); + assert(delta < format.DELTA_UPPERBOUND); + if (delta >= format.DELTA_UPPERBOUND) { + throw std::runtime_error( + "commit_seq >> prepare_seq. The allowed distance is " + + ToString(format.DELTA_UPPERBOUND) + " commit_seq is " + + ToString(cs) + " prepare_seq is " + ToString(ps)); + } rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER; rep_ = rep_ | delta; } @@ -332,7 +348,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // commit_cache_ must be initialized to zero to tell apart an empty index from // a filled one. Thread-safety is provided with commit_cache_mutex_. unique_ptr[]> commit_cache_; - // The largest evicted *commit* sequence number from the commit_cache_ + // The largest evicted *commit* sequence number from the commit_cache_. If a + // seq is smaller than max_evicted_seq_ is might or might not be present in + // commit_cache_. So commit_cache_ must first be checked before consulting + // with max_evicted_seq_. std::atomic max_evicted_seq_ = {}; // Advance max_evicted_seq_ by this value each time it needs an update. The // larger the value, the less frequent advances we would have. We do not want @@ -397,9 +416,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { } // else there was no prepare phase if (includes_data_) { // Commit the data that is accompnaied with the commit request - // TODO(myabandeh): skip AddPrepared - db_->AddPrepared(commit_seq); - db_->AddCommitted(commit_seq, commit_seq); + const bool PREPARE_SKIPPED = true; + db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED); } if (db_impl_->immutable_db_options().two_write_queues) { // Publish the sequence number. We can do that here assuming the callback