WritePrepared Txn: Optimize for recoverable state

Summary:
GetCommitTimeWriteBatch is currently used to store some state as part of commit in 2PC. In MyRocks it is specifically used to store some data that would be needed only during recovery. So it is not need to be stored in memtable right after each commit.
This patch enables an optimization to write the GetCommitTimeWriteBatch only to the WAL. The batch will be written to memtable during recovery when the WAL is replayed. To cover the case when WAL is deleted after memtable flush, the batch is also buffered and written to memtable right before each memtable flush.
Closes https://github.com/facebook/rocksdb/pull/3071

Differential Revision: D6148023

Pulled By: maysamyabandeh

fbshipit-source-id: 2d09bae5565abe2017c0327421010d5c0d55eaa7
This commit is contained in:
Maysam Yabandeh 2017-11-01 17:23:52 -07:00 committed by Facebook Github Bot
parent c1cf94c787
commit 17731a43a6
15 changed files with 281 additions and 135 deletions

View File

@ -669,6 +669,9 @@ class DBImpl : public DB {
uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable();
// write cached_recoverable_state_ to memtable if it is not empty
// The writer must be the leader in write_thread_ and holding mutex_
Status WriteRecoverableState();
private:
friend class DB;
@ -800,7 +803,8 @@ class DBImpl : public DB {
WriteContext* write_context);
WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* tmp_batch, size_t* write_with_wal);
WriteBatch* tmp_batch, size_t* write_with_wal,
WriteBatch** to_be_cached_state);
Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size);
@ -990,6 +994,15 @@ class DBImpl : public DB {
std::deque<LogWriterNumber> logs_;
// Signaled when getting_synced becomes false for some of the logs_.
InstrumentedCondVar log_sync_cv_;
// This is the app-level state that is written to the WAL but will be used
// only during recovery. Using this feature enables not writing the state to
// memtable on normal writes and hence improving the throughput. Each new
// write of the state will replace the previous state entirely even if the
// keys in the two consecuitive states do not overlap.
// It is protected by log_write_mutex_ when concurrent_prepare_ is enabled.
// Otherwise only the heaad of write_thread_ can access it.
WriteBatch cached_recoverable_state_;
std::atomic<bool> cached_recoverable_state_empty_ = {true};
std::atomic<uint64_t> total_log_size_;
// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families

View File

@ -947,7 +947,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty()) {
if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
cached_recoverable_state_empty_.load()) {
// Nothing to flush
return Status::OK();
}
@ -957,8 +958,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
write_thread_.EnterUnbatched(&w, &mutex_);
}
// SwitchMemtable() will release and reacquire mutex
// during execution
// SwitchMemtable() will release and reacquire mutex during execution
s = SwitchMemtable(cfd, &context);
if (!writes_stopped) {

View File

@ -75,6 +75,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with seq_per_batch");
}
// Otherwise IsLatestPersistentState optimization does not make sense
assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
disable_memtable);
Status status;
if (write_options.low_pri) {
@ -678,9 +681,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
}
WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* tmp_batch, size_t* write_with_wal) {
WriteBatch* tmp_batch, size_t* write_with_wal,
WriteBatch** to_be_cached_state) {
assert(write_with_wal != nullptr);
assert(tmp_batch != nullptr);
assert(*to_be_cached_state == nullptr);
WriteBatch* merged_batch = nullptr;
*write_with_wal = 0;
auto* leader = write_group.leader;
@ -690,6 +695,9 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
merged_batch = leader->batch;
if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
*to_be_cached_state = merged_batch;
}
*write_with_wal = 1;
} else {
// WAL needs all of the batches flattened into a single batch.
@ -700,6 +708,10 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
if (writer->ShouldWriteToWAL()) {
WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;
}
(*write_with_wal)++;
}
}
@ -734,8 +746,9 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
Status status;
size_t write_with_wal = 0;
WriteBatch* merged_batch =
MergeBatch(write_group, &tmp_batch_, &write_with_wal);
WriteBatch* to_be_cached_state = nullptr;
WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
&write_with_wal, &to_be_cached_state);
if (merged_batch == write_group.leader->batch) {
write_group.leader->log_used = logfile_number_;
} else if (write_with_wal > 1) {
@ -748,6 +761,10 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t log_size;
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
if (status.ok() && need_log_sync) {
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
@ -797,8 +814,9 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
WriteBatch tmp_batch;
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;
WriteBatch* merged_batch =
MergeBatch(write_group, &tmp_batch, &write_with_wal);
MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state);
// We need to lock log_write_mutex_ since logs_ and alive_log_files might be
// pushed back concurrently
@ -817,6 +835,10 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer = logs_.back().writer;
uint64_t log_size;
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
log_write_mutex_.Unlock();
if (status.ok()) {
@ -831,6 +853,34 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
return status;
}
Status DBImpl::WriteRecoverableState() {
mutex_.AssertHeld();
if (!cached_recoverable_state_empty_) {
bool dont_care_bool;
SequenceNumber next_seq;
if (concurrent_prepare_) {
log_write_mutex_.Lock();
}
SequenceNumber seq = versions_->LastSequence();
WriteBatchInternal::SetSequence(&cached_recoverable_state_, ++seq);
auto status = WriteBatchInternal::InsertInto(
&cached_recoverable_state_, column_family_memtables_.get(),
&flush_scheduler_, true, 0 /*recovery_log_number*/, this,
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
seq_per_batch_);
versions_->SetLastSequence(--next_seq);
if (concurrent_prepare_) {
log_write_mutex_.Unlock();
}
if (status.ok()) {
cached_recoverable_state_.Clear();
cached_recoverable_state_empty_ = true;
}
return status;
}
return Status::OK();
}
Status DBImpl::SwitchWAL(WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr);
@ -1069,6 +1119,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
// Recoverable state is persisted in WAL. After memtable switch, WAL might
// be deleted, so we write the state to memtable to be persisted as well.
Status s = WriteRecoverableState();
if (!s.ok()) {
return s;
}
// In case of pipelined write is enabled, wait for all pending memtable
// writers.
if (immutable_db_options_.enable_pipelined_write) {
@ -1112,7 +1169,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
const auto preallocate_block_size =
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
mutex_.Unlock();
Status s;
{
if (creating_new_log) {
EnvOptions opt_env_opt =

View File

@ -494,6 +494,14 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
}
bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
return b->is_latest_persistent_state_;
}
void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
b->is_latest_persistent_state_ = true;
}
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}

