From 385049baf29d2c1ce7c96f5db2b0fc497639b1b1 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 28 Sep 2017 16:43:04 -0700 Subject: [PATCH] WritePrepared Txn: Recovery Summary: Recover txns from the WAL. Also added some unit tests. Closes https://github.com/facebook/rocksdb/pull/2901 Differential Revision: D5859596 Pulled By: maysamyabandeh fbshipit-source-id: 6424967b231388093b4effffe0a3b1b7ec8caeb0 --- db/db_impl.cc | 4 + db/db_impl.h | 12 +- db/db_impl_files.cc | 2 + db/db_impl_write.cc | 4 +- db/memtable.cc | 2 +- db/version_set.cc | 18 +- db/write_batch.cc | 86 ++- db/write_batch_test.cc | 5 +- db/write_thread.cc | 1 + db/write_thread.h | 3 + include/rocksdb/utilities/transaction.h | 15 + include/rocksdb/write_batch.h | 2 +- .../pessimistic_transaction_db.cc | 43 +- .../transactions/pessimistic_transaction_db.h | 16 +- utilities/transactions/transaction_test.cc | 164 +++--- utilities/transactions/transaction_test.h | 99 ++++ .../write_prepared_transaction_test.cc | 505 +++++++++++++++--- utilities/transactions/write_prepared_txn.cc | 20 +- utilities/transactions/write_prepared_txn.h | 3 +- 19 files changed, 802 insertions(+), 202 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 101c3fe7c..09b59fad3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -940,6 +940,10 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SequenceNumber snapshot; if (read_options.snapshot != nullptr) { + // Note: In WritePrepared txns this is not necessary but not harmful either. + // Since prep_seq > snapshot implies commit_seq > snapshot, if a snapshot is + // specified we should be fine with skipping seq numbers that are greater + // than that. snapshot = reinterpret_cast<const SnapshotImpl*>( read_options.snapshot)->number_; } else { diff --git a/db/db_impl.h b/db/db_impl.h index 9ffd9f1ea..718638fc5 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -510,9 +510,11 @@ class DBImpl : public DB { uint64_t log_number_; std::string name_; WriteBatch* batch_; + // The seq number of the first key in the batch + SequenceNumber seq_; explicit RecoveredTransaction(const uint64_t log, const std::string& name, - WriteBatch* batch) - : log_number_(log), name_(name), batch_(batch) {} + WriteBatch* batch, SequenceNumber seq) - : log_number_(log), name_(name), batch_(batch) {} + : log_number_(log), name_(name), batch_(batch), seq_(seq) {} ~RecoveredTransaction() { delete batch_; } }; @@ -534,8 +536,9 @@ class DBImpl : public DB { } void InsertRecoveredTransaction(const uint64_t log, const std::string& name, - WriteBatch* batch) { - recovered_transactions_[name] = new RecoveredTransaction(log, name, batch); + WriteBatch* batch, SequenceNumber seq) { + recovered_transactions_[name] = + new RecoveredTransaction(log, name, batch, seq); MarkLogAsContainingPrepSection(log); } @@ -640,6 +643,7 @@ class DBImpl : public DB { friend class PessimisticTransaction; friend class WriteCommittedTxn; friend class WritePreparedTxn; + friend class WritePreparedTxnDB; friend class WriteBatchWithIndex; #ifndef ROCKSDB_LITE friend class ForwardIterator; diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 4add98a81..66030fd7e 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -48,6 +48,7 @@ uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { return min_log; } +// TODO(myabandeh): Avoid using locks void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { assert(log != 0); std::lock_guard<std::mutex> lock(prep_heap_mutex_); @@ -56,6 +57,7 @@ void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { it->second += 1; } +// TODO(myabandeh): Avoid using locks void 
DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { assert(log != 0); std::lock_guard<std::mutex> lock(prep_heap_mutex_); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 71304dcc2..eb8e5dd20 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -264,9 +264,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } else { SequenceNumber next_sequence = current_sequence; for (auto* writer : write_group) { - if (writer->ShouldWriteToMemtable()) { - writer->sequence = next_sequence; - } + writer->sequence = next_sequence; if (seq_per_batch_) { next_sequence++; } else if (writer->ShouldWriteToMemtable()) { diff --git a/db/memtable.cc b/db/memtable.cc index 4d5247e96..19232f9d7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -475,7 +475,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, } // The first sequence number inserted into the memtable - assert(first_seqno_ == 0 || s > first_seqno_); + assert(first_seqno_ == 0 || s >= first_seqno_); if (first_seqno_ == 0) { first_seqno_.store(s, std::memory_order_relaxed); diff --git a/db/version_set.cc b/db/version_set.cc index 870edb6e7..25f873edb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2655,7 +2655,14 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { assert(edit->IsColumnFamilyManipulation()); edit->SetNextFile(next_file_number_.load()); - edit->SetLastSequence(last_sequence_); + // The log might have data that is not visible to the memtable and hence has + // not updated the last_sequence_ yet. It is also possible that the log is + // expecting some new data that is not written yet. Since LastSequence is an + // upper bound on the sequence, it is ok to record + // last_to_be_written_sequence_ as the last sequence. + edit->SetLastSequence(db_options_->concurrent_prepare + ? last_to_be_written_sequence_ + : last_sequence_); if (edit->is_column_family_drop_) { // if we drop column family, we have to make sure to save max column family, // so that we don't reuse existing ID @@ -2678,7 +2685,14 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, edit->SetPrevLogNumber(prev_log_number_); } edit->SetNextFile(next_file_number_.load()); - edit->SetLastSequence(last_sequence_); + // The log might have data that is not visible to the memtable and hence has + // not updated the last_sequence_ yet. It is also possible that the log is + // expecting some new data that is not written yet. Since LastSequence is an + // upper bound on the sequence, it is ok to record + // last_to_be_written_sequence_ as the last sequence. + edit->SetLastSequence(db_options_->concurrent_prepare + ? last_to_be_written_sequence_ + : last_sequence_); builder->Apply(edit); } diff --git a/db/write_batch.cc b/db/write_batch.cc index 90f361140..8e7a537cd 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -366,7 +366,11 @@ Status WriteBatch::Iterate(Handler* handler) const { input.remove_prefix(WriteBatchInternal::kHeader); Slice key, value, blob, xid; - bool first_tag = true; + // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as + // the batch boundary symbols, otherwise we would mis-count the number of + // batches. We do that by checking whether the accumulated batch is empty + // before seeing the next Noop. 
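+ // For example, a WAL entry carrying two sub-batches may look like + // [Noop, Put, Put, Noop, Put]: the first Noop sees an empty accumulated + // batch and is ignored, while the second sees a non-empty one and marks a + // sub-batch boundary. 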
+ bool empty_batch = true; int found = 0; Status s; while (s.ok() && !input.empty() && handler->Continue()) { @@ -385,6 +389,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_PUT)); s = handler->PutCF(column_family, key, value); + empty_batch = false; found++; break; case kTypeColumnFamilyDeletion: @@ -392,6 +397,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE)); s = handler->DeleteCF(column_family, key); + empty_batch = false; found++; break; case kTypeColumnFamilySingleDeletion: @@ -399,6 +405,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE)); s = handler->SingleDeleteCF(column_family, key); + empty_batch = false; found++; break; case kTypeColumnFamilyRangeDeletion: @@ -406,6 +413,7 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE)); s = handler->DeleteRangeCF(column_family, key, value); + empty_batch = false; found++; break; case kTypeColumnFamilyMerge: @@ -413,38 +421,44 @@ Status WriteBatch::Iterate(Handler* handler) const { assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE)); s = handler->MergeCF(column_family, key, value); + empty_batch = false; found++; break; case kTypeLogData: handler->LogData(blob); + empty_batch = true; break; case kTypeBeginPrepareXID: assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); handler->MarkBeginPrepare(); + empty_batch = false; break; case kTypeEndPrepareXID: assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE)); handler->MarkEndPrepare(xid); + empty_batch = true; break; case kTypeCommitXID: assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); handler->MarkCommit(xid); + empty_batch = true; break; case kTypeRollbackXID: assert(content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); handler->MarkRollback(xid); + empty_batch = true; break; case kTypeNoop: - handler->MarkNoop(first_tag); + handler->MarkNoop(empty_batch); + empty_batch = true; break; default: return Status::Corruption("unknown WriteBatch tag"); } - first_tag = false; } if (!s.ok()) { return s; @@ -841,9 +855,12 @@ class MemTableInserter : public WriteBatch::Handler { PostMapType mem_post_info_map_; // current recovered transaction we are rebuilding (recovery) WriteBatch* rebuilding_trx_; + SequenceNumber rebuilding_trx_seq_; // Increase seq number once per each write batch. Otherwise increase it once // per key. bool seq_per_batch_; + // Whether the memtable write will be done only after the commit + bool write_after_commit_; MemPostInfoMap& GetPostMap() { assert(concurrent_memtable_writes_); @@ -873,7 +890,11 @@ class MemTableInserter : public WriteBatch::Handler { post_info_created_(false), has_valid_writes_(has_valid_writes), rebuilding_trx_(nullptr), - seq_per_batch_(seq_per_batch) { + seq_per_batch_(seq_per_batch), + // Write after commit currently uses one seq per key (instead of per + // batch). 
So seq_per_batch being false indicates the write_after_commit + // approach. + write_after_commit_(!seq_per_batch) { assert(cf_mems_); } @@ -952,7 +973,10 @@ class MemTableInserter : public WriteBatch::Handler { const Slice& value) override { if (rebuilding_trx_ != nullptr) { WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); - return Status::OK(); + if (write_after_commit_) { + return Status::OK(); + } + // else insert the values into the memtable right away } Status seek_status; @@ -1030,7 +1054,10 @@ class MemTableInserter : public WriteBatch::Handler { const Slice& key) override { if (rebuilding_trx_ != nullptr) { WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); - return Status::OK(); + if (write_after_commit_) { + return Status::OK(); + } + // else insert the values into the memtable right away } Status seek_status; @@ -1046,7 +1073,10 @@ class MemTableInserter : public WriteBatch::Handler { const Slice& key) override { if (rebuilding_trx_ != nullptr) { WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); - return Status::OK(); + if (write_after_commit_) { + return Status::OK(); + } + // else insert the values into the memtable right away } Status seek_status; @@ -1064,7 +1094,10 @@ class MemTableInserter : public WriteBatch::Handler { if (rebuilding_trx_ != nullptr) { WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, begin_key, end_key); - return Status::OK(); + if (write_after_commit_) { + return Status::OK(); + } + // else insert the values into the memtable right away } Status seek_status; @@ -1094,7 +1127,10 @@ class MemTableInserter : public WriteBatch::Handler { assert(!concurrent_memtable_writes_); if (rebuilding_trx_ != nullptr) { WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); - return Status::OK(); + if (write_after_commit_) { + return Status::OK(); + } + // else insert the values into the memtable right away } Status seek_status; @@ -1200,6 +1236,7 @@ class MemTableInserter : public WriteBatch::Handler { // we are now iterating through a prepared section rebuilding_trx_ = new WriteBatch(); + rebuilding_trx_seq_ = sequence_; if (has_valid_writes_ != nullptr) { *has_valid_writes_ = true; } @@ -1215,7 +1252,7 @@ class MemTableInserter : public WriteBatch::Handler { if (recovering_log_number_ != 0) { assert(db_->allow_2pc()); db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(), - rebuilding_trx_); + rebuilding_trx_, rebuilding_trx_seq_); rebuilding_trx_ = nullptr; } else { assert(rebuilding_trx_ == nullptr); @@ -1226,10 +1263,10 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } - Status MarkNoop(bool first_tag) override { + Status MarkNoop(bool empty_batch) override { // A hack in pessimistic transaction could result into a noop at the start // of the write batch, that should be ignored. - if (!first_tag) { + if (!empty_batch) { // In the absence of Prepare markers, a kTypeNoop tag indicates the end of // a batch. This happens when write batch commits skipping the prepare // phase. @@ -1257,12 +1294,13 @@ class MemTableInserter : public WriteBatch::Handler { // at this point individual CF lognumbers will prevent // duplicate re-insertion of values. assert(log_number_ref_ == 0); - // all insertes must reference this trx log number - log_number_ref_ = trx->log_number_; - s = trx->batch_->Iterate(this); - // TODO(myabandeh): In WritePrepared txn, a commit marker should - // reference the log that contains the prepare marker. 
- log_number_ref_ = 0; + if (write_after_commit_) { + // all inserts must reference this trx log number + log_number_ref_ = trx->log_number_; + s = trx->batch_->Iterate(this); + log_number_ref_ = 0; + } + // else the values are already inserted before the commit if (s.ok()) { db_->DeleteRecoveredTransaction(name.ToString()); @@ -1272,12 +1310,10 @@ } } } else { - // TODO(myabandeh): In WritePrepared txn, a commit marker should - // reference the log that contains the prepare marker. This is to be able - // to reconsutrct the prepared list after recovery. - // TODO(myabandeh): In WritePrepared txn, we do not reach here since - // disable_memtable is set for commit. - assert(log_number_ref_ > 0); + // When writes are not delayed until commit, there is no disconnect + // between a memtable write and the WAL that supports it, so the commit + // need not reference the log on which it depends. + assert(!write_after_commit_ || log_number_ref_ > 0); } const bool batch_boundry = true; MaybeAdvanceSeq(batch_boundry); @@ -1330,6 +1366,8 @@ Status WriteBatchInternal::InsertInto( nullptr /*has_valid_writes*/, seq_per_batch); for (auto w : write_group) { if (!w->ShouldWriteToMemtable()) { + inserter.MaybeAdvanceSeq(true); + w->sequence = inserter.sequence(); continue; } SetSequence(w->batch, inserter.sequence()); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 624d39598..a3722045f 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -18,7 +18,6 @@ #include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" -#include "util/logging.h" #include "util/string_util.h" #include "util/testharness.h" @@ -299,8 +298,8 @@ namespace { seen += "MarkEndPrepare(" + xid.ToString() + ")"; return Status::OK(); } - virtual Status MarkNoop(bool first_tag) override { - seen += "MarkNoop(" + std::string(first_tag ? "true" : "false") + ")"; + virtual Status MarkNoop(bool empty_batch) override { + seen += "MarkNoop(" + std::string(empty_batch ? 
"true" : "false") + ")"; return Status::OK(); } virtual Status MarkCommit(const Slice& xid) override { diff --git a/db/write_thread.cc b/db/write_thread.cc index 2d3b34602..27c20e40b 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -394,6 +394,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, write_group->last_writer = w; write_group->size++; } + TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w); return size; } diff --git a/db/write_thread.h b/db/write_thread.h index 57ce71e08..67150a858 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -14,6 +14,7 @@ #include #include +#include "db/dbformat.h" #include "db/write_callback.h" #include "monitoring/instrumented_mutex.h" #include "rocksdb/options.h" @@ -142,6 +143,7 @@ class WriteThread { made_waitable(false), state(STATE_INIT), write_group(nullptr), + sequence(kMaxSequenceNumber), link_older(nullptr), link_newer(nullptr) {} @@ -158,6 +160,7 @@ class WriteThread { made_waitable(false), state(STATE_INIT), write_group(nullptr), + sequence(kMaxSequenceNumber), link_older(nullptr), link_newer(nullptr) {} diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 759988c5c..aa00dffa9 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -460,6 +460,14 @@ class Transaction { TransactionState GetState() const { return txn_state_; } void SetState(TransactionState state) { txn_state_ = state; } + // NOTE: Experimental feature + // The globally unique id with which the transaction is identified. This id + // might or might not be set depending on the implementation. Similarly the + // implementation decides the point in lifetime of a transaction at which it + // assigns the id. Although currently it is the case, the id is not guaranteed + // to remain the same across restarts. + uint64_t GetId() { return id_; } + protected: explicit Transaction(const TransactionDB* db) {} Transaction() {} @@ -472,7 +480,14 @@ class Transaction { // Execution status of the transaction. 
std::atomic<TransactionState> txn_state_; + uint64_t id_ = 0; + virtual void SetId(uint64_t id) { + assert(id_ == 0); + id_ = id; + } + private: + friend class PessimisticTransactionDB; // No copying allowed Transaction(const Transaction&); void operator=(const Transaction&); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 2ca40bbac..0dc844773 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -244,7 +244,7 @@ class WriteBatch : public WriteBatchBase { return Status::InvalidArgument("MarkEndPrepare() handler not defined."); } - virtual Status MarkNoop(bool first_tag) { + virtual Status MarkNoop(bool empty_batch) { return Status::InvalidArgument("MarkNoop() handler not defined."); } diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 90783bb17..301c5d5b9 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -115,6 +115,8 @@ Status PessimisticTransactionDB::Initialize( Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); assert(real_trx); real_trx->SetLogNumber(recovered_trx->log_number_); + assert(recovered_trx->seq_ != kMaxSequenceNumber); + real_trx->SetId(recovered_trx->seq_); s = real_trx->SetName(recovered_trx->name_); if (!s.ok()) { @@ -133,6 +135,23 @@ Status PessimisticTransactionDB::Initialize( return s; } +Status WritePreparedTxnDB::Initialize( + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles) { + auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB()); + assert(dbimpl != nullptr); + auto rtxns = dbimpl->recovered_transactions(); + for (auto rtxn : rtxns) { + AddPrepared(rtxn.second->seq_); + } + SequenceNumber prev_max = max_evicted_seq_; + SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); + AdvanceMaxEvictedSeq(prev_max, last_seq); + auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, + handles); + return s; +} + Transaction* WriteCommittedTxnDB::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) { @@ -547,6 +566,19 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { transactions_.erase(it); } +Status WritePreparedTxnDB::Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) { + // We are fine with the latest committed value. This could be done by + // specifying the snapshot as kMaxSequenceNumber. + WritePreparedTxnReadCallback callback(this, kMaxSequenceNumber); + bool* dont_care = nullptr; + // Note: no need to specify a snapshot for read options as no specific + // snapshot is requested by the user. 
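+ // The callback is expected to pass a value only if IsInSnapshot(prep_seq, + // snapshot_seq) holds; with snapshot_seq = kMaxSequenceNumber any committed + // value qualifies, so reads skip only data that is still merely prepared. 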
+ return db_impl_->GetImpl(options, column_family, key, value, dont_care, + &callback); +} + // Returns true if commit_seq <= snapshot_seq bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq) { @@ -571,14 +603,14 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, CommitEntry64b dont_care; CommitEntry cached; bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); - if (!exist) { - // It is not committed, so it must be still prepared - return false; - } - if (prep_seq == cached.prep_seq) { + if (exist && prep_seq == cached.prep_seq) { // It is committed and also not evicted from commit cache return cached.commit_seq <= snapshot_seq; } + // else it could be committed but not inserted in the map, which could happen + // after recovery, or it could be committed and evicted by another commit, or + // never committed. + // At this point we don't know if it was committed or if it is still prepared. auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); if (max_evicted_seq < prep_seq) { @@ -618,6 +650,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, void WritePreparedTxnDB::AddPrepared(uint64_t seq) { ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq); + assert(seq > max_evicted_seq_); WriteLock wl(&prepared_mutex_); prepared_txns_.push(seq); } diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 3a0ad153a..4a75990b2 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -33,8 +33,9 @@ class PessimisticTransactionDB : public TransactionDB { virtual ~PessimisticTransactionDB(); - Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices, - const std::vector<ColumnFamilyHandle*>& handles); + virtual Status Initialize( + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles); Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, @@ -191,10 +192,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { virtual ~WritePreparedTxnDB() {} + virtual Status Initialize( + const std::vector<size_t>& compaction_enabled_cf_indices, + const std::vector<ColumnFamilyHandle*>& handles) override; + Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) override; + using DB::Get; + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) override; + // Check whether the transaction that wrote the value with seqeunce number seq // is visible to the snapshot with sequence number snapshot_seq bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq); @@ -294,6 +304,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class PreparedHeap_BasicsTest_Test; friend class WritePreparedTxnDBMock; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; + friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; + friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; void init(const TransactionDBOptions& /* unused */) { // Adcance max_evicted_seq_ no more than 100 times before the cache wraps diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index df8ba5c86..bb03a22b5 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4640,89 +4640,107 @@ 
TEST_P(TransactionTest, MemoryLimitTest) { delete txn; } -// This test clarfies the existing expectation from the sequence number +// This test clarifies the existing expectation from the sequence number // algorithm. It could detect mistakes in updating the code but it is not // necessarily the one acceptable way. If the algorithm is legitimately changed, // this unit test should be updated as well. TEST_P(TransactionTest, SeqAdvanceTest) { - auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); - DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); - auto seq = db_impl->GetLatestSequenceNumber(); - auto exp_seq = seq; - - // Test DB's internal txn. It involves no prepare phase nor a commit marker. WriteOptions wopts; - auto s = db->Put(wopts, "key", "value"); - // Consume one seq per key - exp_seq++; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); + FlushOptions fopt; - // Doing it twice might detect some bugs - s = db->Put(wopts, "key", "value"); - exp_seq++; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); + // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some + // of the branches. This is the same as counting a binary number where the + // i-th bit represents whether we take branch i in the run represented by the + // number. + const size_t NUM_BRANCHES = 8; + // Helper function that shows if the branch is to be taken in the run + // represented by the number n. + auto branch_do = [&](size_t n, size_t* branch) { + assert(*branch < NUM_BRANCHES); + const size_t filter = static_cast<size_t>(1) << *branch; + return n & filter; + }; + const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES; + for (size_t n = 0; n < max_n; n++, ReOpen()) { + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + size_t branch = 0; + auto seq = db_impl->GetLatestSequenceNumber(); + exp_seq = seq; + txn_t0(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); - // Testing directly writing a write batch. Functionality-wise it is equivalent - // to commit without prepare. - WriteBatch wb; - wb.Put("k1", "v1"); - wb.Put("k2", "v2"); - wb.Put("k3", "v3"); - s = db->Write(wopts, &wb); - // One seq per key. - exp_seq += 3; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } - // A full 2pc txn that also involves a commit marker. 
- TransactionOptions txn_options; - WriteOptions write_options; - Transaction* txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid"); - ASSERT_OK(s); - s = txn->Put(Slice("foo"), Slice("bar")); - s = txn->Put(Slice("foo2"), Slice("bar2")); - s = txn->Put(Slice("foo3"), Slice("bar3")); - s = txn->Put(Slice("foo4"), Slice("bar4")); - s = txn->Put(Slice("foo5"), Slice("bar5")); - ASSERT_OK(s); - s = txn->Prepare(); - ASSERT_OK(s); - // Consume one seq per key - exp_seq += 5; - s = txn->Commit(); - ASSERT_OK(s); + // Doing it twice might detect some bugs + txn_t0(1); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); - s = db->Put(wopts, "key", "value"); - exp_seq++; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - delete txn; + txn_t1(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); - // Commit without prepare. It shoudl write to DB without a commit marker. - txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid2"); - ASSERT_OK(s); - s = txn->Put(Slice("foo"), Slice("bar")); - s = txn->Put(Slice("foo2"), Slice("bar2")); - s = txn->Put(Slice("foo3"), Slice("bar3")); - s = txn->Put(Slice("foo4"), Slice("bar4")); - s = txn->Put(Slice("foo5"), Slice("bar5")); - ASSERT_OK(s); - s = txn->Commit(); - ASSERT_OK(s); - // One seq per key - exp_seq += 5; - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - pdb->UnregisterTransaction(txn); - delete txn; + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + txn_t3(0); + // Since the commit marker does not write to the memtable, the last seq + // number is not updated immediately. But the advance should be visible + // after the next write. + + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + txn_t0(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + txn_t2(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + } } } // namespace rocksdb diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 7fc028a4e..1a9daaddc 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -124,6 +124,105 @@ class TransactionTest : public ::testing::TestWithParam< } return s; } + + std::atomic<size_t> linked = {0}; + std::atomic<size_t> exp_seq = {0}; + std::atomic<size_t> commit_writes = {0}; + std::atomic<size_t> expected_commits = {0}; + std::function<void(size_t, Status)> txn_t0_with_status = [&](size_t index, + Status exp_s) { + // Test DB's internal txn. It involves no prepare phase nor a commit marker. 
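+ // Note: both write policies advance the sequence once for this single-key + // write: WRITE_COMMITTED consumes one seq for the key, WRITE_PREPARED one + // seq for the whole batch, hence the identical branches below. 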
+ WriteOptions wopts; + auto s = db->Put(wopts, "key" + std::to_string(index), "value"); + ASSERT_EQ(exp_s, s); + if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { + // Consume one seq per key + exp_seq++; + } else { + // Consume one seq per batch + exp_seq++; + } + }; + std::function<void(size_t)> txn_t0 = [&](size_t index) { + return txn_t0_with_status(index, Status::OK()); + }; + std::function<void(size_t)> txn_t1 = [&](size_t index) { + // Testing directly writing a write batch. Functionality-wise it is + // equivalent to commit without prepare. + WriteBatch wb; + auto istr = std::to_string(index); + wb.Put("k1" + istr, "v1"); + wb.Put("k2" + istr, "v2"); + wb.Put("k3" + istr, "v3"); + WriteOptions wopts; + auto s = db->Write(wopts, &wb); + if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { + // Consume one seq per key + exp_seq += 3; + } else { + // Consume one seq per batch + exp_seq++; + } + ASSERT_OK(s); + }; + std::function<void(size_t)> txn_t2 = [&](size_t index) { + // Commit without prepare. It should write to DB without a commit marker. + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + auto istr = std::to_string(index); + auto s = txn->SetName("xid" + istr); + ASSERT_OK(s); + s = txn->Put(Slice("foo" + istr), Slice("bar")); + s = txn->Put(Slice("foo2" + istr), Slice("bar2")); + s = txn->Put(Slice("foo3" + istr), Slice("bar3")); + s = txn->Put(Slice("foo4" + istr), Slice("bar4")); + ASSERT_OK(s); + s = txn->Commit(); + ASSERT_OK(s); + if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { + // Consume one seq per key + exp_seq += 4; + } else { + // Consume one seq per batch + exp_seq++; + } + auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); + pdb->UnregisterTransaction(txn); + delete txn; + }; + std::function<void(size_t)> txn_t3 = [&](size_t index) { + // A full 2pc txn that also involves a commit marker. 
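+ // Expected accounting: five seqs (one per key) under WRITE_COMMITTED versus + // one seq for the prepare batch plus one for the commit marker under + // WRITE_PREPARED, matching the exp_seq updates at the end of this lambda. 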
+ TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + auto istr = std::to_string(index); + auto s = txn->SetName("xid" + istr); + ASSERT_OK(s); + s = txn->Put(Slice("foo" + istr), Slice("bar")); + s = txn->Put(Slice("foo2" + istr), Slice("bar2")); + s = txn->Put(Slice("foo3" + istr), Slice("bar3")); + s = txn->Put(Slice("foo4" + istr), Slice("bar4")); + s = txn->Put(Slice("foo5" + istr), Slice("bar5")); + ASSERT_OK(s); + expected_commits++; + s = txn->Prepare(); + ASSERT_OK(s); + commit_writes++; + s = txn->Commit(); + ASSERT_OK(s); + if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { + // Consume one seq per key + exp_seq += 5; + } else { + // Consume one seq per batch + exp_seq++; + // Consume one seq per commit marker + exp_seq++; + } + delete txn; + }; }; class MySQLStyleTransactionTest : public TransactionTest {}; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 044133855..fb0ff7a8e 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -285,9 +285,10 @@ class WritePreparedTransactionTest : public TransactionTest { } }; +// TODO(myabandeh): enable it for concurrent_prepare INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest, WritePreparedTransactionTest, - ::testing::Values(std::make_tuple(false, true, + ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED))); TEST_P(WritePreparedTransactionTest, CommitMapTest) { @@ -552,95 +553,441 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) { } } -// This test clarfies the existing expectation from the sequence number -// algorithm. It could detect mistakes in updating the code but it is not -// necessarily the one acceptable way. If the algorithm is legitimately changed, -// this unit test should be updated as well. +// TODO(myabandeh): remove this redundant test after transaction_test is enabled +// with WRITE_PREPARED too. This test clarifies the existing expectation from +// the sequence number algorithm. It could detect mistakes in updating the code +// but it is not necessarily the one acceptable way. If the algorithm is +// legitimately changed, this unit test should be updated as well. TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) { - auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); - DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); - auto seq = db_impl->GetLatestSequenceNumber(); - auto exp_seq = seq; - - // Test DB's internal txn. It involves no prepare phase nor a commit marker. WriteOptions wopts; - auto s = db->Put(wopts, "key", "value"); - // Consume one seq per batch - exp_seq++; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); + FlushOptions fopt; - // Doing it twice might detect some bugs - s = db->Put(wopts, "key", "value"); - exp_seq++; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); + // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some + // of the branches. This is the same as counting a binary number where the + // i-th bit represents whether we take branch i in the run represented by the + // number. + const size_t NUM_BRANCHES = 8; + // Helper function that shows if the branch is to be taken in the run + // represented by the number n. 
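+ // E.g. with n = 5 (binary 101), branches 0 and 2 are taken and branch 1 is + // skipped. 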
+ auto branch_do = [&](size_t n, size_t* branch) { + assert(*branch < NUM_BRANCHES); + const size_t filter = static_cast<size_t>(1) << *branch; + return n & filter; + }; + const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES; + for (size_t n = 0; n < max_n; n++, ReOpen()) { + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + size_t branch = 0; + auto seq = db_impl->GetLatestSequenceNumber(); + exp_seq = seq; + txn_t0(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); - // Testing directly writing a write batch. Functionality-wise it is equivalent - // to commit without prepare. - WriteBatch wb; - wb.Put("k1", "v1"); - wb.Put("k2", "v2"); - wb.Put("k3", "v3"); - s = pdb->Write(wopts, &wb); - // Consume one seq per batch - exp_seq++; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + // Doing it twice might detect some bugs + txn_t0(1); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + txn_t1(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + txn_t3(0); + // Since commit marker does not write to memtable, the last seq number is + // not updated immediately. But the advance should be visible after the next + // write. + + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + + txn_t0(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + txn_t2(0); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + if (branch_do(n, &branch)) { + db_impl->Flush(fopt); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + if (branch_do(n, &branch)) { + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } + } +} + +TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { + // Given the sequential run of txns, with this timeout we should never see a + // deadlock nor a timeout unless we have a key conflict, which should be + // almost infeasible. 
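+ // (Both lock timeouts below are in milliseconds.) 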
+ txn_db_options.transaction_lock_timeout = 1000; + txn_db_options.default_lock_timeout = 1000; + ReOpen(); + FlushOptions fopt; + + // Number of different txn types we use in this test + const size_t type_cnt = 4; + // The size of the first write group + // TODO(myabandeh): This should be increased for pre-release tests + const size_t first_group_size = 2; + // Total number of txns we run in each test + const size_t txn_cnt = first_group_size * 2; + + size_t base[txn_cnt + 1] = { + 1, + }; + for (size_t bi = 1; bi <= txn_cnt; bi++) { + base[bi] = base[bi - 1] * type_cnt; + } + const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt)); + printf("Number of cases being tested is %" PRIu64 "\n", max_n); + for (size_t n = 0; n < max_n; n++, ReOpen()) { + if (n % 1000 == 0) { + printf("Tested %" PRIu64 " cases so far\n", n); + } + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + auto seq = db_impl->GetLatestSequenceNumber(); + exp_seq = seq; + // This is increased before writing the batch for commit + commit_writes = 0; + // This is increased before txn starts linking if it expects to do a commit + // eventually + expected_commits = 0; + std::vector<port::Thread> threads; + + linked = 0; + std::atomic<bool> batch_formed(false); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::EnterAsBatchGroupLeader:End", + [&](void* arg) { batch_formed = true; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + linked++; + if (linked == 1) { + // Wait until the others are linked too. + while (linked < first_group_size) { + } + } else if (linked == 1 + first_group_size) { + // Make the 2nd batch of the rest of writes plus any followup + // commits from the first batch + while (linked < txn_cnt + commit_writes) { + } + } + // Then we will have one or more batches consisting of follow-up + // commits from the 2nd batch. There is a bit of non-determinism here + // but it should be tolerable. + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + for (size_t bi = 0; bi < txn_cnt; bi++) { + size_t d = + (n % base[bi + 1]) / + base[bi]; // get the bi-th digit in the number system based on type_cnt + switch (d) { + case 0: + threads.emplace_back(txn_t0, bi); + break; + case 1: + threads.emplace_back(txn_t1, bi); + break; + case 2: + threads.emplace_back(txn_t2, bi); + break; + case 3: + threads.emplace_back(txn_t3, bi); + break; + default: + assert(false); + } + // wait to be linked + while (linked.load() <= bi) { + } + if (bi + 1 == + first_group_size) { // after a queue of size first_group_size + while (!batch_formed) { + } + // to make it more deterministic, wait until the commits are linked + while (linked.load() <= bi + expected_commits) { + } + } + } + for (auto& t : threads) { + t.join(); + } + if (txn_db_options.write_policy == WRITE_PREPARED) { + // In this case none of the above scheduling tricks to deterministically + // form merged batches works because the writes go to separate queues. + // This would result in different write groups in each run of the test. We + // still keep the test since, although non-deterministic and hard to debug, + // it is still useful to have. Since in this case we could finish with + // commit writes that don't write to memtable, the seq is not advanced in + // this code path. It will be after the next write. So we do one more + // write to make the impact of last seq visible. 
+ txn_t0(0); + } + // Check if memtable inserts advanced seq number as expected + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Check if recovery preserves the last sequence number + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Check if flush preserves the last sequence number + db_impl->Flush(fopt); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Check if recovery after flush preserves the last sequence number + db_impl->FlushWAL(true); + ReOpenNoDelete(); + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + } +} + +// Run a couple of different txns, among them some uncommitted. Restart the db +// at a couple of points to check whether the list of uncommitted txns is +// recovered properly. +TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) { + options.disable_auto_compactions = true; + ReOpen(); + WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); + + txn_t0(0); - // A full 2pc txn that also involves a commit marker. TransactionOptions txn_options; WriteOptions write_options; - Transaction* txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid"); + size_t index = 1000; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + auto istr0 = std::to_string(index); + auto s = txn0->SetName("xid" + istr0); ASSERT_OK(s); - s = txn->Put(Slice("foo"), Slice("bar")); - s = txn->Put(Slice("foo2"), Slice("bar2")); - s = txn->Put(Slice("foo3"), Slice("bar3")); - s = txn->Put(Slice("foo4"), Slice("bar4")); - s = txn->Put(Slice("foo5"), Slice("bar5")); + s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0)); ASSERT_OK(s); - s = txn->Prepare(); - ASSERT_OK(s); - // Consume one seq per batch - exp_seq++; - s = txn->Commit(); - ASSERT_OK(s); - // Consume one seq per commit marker - exp_seq++; - // Since commit marker does not write to memtable, the last seq number is not - // updated immedaitely. But the advance should be visible after the next - // write. + s = txn0->Prepare(); + auto prep_seq_0 = txn0->GetId(); - s = db->Put(wopts, "key", "value"); - // Consume one seq per batch - exp_seq++; - ASSERT_OK(s); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - delete txn; + txn_t1(0); - // Commit without prepare. It shoudl write to DB without a commit marker. 
- txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid2"); + index++; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + auto istr1 = std::to_string(index); + s = txn1->SetName("xid" + istr1); ASSERT_OK(s); - s = txn->Put(Slice("foo"), Slice("bar")); - s = txn->Put(Slice("foo2"), Slice("bar2")); - s = txn->Put(Slice("foo3"), Slice("bar3")); - s = txn->Put(Slice("foo4"), Slice("bar4")); - s = txn->Put(Slice("foo5"), Slice("bar5")); + s = txn1->Put(Slice("foo1" + istr1), Slice("bar")); ASSERT_OK(s); - s = txn->Commit(); + s = txn1->Prepare(); + auto prep_seq_1 = txn1->GetId(); + + txn_t2(0); + + ReadOptions ropt; + PinnableSlice pinnable_val; + // Check the value is not committed before restart + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + + delete txn0; + delete txn1; + wp_db->db_impl_->FlushWAL(true); + ReOpenNoDelete(); + wp_db = dynamic_cast<WritePreparedTxnDB*>(db); + // After recovery, all the uncommitted txns (0 and 1) should be inserted into + // delayed_prepared_ + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + ASSERT_FALSE(wp_db->delayed_prepared_empty_); + ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); + ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_); + { + ReadLock rl(&wp_db->prepared_mutex_); + ASSERT_EQ(2, wp_db->delayed_prepared_.size()); + ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) != + wp_db->delayed_prepared_.end()); + ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) != + wp_db->delayed_prepared_.end()); + } + + // Check the value is still not committed after restart + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + + txn_t3(0); + + // Test that recovered txns will be properly marked committed for the next + // recovery + txn1 = db->GetTransactionByName("xid" + istr1); + ASSERT_NE(txn1, nullptr); + txn1->Commit(); + + index++; + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + auto istr2 = std::to_string(index); + s = txn2->SetName("xid" + istr2); ASSERT_OK(s); + s = txn2->Put(Slice("foo2" + istr2), Slice("bar")); + ASSERT_OK(s); + s = txn2->Prepare(); + auto prep_seq_2 = txn2->GetId(); + + delete txn2; + wp_db->db_impl_->FlushWAL(true); + ReOpenNoDelete(); + wp_db = dynamic_cast<WritePreparedTxnDB*>(db); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + ASSERT_FALSE(wp_db->delayed_prepared_empty_); + + // 0 and 2 are prepared and 1 is committed + { + ReadLock rl(&wp_db->prepared_mutex_); + ASSERT_EQ(2, wp_db->delayed_prepared_.size()); + const auto& end = wp_db->delayed_prepared_.end(); + ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end); + ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end); + ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end); + } + ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); + ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_); + + // Commit all the remaining txns + txn0 = db->GetTransactionByName("xid" + istr0); + ASSERT_NE(txn0, nullptr); + txn0->Commit(); + txn2 = db->GetTransactionByName("xid" + istr2); + ASSERT_NE(txn2, nullptr); + txn2->Commit(); + + // Check the value is committed after commit + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); + ASSERT_TRUE(s.ok()); + ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); + pinnable_val.Reset();
+ + delete txn0; + delete txn2; + wp_db->db_impl_->FlushWAL(true); + ReOpenNoDelete(); + wp_db = dynamic_cast<WritePreparedTxnDB*>(db); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + ASSERT_TRUE(wp_db->delayed_prepared_empty_); + + // Check the value is still committed after recovery + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); + ASSERT_TRUE(s.ok()); + ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); + pinnable_val.Reset(); +} + +// After recovery the new transactions should still conflict with recovered +// transactions. +TEST_P(WritePreparedTransactionTest, ConflictDetectionAfterRecoveryTest) { + options.disable_auto_compactions = true; + ReOpen(); + + TransactionOptions txn_options; + WriteOptions write_options; + size_t index = 0; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + auto istr0 = std::to_string(index); + auto s = txn0->SetName("xid" + istr0); + ASSERT_OK(s); + s = txn0->Put(Slice("key" + istr0), Slice("bar0" + istr0)); + ASSERT_OK(s); + s = txn0->Prepare(); + + // With the same index 0 and key prefix, txn_t0 should conflict with txn0 + txn_t0_with_status(0, Status::TimedOut()); + delete txn0; + + auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + db_impl->FlushWAL(true); + ReOpenNoDelete(); + + // It should still conflict after the recovery + txn_t0_with_status(0, Status::TimedOut()); + + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + db_impl->FlushWAL(true); + ReOpenNoDelete(); + + // Check that a recovered txn will still cause conflicts after 2nd recovery + txn_t0_with_status(0, Status::TimedOut()); + + txn0 = db->GetTransactionByName("xid" + istr0); + ASSERT_NE(txn0, nullptr); + txn0->Commit(); + delete txn0; + + db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + db_impl->FlushWAL(true); + ReOpenNoDelete(); + + // txn0 is now committed and should no longer cause a conflict + txn_t0_with_status(0, Status::OK()); +} + +// After recovery the commit map is empty while the max is set. The code would +// go through a different path which requires a separate test. +TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) { + WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); + wp_db->max_evicted_seq_ = 100; + ASSERT_FALSE(wp_db->IsInSnapshot(50, 40)); + ASSERT_TRUE(wp_db->IsInSnapshot(50, 50)); + ASSERT_TRUE(wp_db->IsInSnapshot(50, 100)); + ASSERT_TRUE(wp_db->IsInSnapshot(50, 150)); + ASSERT_FALSE(wp_db->IsInSnapshot(100, 80)); + ASSERT_TRUE(wp_db->IsInSnapshot(100, 100)); + ASSERT_TRUE(wp_db->IsInSnapshot(100, 150)); } // Test WritePreparedTxnDB's IsInSnapshot against different ordering of @@ -683,6 +1030,8 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { // We keep the list of txns comitted before we take the last snaphot. // These should be the only seq numbers that will be found in the snapshot std::set<uint64_t> committed_before; + // The set of commit seq numbers to be excluded from IsInSnapshot queries + std::set<uint64_t> commit_seqs; DBImpl* mock_db = new DBImpl(options, dbname); std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock( mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits)); @@ -701,6 +1050,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { } else { // else commit it seq++; wp_db->AddCommitted(cur_txn, seq); + commit_seqs.insert(seq); if (!snapshot) { committed_before.insert(cur_txn); } @@ -725,7 +1075,8 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { // it at each cycle to test that the system is still sound when // max_evicted_seq_ advances. 
if (snapshot) { - for (uint64_t s = 0; s <= seq; s++) { + for (uint64_t s = 1; + s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) { bool was_committed = (committed_before.find(s) != committed_before.end()); bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 78671aad8..4b2e918e4 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -52,8 +52,9 @@ Status WritePreparedTxn::PrepareInternal() { /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !disable_memtable, &seq_used); assert(seq_used != kMaxSequenceNumber); - prepare_seq_ = seq_used; - wpt_db_->AddPrepared(prepare_seq_); + auto prepare_seq = seq_used; + SetId(prepare_seq); + wpt_db_->AddPrepared(prepare_seq); return s; } @@ -66,9 +67,10 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { WriteBatchInternal::InsertNoop(batch); const bool disable_memtable = true; const uint64_t no_log_ref = 0; - uint64_t seq_used; + uint64_t seq_used = kMaxSequenceNumber; auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, no_log_ref, !disable_memtable, &seq_used); + assert(seq_used != kMaxSequenceNumber); uint64_t& prepare_seq = seq_used; uint64_t& commit_seq = seq_used; // TODO(myabandeh): skip AddPrepared @@ -90,13 +92,19 @@ Status WritePreparedTxn::CommitInternal() { working_batch->MarkWalTerminationPoint(); const bool disable_memtable = true; - uint64_t seq_used; + uint64_t seq_used = kMaxSequenceNumber; + // Since the prepared batch is directly written to memtable, there is already + // a connection between the memtable and its WAL, so there is no need to + // redundantly reference the log that contains the prepared data. + const uint64_t zero_log_number = 0ull; auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, - log_number_, disable_memtable, &seq_used); + zero_log_number, disable_memtable, &seq_used); + assert(seq_used != kMaxSequenceNumber); uint64_t& commit_seq = seq_used; // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode // commit_seq. This happens if prep_seq <<< commit_seq. - wpt_db_->AddCommitted(prepare_seq_, commit_seq); + auto prepare_seq = GetId(); + wpt_db_->AddCommitted(prepare_seq, commit_seq); return s; } diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 14d7dcc25..131a27575 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -53,6 +53,8 @@ class WritePreparedTxn : public PessimisticTransaction { Status Rollback() override; private: + friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; + Status PrepareInternal() override; Status CommitWithoutPrepareInternal() override; @@ -73,7 +75,6 @@ class WritePreparedTxn : public PessimisticTransaction { void operator=(const WritePreparedTxn&); WritePreparedTxnDB* wpt_db_; - uint64_t prepare_seq_; }; } // namespace rocksdb