WritePrepared Txn: fix bug with Rollback seq

Summary:
The sequence number was not properly advanced after a rollback marker. The patch extends the existing unit tests to detect the bug and also fixes it.
Closes https://github.com/facebook/rocksdb/pull/3157

Differential Revision: D6304291

Pulled By: maysamyabandeh

fbshipit-source-id: 1b519c44a5371b802da49c9e32bd00087a8da401
This commit is contained in:
Maysam Yabandeh 2017-11-15 08:19:57 -08:00 committed by Facebook Github Bot
parent 175d5d6a9e
commit 53863b76f9
8 changed files with 120 additions and 33 deletions

View File

@ -988,6 +988,16 @@ class MemTableInserter : public WriteBatch::Handler {
virtual bool WriterAfterCommit() const { return write_after_commit_; } virtual bool WriterAfterCommit() const { return write_after_commit_; }
// The batch seq is regularly restarted; In normal mode it is set when
// MemTableInserter is constructed in the write thread and in recovery mode it
// is set when a batch, which is tagged with seq, is read from the WAL.
// Within a sequenced batch, which could be a merge of multiple batches, we
// have two policies to advance the seq: i) seq_per_key (default) and ii)
// seq_per_batch. To implement the latter we need to mark the boundry between
// the individual batches. The approach is this: 1) Use the terminating
// markers to indicate the boundry (kTypeEndPrepareXID, kTypeCommitXID,
// kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absense of a
// natural boundy marker.
void MaybeAdvanceSeq(bool batch_boundry = false) { void MaybeAdvanceSeq(bool batch_boundry = false) {
if (batch_boundry == seq_per_batch_) { if (batch_boundry == seq_per_batch_) {
sequence_++; sequence_++;
@ -1430,6 +1440,9 @@ class MemTableInserter : public WriteBatch::Handler {
// in non recovery we simply ignore this tag // in non recovery we simply ignore this tag
} }
const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry);
return Status::OK(); return Status::OK();
} }

View File

@ -924,6 +924,7 @@ class DB {
// Retrieve the sorted list of all wal files with earliest file first // Retrieve the sorted list of all wal files with earliest file first
virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0; virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;
// Note: this API is not yet consistent with WritePrepared transactions.
// Sets iter to an iterator that is positioned at a write-batch containing // Sets iter to an iterator that is positioned at a write-batch containing
// seq_number. If the sequence number is non existent, it returns an iterator // seq_number. If the sequence number is non existent, it returns an iterator
// at the first available seq_no after the requested seq_no // at the first available seq_no after the requested seq_no

View File

@ -4837,6 +4837,8 @@ TEST_P(TransactionTest, MemoryLimitTest) {
// necessarily the one acceptable way. If the algorithm is legitimately changed, // necessarily the one acceptable way. If the algorithm is legitimately changed,
// this unit test should be updated as well. // this unit test should be updated as well.
TEST_P(TransactionTest, SeqAdvanceTest) { TEST_P(TransactionTest, SeqAdvanceTest) {
// TODO(myabandeh): must be test with false before new releases
const bool short_test = true;
WriteOptions wopts; WriteOptions wopts;
FlushOptions fopt; FlushOptions fopt;
@ -4846,7 +4848,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
// Do the test with NUM_BRANCHES branches in it. Each run of a test takes some // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
// of the branches. This is the same as counting a binary number where i-th // of the branches. This is the same as counting a binary number where i-th
// bit represents whether we take branch i in the represented by the number. // bit represents whether we take branch i in the represented by the number.
const size_t NUM_BRANCHES = 8; const size_t NUM_BRANCHES = short_test ? 6 : 10;
// Helper function that shows if the branch is to be taken in the run // Helper function that shows if the branch is to be taken in the run
// represented by the number n. // represented by the number n.
auto branch_do = [&](size_t n, size_t* branch) { auto branch_do = [&](size_t n, size_t* branch) {
@ -4869,7 +4871,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
seq = db_impl->TEST_GetLastVisibleSequence(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
} }
if (branch_do(n, &branch)) { if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
ReOpenNoDelete(); ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@ -4891,7 +4893,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
seq = db_impl->TEST_GetLastVisibleSequence(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
} }
if (branch_do(n, &branch)) { if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
ReOpenNoDelete(); ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@ -4908,7 +4910,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
seq = db_impl->TEST_GetLastVisibleSequence(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
} }
if (branch_do(n, &branch)) { if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
ReOpenNoDelete(); ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
@ -4916,10 +4918,24 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
} }
txn_t0(0); txn_t4(0);
seq = db_impl->TEST_GetLastVisibleSequence(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq);
}
if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
txn_t2(0); txn_t2(0);
seq = db_impl->TEST_GetLastVisibleSequence(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
@ -4929,7 +4945,7 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
seq = db_impl->TEST_GetLastVisibleSequence(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
} }
if (branch_do(n, &branch)) { if (!short_test && branch_do(n, &branch)) {
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
ReOpenNoDelete(); ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());

View File

@ -236,6 +236,41 @@ class TransactionTest : public ::testing::TestWithParam<
} }
delete txn; delete txn;
}; };
std::function<void(size_t)> txn_t4 = [&](size_t index) {
// A full 2pc txn that also involves a commit marker.
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
auto istr = std::to_string(index);
auto s = txn->SetName("xid" + istr);
ASSERT_OK(s);
s = txn->Put(Slice("foo" + istr), Slice("bar"));
s = txn->Put(Slice("foo2" + istr), Slice("bar2"));
s = txn->Put(Slice("foo3" + istr), Slice("bar3"));
s = txn->Put(Slice("foo4" + istr), Slice("bar4"));
s = txn->Put(Slice("foo5" + istr), Slice("bar5"));
ASSERT_OK(s);
expected_commits++;
s = txn->Prepare();
ASSERT_OK(s);
commit_writes++;
s = txn->Rollback();
ASSERT_OK(s);
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// No seq is consumed for deleting the txn buffer
exp_seq += 0;
} else {
// Consume one seq per batch
exp_seq++;
// Consume one seq per rollback batch
exp_seq++;
if (options.two_write_queues) {
// Consume one seq for rollback commit
exp_seq++;
}
}
delete txn;
};
// Test that we can change write policy after a clean shutdown (which would // Test that we can change write policy after a clean shutdown (which would
// empty the WAL) // empty the WAL)