View File

@ -189,6 +189,11 @@ class WriteBatchInternal {
// Returns the byte size of appending a WriteBatch with ByteSize
// leftByteSize and a WriteBatch with ByteSize rightByteSize
static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
// This write batch includes the latest state that should be persisted. Such
// state meant to be used only during recovery.
static void SetAsLastestPersistentState(WriteBatch* b);
static bool IsLatestPersistentState(const WriteBatch* b);
};
// LocalSavePoint is similar to a scope guard

View File

@ -97,6 +97,12 @@ struct TransactionOptions {
// Status::Busy. The user should retry their transaction.
bool deadlock_detect = false;
// If set, it states that the CommitTimeWriteBatch represents the latest state
// of the application and meant to be used later during recovery. It enables
// an optimization to postpone updating the memtable with CommitTimeWriteBatch
// to only SwithcMamtable or recovery.
bool use_only_the_last_commit_time_batch_for_recovery = false;
// TODO(agiardullo): TransactionDB does not yet support comparators that allow
// two non-equal keys to be equivalent. Ie, cmp->Compare(a,b) should only
// return 0 if

View File

@ -347,6 +347,12 @@ class WriteBatch : public WriteBatchBase {
// Maximum size of rep_.
size_t max_bytes_;
// Is the content of the batch the application's latest state that meant only
// to be used for recovery? Refer to
// TransactionOptions::use_only_the_last_commit_time_batch_for_recovery for
// more details.
bool is_latest_persistent_state_ = false;
protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_

View File

@ -9,7 +9,6 @@
#include <string>
#include <vector>
#include "options/options_helper.h"
#include "options/options_sanity_check.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

View File

@ -13,7 +13,7 @@
#include <cstring>
#include "options/options_parser.h"
#include "options/options_helper.h"
#include "rocksdb/convenience.h"
#include "util/testharness.h"
@ -427,7 +427,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"hard_pending_compaction_bytes_limit=0;"
"disable_auto_compactions=false;"
"report_bg_io_stats=true;"
"compaction_options_fifo={max_table_files_size=3;ttl=100;allow_compaction=false;};",
"compaction_options_fifo={max_table_files_size=3;ttl=100;allow_"
"compaction=false;};",
new_options));
ASSERT_EQ(unset_bytes_base,

View File

@ -82,6 +82,8 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
if (expiration_time_ > 0) {
txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
}
use_only_the_last_commit_time_batch_for_recovery_ =
txn_options.use_only_the_last_commit_time_batch_for_recovery;
}
PessimisticTransaction::~PessimisticTransaction() {

View File

@ -113,6 +113,10 @@ class PessimisticTransaction : public TransactionBaseImpl {
int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
protected:
// Refer to
// TransactionOptions::use_only_the_last_commit_time_batch_for_recovery
bool use_only_the_last_commit_time_batch_for_recovery_ = false;
virtual Status PrepareInternal() = 0;
virtual Status CommitWithoutPrepareInternal() = 0;

View File

@ -114,16 +114,17 @@ class PessimisticTransactionDB : public TransactionDB {
void SetDeadlockInfoBufferSize(uint32_t target_size) override;
protected:
DBImpl* db_impl_;
std::shared_ptr<Logger> info_log_;
const TransactionDBOptions txn_db_options_;
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
DBImpl* db_impl_;
std::shared_ptr<Logger> info_log_;
private:
friend class WritePreparedTxnDB;
friend class WritePreparedTxnDBMock;
const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_;
// Must be held when adding/dropping column families.

View File

@ -303,7 +303,8 @@ class TransactionBaseImpl : public Transaction {
WriteBatchWithIndex write_batch_;
private:
// batch to be written at commit time
// Extra data to be persisted with the commit. Note this is only used when
// prepare phase is not skipped.
WriteBatch commit_time_batch_;
// Stack of the Snapshot saved at each save point. Saved snapshots may be

View File

@ -43,13 +43,10 @@ INSTANTIATE_TEST_CASE_P(
DBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED),
std::make_tuple(false, true, WRITE_COMMITTED),
std::make_tuple(false, false, WRITE_PREPARED),
std::make_tuple(false, true, WRITE_PREPARED)));
INSTANTIATE_TEST_CASE_P(
StackableDBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(true, false, WRITE_COMMITTED),
std::make_tuple(true, true, WRITE_COMMITTED),
std::make_tuple(true, false, WRITE_PREPARED),
::testing::Values(std::make_tuple(true, true, WRITE_COMMITTED),
std::make_tuple(true, true, WRITE_PREPARED)));
INSTANTIATE_TEST_CASE_P(
MySQLStyleTransactionTest, MySQLStyleTransactionTest,
@ -707,112 +704,131 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) {
}
TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
WriteOptions write_options;
ReadOptions read_options;
for (bool cwb4recovery : {true, false}) {
ReOpen();
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
TransactionOptions txn_options;
txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery;
string value;
Status s;
string value;
Status s;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
ASSERT_EQ(db->GetTransactionByName("xid"), txn);
ASSERT_EQ(db->GetTransactionByName("xid"), txn);
// transaction put
s = txn->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
// transaction put
s = txn->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
// regular db put
s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
// regular db put
s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
// regular db read
db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar2");
// regular db read
db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar2");
// commit time put
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"));
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"));
// commit time put
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"));
txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"));
// nothing has been prepped yet
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// nothing has been prepped yet
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
s = txn->Prepare();
ASSERT_OK(s);
s = txn->Prepare();
ASSERT_OK(s);
// data not im mem yet
s = db->Get(read_options, Slice("foo"), &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, Slice("gtid"), &value);
ASSERT_TRUE(s.IsNotFound());
// data not im mem yet
s = db->Get(read_options, Slice("foo"), &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, Slice("gtid"), &value);
ASSERT_TRUE(s.IsNotFound());
// find trans in list of prepared transactions
std::vector<Transaction*> prepared_trans;
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 1);
ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
// find trans in list of prepared transactions
std::vector<Transaction*> prepared_trans;
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 1);
ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
auto log_containing_prep =
db_impl->TEST_FindMinLogContainingOutstandingPrep();
ASSERT_GT(log_containing_prep, 0);
auto log_containing_prep =
db_impl->TEST_FindMinLogContainingOutstandingPrep();
ASSERT_GT(log_containing_prep, 0);
// make commit
s = txn->Commit();
ASSERT_OK(s);
// make commit
s = txn->Commit();
ASSERT_OK(s);
// value is now available
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
// value is now available
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
s = db->Get(read_options, "gtid", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "dogs");
if (!cwb4recovery) {
s = db->Get(read_options, "gtid", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "dogs");
s = db->Get(read_options, "gtid2", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "cats");
s = db->Get(read_options, "gtid2", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "cats");
}
// we already committed
s = txn->Commit();
ASSERT_EQ(s, Status::InvalidArgument());
// we already committed
s = txn->Commit();
ASSERT_EQ(s, Status::InvalidArgument());
// no longer is prpared results
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 0);
ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
// no longer is prpared results
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 0);
ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
// heap should not care about prepared section anymore
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// heap should not care about prepared section anymore
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
default:
assert(false);
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
default:
assert(false);
}
db_impl->TEST_FlushMemTable(true);
// after memtable flush we can now relese the log
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
delete txn;
if (cwb4recovery) {
// kill and reopen to trigger recovery
s = ReOpenNoDelete();
ASSERT_OK(s);
s = db->Get(read_options, "gtid", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "dogs");
s = db->Get(read_options, "gtid2", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "cats");
}
}
db_impl->TEST_FlushMemTable(true);
// after memtable flush we can now relese the log
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
delete txn;
}
TEST_P(TransactionTest, TwoPhaseNameTest) {
@ -873,44 +889,67 @@ TEST_P(TransactionTest, TwoPhaseNameTest) {
}
TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
Status s;
std::string value;
for (bool cwb4recovery : {true, false}) {
for (bool test_with_empty_wal : {true, false}) {
if (!cwb4recovery && test_with_empty_wal) {
continue;
}
ReOpen();
Status s;
std::string value;
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn2);
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
txn_options.use_only_the_last_commit_time_batch_for_recovery =
cwb4recovery;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn2);
s = txn1->SetName("joe");
ASSERT_OK(s);
s = txn1->SetName("joe");
ASSERT_OK(s);
s = txn2->SetName("bob");
ASSERT_OK(s);
s = txn2->SetName("bob");
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
delete txn1;
txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"));
txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"));
s = txn2->Prepare();
ASSERT_OK(s);
s = txn2->Prepare();
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
delete txn2;
delete txn2;
if (!cwb4recovery) {
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
} else {
if (test_with_empty_wal) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->TEST_FlushMemTable(true);
}
db->FlushWAL(true);
// kill and reopen to trigger recovery
s = ReOpenNoDelete();
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
}
}
}
}
TEST_P(TransactionTest, TwoPhaseExpirationTest) {

View File

@ -122,8 +122,13 @@ Status WritePreparedTxn::CommitInternal() {
const bool empty = working_batch->Count() == 0;
WriteBatchInternal::MarkCommit(working_batch, name_);
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
if (!empty && for_recovery) {
// When not writing to memtable, we can still cache the latest write batch.
// The cached batch will be written to memtable in WriteRecoverableState
// during FlushMemTable
WriteBatchInternal::SetAsLastestPersistentState(working_batch);
}
const bool disable_memtable = true;
uint64_t seq_used = kMaxSequenceNumber;
@ -133,14 +138,14 @@ Status WritePreparedTxn::CommitInternal() {
const uint64_t zero_log_number = 0ull;
auto s = db_impl_->WriteImpl(
write_options_, working_batch, nullptr, nullptr, zero_log_number,
empty ? disable_memtable : !disable_memtable, &seq_used);
empty || for_recovery ? disable_memtable : !disable_memtable, &seq_used);
assert(seq_used != kMaxSequenceNumber);
uint64_t& commit_seq = seq_used;
// TODO(myabandeh): Reject a commit request if AddCommitted cannot encode
// commit_seq. This happens if prep_seq <<< commit_seq.
auto prepare_seq = GetId();
wpt_db_->AddCommitted(prepare_seq, commit_seq);
if (!empty) {
if (!empty && !for_recovery) {
// Commit the data that is accompnaied with the commit marker
// TODO(myabandeh): skip AddPrepared
wpt_db_->AddPrepared(commit_seq);