From 17731a43a6e6a212097c1d83392f81d310ffe2fa Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 1 Nov 2017 17:23:52 -0700 Subject: [PATCH] WritePrepared Txn: Optimize for recoverable state Summary: GetCommitTimeWriteBatch is currently used to store some state as part of commit in 2PC. In MyRocks it is specifically used to store some data that would be needed only during recovery. So it is not need to be stored in memtable right after each commit. This patch enables an optimization to write the GetCommitTimeWriteBatch only to the WAL. The batch will be written to memtable during recovery when the WAL is replayed. To cover the case when WAL is deleted after memtable flush, the batch is also buffered and written to memtable right before each memtable flush. Closes https://github.com/facebook/rocksdb/pull/3071 Differential Revision: D6148023 Pulled By: maysamyabandeh fbshipit-source-id: 2d09bae5565abe2017c0327421010d5c0d55eaa7 --- db/db_impl.h | 15 +- db/db_impl_compaction_flush.cc | 6 +- db/db_impl_write.cc | 66 ++++- db/write_batch.cc | 8 + db/write_batch_internal.h | 5 + include/rocksdb/utilities/transaction_db.h | 6 + include/rocksdb/write_batch.h | 6 + options/options_parser.h | 1 - options/options_settable_test.cc | 5 +- .../transactions/pessimistic_transaction.cc | 2 + .../transactions/pessimistic_transaction.h | 4 + .../transactions/pessimistic_transaction_db.h | 7 +- utilities/transactions/transaction_base.h | 3 +- utilities/transactions/transaction_test.cc | 269 ++++++++++-------- utilities/transactions/write_prepared_txn.cc | 13 +- 15 files changed, 281 insertions(+), 135 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index 462b8c9b9..24d4f8c86 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -669,6 +669,9 @@ class DBImpl : public DB { uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinPrepLogReferencedByMemTable(); + // write cached_recoverable_state_ to memtable if it is not empty + // The writer must be the leader in write_thread_ and holding mutex_ + Status WriteRecoverableState(); private: friend class DB; @@ -800,7 +803,8 @@ class DBImpl : public DB { WriteContext* write_context); WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group, - WriteBatch* tmp_batch, size_t* write_with_wal); + WriteBatch* tmp_batch, size_t* write_with_wal, + WriteBatch** to_be_cached_state); Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size); @@ -990,6 +994,15 @@ class DBImpl : public DB { std::deque logs_; // Signaled when getting_synced becomes false for some of the logs_. InstrumentedCondVar log_sync_cv_; + // This is the app-level state that is written to the WAL but will be used + // only during recovery. Using this feature enables not writing the state to + // memtable on normal writes and hence improving the throughput. Each new + // write of the state will replace the previous state entirely even if the + // keys in the two consecuitive states do not overlap. + // It is protected by log_write_mutex_ when concurrent_prepare_ is enabled. + // Otherwise only the heaad of write_thread_ can access it. + WriteBatch cached_recoverable_state_; + std::atomic cached_recoverable_state_empty_ = {true}; std::atomic total_log_size_; // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 4436d3acf..6dc28f969 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -947,7 +947,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); - if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty()) { + if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() && + cached_recoverable_state_empty_.load()) { // Nothing to flush return Status::OK(); } @@ -957,8 +958,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, write_thread_.EnterUnbatched(&w, &mutex_); } - // SwitchMemtable() will release and reacquire mutex - // during execution + // SwitchMemtable() will release and reacquire mutex during execution s = SwitchMemtable(cfd, &context); if (!writes_stopped) { diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 55801e827..67ebc08fa 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -75,6 +75,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with seq_per_batch"); } + // Otherwise IsLatestPersistentState optimization does not make sense + assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || + disable_memtable); Status status; if (write_options.low_pri) { @@ -678,9 +681,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, } WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, - WriteBatch* tmp_batch, size_t* write_with_wal) { + WriteBatch* tmp_batch, size_t* write_with_wal, + WriteBatch** to_be_cached_state) { assert(write_with_wal != nullptr); assert(tmp_batch != nullptr); + assert(*to_be_cached_state == nullptr); WriteBatch* merged_batch = nullptr; *write_with_wal = 0; auto* leader = write_group.leader; @@ -690,6 +695,9 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // contains one batch, that batch should be written to the WAL, // and the batch is not wanting to be truncated merged_batch = leader->batch; + if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) { + *to_be_cached_state = merged_batch; + } *write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. @@ -700,6 +708,10 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, if (writer->ShouldWriteToWAL()) { WriteBatchInternal::Append(merged_batch, writer->batch, /*WAL_only*/ true); + if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { + // We only need to cache the last of such write batch + *to_be_cached_state = writer->batch; + } (*write_with_wal)++; } } @@ -734,8 +746,9 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, Status status; size_t write_with_wal = 0; - WriteBatch* merged_batch = - MergeBatch(write_group, &tmp_batch_, &write_with_wal); + WriteBatch* to_be_cached_state = nullptr; + WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_, + &write_with_wal, &to_be_cached_state); if (merged_batch == write_group.leader->batch) { write_group.leader->log_used = logfile_number_; } else if (write_with_wal > 1) { @@ -748,6 +761,10 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t log_size; status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + if (to_be_cached_state) { + cached_recoverable_state_ = *to_be_cached_state; + cached_recoverable_state_empty_ = false; + } if (status.ok() && need_log_sync) { StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); @@ -797,8 +814,9 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, WriteBatch tmp_batch; size_t write_with_wal = 0; + WriteBatch* to_be_cached_state = nullptr; WriteBatch* merged_batch = - MergeBatch(write_group, &tmp_batch, &write_with_wal); + MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state); // We need to lock log_write_mutex_ since logs_ and alive_log_files might be // pushed back concurrently @@ -817,6 +835,10 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer = logs_.back().writer; uint64_t log_size; status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + if (to_be_cached_state) { + cached_recoverable_state_ = *to_be_cached_state; + cached_recoverable_state_empty_ = false; + } log_write_mutex_.Unlock(); if (status.ok()) { @@ -831,6 +853,34 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, return status; } +Status DBImpl::WriteRecoverableState() { + mutex_.AssertHeld(); + if (!cached_recoverable_state_empty_) { + bool dont_care_bool; + SequenceNumber next_seq; + if (concurrent_prepare_) { + log_write_mutex_.Lock(); + } + SequenceNumber seq = versions_->LastSequence(); + WriteBatchInternal::SetSequence(&cached_recoverable_state_, ++seq); + auto status = WriteBatchInternal::InsertInto( + &cached_recoverable_state_, column_family_memtables_.get(), + &flush_scheduler_, true, 0 /*recovery_log_number*/, this, + false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, + seq_per_batch_); + versions_->SetLastSequence(--next_seq); + if (concurrent_prepare_) { + log_write_mutex_.Unlock(); + } + if (status.ok()) { + cached_recoverable_state_.Clear(); + cached_recoverable_state_empty_ = true; + } + return status; + } + return Status::OK(); +} + Status DBImpl::SwitchWAL(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); @@ -1069,6 +1119,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; + // Recoverable state is persisted in WAL. After memtable switch, WAL might + // be deleted, so we write the state to memtable to be persisted as well. + Status s = WriteRecoverableState(); + if (!s.ok()) { + return s; + } + // In case of pipelined write is enabled, wait for all pending memtable // writers. if (immutable_db_options_.enable_pipelined_write) { @@ -1112,7 +1169,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { const auto preallocate_block_size = GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); mutex_.Unlock(); - Status s; { if (creating_new_log) { EnvOptions opt_env_opt = diff --git a/db/write_batch.cc b/db/write_batch.cc index 18475e944..76203ea1d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -494,6 +494,14 @@ Status WriteBatch::Iterate(Handler* handler) const { } } +bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) { + return b->is_latest_persistent_state_; +} + +void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) { + b->is_latest_persistent_state_ = true; +} + int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 7c2f42d49..3cca5435d 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -189,6 +189,11 @@ class WriteBatchInternal { // Returns the byte size of appending a WriteBatch with ByteSize // leftByteSize and a WriteBatch with ByteSize rightByteSize static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); + + // This write batch includes the latest state that should be persisted. Such + // state meant to be used only during recovery. + static void SetAsLastestPersistentState(WriteBatch* b); + static bool IsLatestPersistentState(const WriteBatch* b); }; // LocalSavePoint is similar to a scope guard diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 77043897a..b5a33d18b 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -97,6 +97,12 @@ struct TransactionOptions { // Status::Busy. The user should retry their transaction. bool deadlock_detect = false; + // If set, it states that the CommitTimeWriteBatch represents the latest state + // of the application and meant to be used later during recovery. It enables + // an optimization to postpone updating the memtable with CommitTimeWriteBatch + // to only SwithcMamtable or recovery. + bool use_only_the_last_commit_time_batch_for_recovery = false; + // TODO(agiardullo): TransactionDB does not yet support comparators that allow // two non-equal keys to be equivalent. Ie, cmp->Compare(a,b) should only // return 0 if diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 41f491b72..b612dd442 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -347,6 +347,12 @@ class WriteBatch : public WriteBatchBase { // Maximum size of rep_. size_t max_bytes_; + // Is the content of the batch the application's latest state that meant only + // to be used for recovery? Refer to + // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery for + // more details. + bool is_latest_persistent_state_ = false; + protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ diff --git a/options/options_parser.h b/options/options_parser.h index 5545c0b0f..5aab3e7e9 100644 --- a/options/options_parser.h +++ b/options/options_parser.h @@ -9,7 +9,6 @@ #include #include -#include "options/options_helper.h" #include "options/options_sanity_check.h" #include "rocksdb/env.h" #include "rocksdb/options.h" diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 33e9d4c18..58d0d8a4e 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -13,7 +13,7 @@ #include -#include "options/options_parser.h" +#include "options/options_helper.h" #include "rocksdb/convenience.h" #include "util/testharness.h" @@ -427,7 +427,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "hard_pending_compaction_bytes_limit=0;" "disable_auto_compactions=false;" "report_bg_io_stats=true;" - "compaction_options_fifo={max_table_files_size=3;ttl=100;allow_compaction=false;};", + "compaction_options_fifo={max_table_files_size=3;ttl=100;allow_" + "compaction=false;};", new_options)); ASSERT_EQ(unset_bytes_base, diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index c2bd61379..4a2d4d84b 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -82,6 +82,8 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { if (expiration_time_ > 0) { txn_db_impl_->InsertExpirableTransaction(txn_id_, this); } + use_only_the_last_commit_time_batch_for_recovery_ = + txn_options.use_only_the_last_commit_time_batch_for_recovery; } PessimisticTransaction::~PessimisticTransaction() { diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 95045c04b..be7487a83 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -113,6 +113,10 @@ class PessimisticTransaction : public TransactionBaseImpl { int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } protected: + // Refer to + // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery + bool use_only_the_last_commit_time_batch_for_recovery_ = false; + virtual Status PrepareInternal() = 0; virtual Status CommitWithoutPrepareInternal() = 0; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 0031fc3ae..312bd9efc 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -114,16 +114,17 @@ class PessimisticTransactionDB : public TransactionDB { void SetDeadlockInfoBufferSize(uint32_t target_size) override; protected: + DBImpl* db_impl_; + std::shared_ptr info_log_; + const TransactionDBOptions txn_db_options_; + void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); - DBImpl* db_impl_; - std::shared_ptr info_log_; private: friend class WritePreparedTxnDB; friend class WritePreparedTxnDBMock; - const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; // Must be held when adding/dropping column families. diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 6bdb9ffe4..335a756a1 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -303,7 +303,8 @@ class TransactionBaseImpl : public Transaction { WriteBatchWithIndex write_batch_; private: - // batch to be written at commit time + // Extra data to be persisted with the commit. Note this is only used when + // prepare phase is not skipped. WriteBatch commit_time_batch_; // Stack of the Snapshot saved at each save point. Saved snapshots may be diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index de3199db8..9c7d08641 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -43,13 +43,10 @@ INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionTest, ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), std::make_tuple(false, true, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED), std::make_tuple(false, true, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( StackableDBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(true, false, WRITE_COMMITTED), - std::make_tuple(true, true, WRITE_COMMITTED), - std::make_tuple(true, false, WRITE_PREPARED), + ::testing::Values(std::make_tuple(true, true, WRITE_COMMITTED), std::make_tuple(true, true, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( MySQLStyleTransactionTest, MySQLStyleTransactionTest, @@ -707,112 +704,131 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) { } TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { - WriteOptions write_options; - ReadOptions read_options; + for (bool cwb4recovery : {true, false}) { + ReOpen(); + WriteOptions write_options; + ReadOptions read_options; - TransactionOptions txn_options; + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery; - string value; - Status s; + string value; + Status s; - DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); - Transaction* txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid"); - ASSERT_OK(s); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); - ASSERT_EQ(db->GetTransactionByName("xid"), txn); + ASSERT_EQ(db->GetTransactionByName("xid"), txn); - // transaction put - s = txn->Put(Slice("foo"), Slice("bar")); - ASSERT_OK(s); - ASSERT_EQ(1, txn->GetNumPuts()); + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); - // regular db put - s = db->Put(write_options, Slice("foo2"), Slice("bar2")); - ASSERT_OK(s); - ASSERT_EQ(1, txn->GetNumPuts()); + // regular db put + s = db->Put(write_options, Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); - // regular db read - db->Get(read_options, "foo2", &value); - ASSERT_EQ(value, "bar2"); + // regular db read + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "bar2"); - // commit time put - txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")); - txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")); + // commit time put + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")); + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")); - // nothing has been prepped yet - ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + // nothing has been prepped yet + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); - s = txn->Prepare(); - ASSERT_OK(s); + s = txn->Prepare(); + ASSERT_OK(s); - // data not im mem yet - s = db->Get(read_options, Slice("foo"), &value); - ASSERT_TRUE(s.IsNotFound()); - s = db->Get(read_options, Slice("gtid"), &value); - ASSERT_TRUE(s.IsNotFound()); + // data not im mem yet + s = db->Get(read_options, Slice("foo"), &value); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(read_options, Slice("gtid"), &value); + ASSERT_TRUE(s.IsNotFound()); - // find trans in list of prepared transactions - std::vector prepared_trans; - db->GetAllPreparedTransactions(&prepared_trans); - ASSERT_EQ(prepared_trans.size(), 1); - ASSERT_EQ(prepared_trans.front()->GetName(), "xid"); + // find trans in list of prepared transactions + std::vector prepared_trans; + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 1); + ASSERT_EQ(prepared_trans.front()->GetName(), "xid"); - auto log_containing_prep = - db_impl->TEST_FindMinLogContainingOutstandingPrep(); - ASSERT_GT(log_containing_prep, 0); + auto log_containing_prep = + db_impl->TEST_FindMinLogContainingOutstandingPrep(); + ASSERT_GT(log_containing_prep, 0); - // make commit - s = txn->Commit(); - ASSERT_OK(s); + // make commit + s = txn->Commit(); + ASSERT_OK(s); - // value is now available - s = db->Get(read_options, "foo", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "bar"); + // value is now available + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); - s = db->Get(read_options, "gtid", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "dogs"); + if (!cwb4recovery) { + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); - s = db->Get(read_options, "gtid2", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "cats"); + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } - // we already committed - s = txn->Commit(); - ASSERT_EQ(s, Status::InvalidArgument()); + // we already committed + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); - // no longer is prpared results - db->GetAllPreparedTransactions(&prepared_trans); - ASSERT_EQ(prepared_trans.size(), 0); - ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); + // no longer is prpared results + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); - // heap should not care about prepared section anymore - ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + // heap should not care about prepared section anymore + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); - switch (txn_db_options.write_policy) { - case WRITE_COMMITTED: - // but now our memtable should be referencing the prep section - ASSERT_EQ(log_containing_prep, - db_impl->TEST_FindMinPrepLogReferencedByMemTable()); - break; - case WRITE_PREPARED: - case WRITE_UNPREPARED: - // In these modes memtable do not ref the prep sections - ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); - break; - default: - assert(false); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // but now our memtable should be referencing the prep section + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } + + db_impl->TEST_FlushMemTable(true); + + // after memtable flush we can now relese the log + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + + delete txn; + + if (cwb4recovery) { + // kill and reopen to trigger recovery + s = ReOpenNoDelete(); + ASSERT_OK(s); + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } } - - db_impl->TEST_FlushMemTable(true); - - // after memtable flush we can now relese the log - ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); - - delete txn; } TEST_P(TransactionTest, TwoPhaseNameTest) { @@ -873,44 +889,67 @@ TEST_P(TransactionTest, TwoPhaseNameTest) { } TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { - Status s; - std::string value; + for (bool cwb4recovery : {true, false}) { + for (bool test_with_empty_wal : {true, false}) { + if (!cwb4recovery && test_with_empty_wal) { + continue; + } + ReOpen(); + Status s; + std::string value; - WriteOptions write_options; - ReadOptions read_options; - TransactionOptions txn_options; - Transaction* txn1 = db->BeginTransaction(write_options, txn_options); - ASSERT_TRUE(txn1); - Transaction* txn2 = db->BeginTransaction(write_options, txn_options); - ASSERT_TRUE(txn2); + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = + cwb4recovery; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); - s = txn1->SetName("joe"); - ASSERT_OK(s); + s = txn1->SetName("joe"); + ASSERT_OK(s); - s = txn2->SetName("bob"); - ASSERT_OK(s); + s = txn2->SetName("bob"); + ASSERT_OK(s); - s = txn1->Prepare(); - ASSERT_OK(s); + s = txn1->Prepare(); + ASSERT_OK(s); - s = txn1->Commit(); - ASSERT_OK(s); + s = txn1->Commit(); + ASSERT_OK(s); - delete txn1; + delete txn1; - txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")); + txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")); - s = txn2->Prepare(); - ASSERT_OK(s); + s = txn2->Prepare(); + ASSERT_OK(s); - s = txn2->Commit(); - ASSERT_OK(s); + s = txn2->Commit(); + ASSERT_OK(s); - s = db->Get(read_options, "foo", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "bar"); - - delete txn2; + delete txn2; + if (!cwb4recovery) { + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + } else { + if (test_with_empty_wal) { + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + db_impl->TEST_FlushMemTable(true); + } + db->FlushWAL(true); + // kill and reopen to trigger recovery + s = ReOpenNoDelete(); + ASSERT_OK(s); + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + } + } + } } TEST_P(TransactionTest, TwoPhaseExpirationTest) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 16499cc33..8ececbcb1 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -122,8 +122,13 @@ Status WritePreparedTxn::CommitInternal() { const bool empty = working_batch->Count() == 0; WriteBatchInternal::MarkCommit(working_batch, name_); - // any operations appended to this working_batch will be ignored from WAL - working_batch->MarkWalTerminationPoint(); + const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_; + if (!empty && for_recovery) { + // When not writing to memtable, we can still cache the latest write batch. + // The cached batch will be written to memtable in WriteRecoverableState + // during FlushMemTable + WriteBatchInternal::SetAsLastestPersistentState(working_batch); + } const bool disable_memtable = true; uint64_t seq_used = kMaxSequenceNumber; @@ -133,14 +138,14 @@ Status WritePreparedTxn::CommitInternal() { const uint64_t zero_log_number = 0ull; auto s = db_impl_->WriteImpl( write_options_, working_batch, nullptr, nullptr, zero_log_number, - empty ? disable_memtable : !disable_memtable, &seq_used); + empty || for_recovery ? disable_memtable : !disable_memtable, &seq_used); assert(seq_used != kMaxSequenceNumber); uint64_t& commit_seq = seq_used; // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode // commit_seq. This happens if prep_seq <<< commit_seq. auto prepare_seq = GetId(); wpt_db_->AddCommitted(prepare_seq, commit_seq); - if (!empty) { + if (!empty && !for_recovery) { // Commit the data that is accompnaied with the commit marker // TODO(myabandeh): skip AddPrepared wpt_db_->AddPrepared(commit_seq);