View File

@ -605,12 +605,13 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
FlushOptions fopt; FlushOptions fopt;
// Number of different txn types we use in this test // Number of different txn types we use in this test
const size_t type_cnt = 4; const size_t type_cnt = 5;
// The size of the first write group // The size of the first write group
// TODO(myabandeh): This should be increase for pre-release tests // TODO(myabandeh): This should be increase for pre-release tests
const size_t first_group_size = 2; const size_t first_group_size = 2;
// Total number of txns we run in each test // Total number of txns we run in each test
const size_t txn_cnt = first_group_size * 2; // TODO(myabandeh): This should be increase for pre-release tests
const size_t txn_cnt = first_group_size + 1;
size_t base[txn_cnt + 1] = { size_t base[txn_cnt + 1] = {
1, 1,
@ -675,6 +676,9 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
case 3: case 3:
threads.emplace_back(txn_t3, bi); threads.emplace_back(txn_t3, bi);
break; break;
case 4:
threads.emplace_back(txn_t3, bi);
break;
default: default:
assert(false); assert(false);
} }
@ -710,16 +714,30 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// The latest seq might be due to a commit without prepare and hence not // The latest seq might be due to a commit without prepare and hence not
// persisted in the WAL. To make the verification of seq after recovery // persisted in the WAL. We need to discount such seqs if they are not
// easier we write in a transaction with prepare which makes the latest seq // continued by any seq consued by a value write.
// to be persisted via the commitmarker. if (options.two_write_queues) {
txn_t3(0); WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
MutexLock l(&wp_db->seq_for_metadata_mutex_);
auto& vec = wp_db->seq_for_metadata;
std::sort(vec.begin(), vec.end());
// going backward discount any last seq consumed for metadata until we see
// a seq that is consumed for actualy key/values.
auto rit = vec.rbegin();
for (; rit != vec.rend(); ++rit) {
if (*rit == exp_seq) {
exp_seq--;
} else {
break;
}
}
}
// Check if recovery preserves the last sequence number // Check if recovery preserves the last sequence number
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
ReOpenNoDelete(); ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_EQ(exp_seq, seq); ASSERT_EQ(exp_seq, seq);
// Check if flush preserves the last sequence number // Check if flush preserves the last sequence number
@ -1134,25 +1152,18 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) {
ASSERT_SAME(db, s4, v4, "key4"); ASSERT_SAME(db, s4, v4, "key4");
if (crash) { if (crash) {
// TODO(myabandeh): replace it with true crash (commented lines below) delete txn;
// after compaction PR is landed.
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->GetLatestSequenceNumber(); db_impl->FlushWAL(true);
ReOpenNoDelete();
wp_db = dynamic_cast<WritePreparedTxnDB*>(db); wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
SequenceNumber prev_max = wp_db->max_evicted_seq_; txn = db->GetTransactionByName("xid0");
wp_db->AdvanceMaxEvictedSeq(prev_max, seq); ASSERT_FALSE(wp_db->delayed_prepared_empty_);
// delete txn; ReadLock rl(&wp_db->prepared_mutex_);
// auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); ASSERT_TRUE(wp_db->prepared_txns_.empty());
// db_impl->FlushWAL(true); ASSERT_FALSE(wp_db->delayed_prepared_.empty());
// ReOpenNoDelete(); ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
// wp_db = dynamic_cast<WritePreparedTxnDB*>(db); wp_db->delayed_prepared_.end());
// txn = db->GetTransactionByName("xid0");
// ASSERT_FALSE(wp_db->delayed_prepared_empty_);
// ReadLock rl(&wp_db->prepared_mutex_);
// ASSERT_TRUE(wp_db->prepared_txns_.empty());
// ASSERT_FALSE(wp_db->delayed_prepared_.empty());
// ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
// wp_db->delayed_prepared_.end());
} }
ASSERT_SAME(db, s1, v1, "key1"); ASSERT_SAME(db, s1, v1, "key1");

View File

@ -93,7 +93,12 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) { SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) {
if (db_impl_->immutable_db_options().two_write_queues) { if (db_impl_->immutable_db_options().two_write_queues) {
return db_impl_->IncAndFetchSequenceNumber(); auto s = db_impl_->IncAndFetchSequenceNumber();
#ifndef NDEBUG
MutexLock l(&wpt_db_->seq_for_metadata_mutex_);
wpt_db_->seq_for_metadata.push_back(s);
#endif
return s;
} else { } else {
return prep_seq; return prep_seq;
} }
@ -161,8 +166,6 @@ Status WritePreparedTxn::RollbackInternal() {
WriteBatch rollback_batch; WriteBatch rollback_batch;
assert(GetId() != kMaxSequenceNumber); assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0); assert(GetId() > 0);
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&rollback_batch);
// In WritePrepared, the txn is is the same as prepare seq // In WritePrepared, the txn is is the same as prepare seq
auto last_visible_txn = GetId() - 1; auto last_visible_txn = GetId() - 1;
struct RollbackWriteBatchBuilder : public WriteBatch::Handler { struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
@ -227,6 +230,7 @@ Status WritePreparedTxn::RollbackInternal() {
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
// The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(&rollback_batch, name_); WriteBatchInternal::MarkRollback(&rollback_batch, name_);
const bool disable_memtable = true; const bool disable_memtable = true;
const uint64_t no_log_ref = 0; const uint64_t no_log_ref = 0;

View File

@ -215,6 +215,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
} }
{ {
// We should not normally reach here // We should not normally reach here
// TODO(myabandeh): check only if snapshot_seq is in the list of snaphots
ReadLock rl(&old_commit_map_mutex_); ReadLock rl(&old_commit_map_mutex_);
auto old_commit_entry = old_commit_map_.find(prep_seq); auto old_commit_entry = old_commit_map_.find(prep_seq);
if (old_commit_entry == old_commit_map_.end() || if (old_commit_entry == old_commit_map_.end() ||

View File

@ -179,6 +179,12 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Struct to hold ownership of snapshot and read callback for cleanup. // Struct to hold ownership of snapshot and read callback for cleanup.
struct IteratorState; struct IteratorState;
#ifndef NDEBUG
// For unit tests we can track of the seq numbers that are used for metadata as opposed to actual key/values
std::vector<uint64_t> seq_for_metadata;
mutable port::Mutex seq_for_metadata_mutex_;
#endif
private: private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;