WritePrepared Txn: address some pending TODOs

Summary:
This patch addresses a couple of minor TODOs for WritePrepared Txn such as double checking some assert statements at runtime as well, skip extra AddPrepared in non-2pc transactions, and safety check for infinite loops.
Closes https://github.com/facebook/rocksdb/pull/3302

Differential Revision: D6617002

Pulled By: maysamyabandeh

fbshipit-source-id: ef6673c139cb49f64c0879508d2f573b78609aca
This commit is contained in:
Maysam Yabandeh 2018-01-09 08:47:46 -08:00 committed by Facebook Github Bot
parent 24e2c1640d
commit 00b33c2474
7 changed files with 181 additions and 117 deletions

View File

@ -1883,6 +1883,12 @@ class InMemoryHandler : public WriteBatch::Handler {
return Status::OK();
}
virtual Status MarkNoop(bool)
override {
row_ << "NOOP ";
return Status::OK();
}
virtual Status DeleteCF(uint32_t cf, const Slice& key) override {
row_ << "DELETE(" << cf << ") : ";
row_ << LDBCommand::StringToHex(key.ToString()) << " ";

View File

@ -11,6 +11,7 @@
#include "utilities/transactions/pessimistic_transaction_db.h"
#include <inttypes.h>
#include <string>
#include <unordered_set>
#include <vector>
@ -186,6 +187,8 @@ Status TransactionDB::Open(
Status s;
DB* db;
ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is " PRId32,
static_cast<int>(txn_db_options.write_policy));
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
std::vector<size_t> compaction_enabled_cf_indices;
DBOptions db_options_2pc = db_options;

View File

@ -4977,6 +4977,69 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
}
}
// Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest, DuplicateKeyTest) {
for (bool do_prepare : {true, false}) {
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto s = txn0->SetName("xid");
ASSERT_OK(s);
s = txn0->Put(Slice("foo0"), Slice("bar0a"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo0"), Slice("bar0b"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo1"), Slice("bar1"));
ASSERT_OK(s);
s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
// ASSERT_OK(s);
s = txn0->Put(Slice("foo2"), Slice("bar2b"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo3"), Slice("bar3"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo3"), Slice("bar3"));
// ASSERT_OK(s);
s = txn0->Put(Slice("foo4"), Slice("bar4"));
ASSERT_OK(s);
s = txn0->Delete(Slice("foo4"));
ASSERT_OK(s);
s = txn0->SingleDelete(Slice("foo4"));
ASSERT_OK(s);
if (do_prepare) {
s = txn0->Prepare();
ASSERT_OK(s);
}
s = txn0->Commit();
ASSERT_OK(s);
if (!do_prepare) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
pdb->UnregisterTransaction(txn0);
}
delete txn0;
ReadOptions ropt;
PinnableSlice pinnable_val;
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar0b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar1"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar2b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar3"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -1283,72 +1283,6 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) {
}
}
// TODO(myabandeh): move it to transaction_test when it is extended to
// WROTE_PREPARED.
// Test that the transactional db can handle duplicate keys in the write batch
TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) {
for (bool do_prepare : {true, false}) {
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto s = txn0->SetName("xid");
ASSERT_OK(s);
s = txn0->Put(Slice("foo0"), Slice("bar0a"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo0"), Slice("bar0b"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo1"), Slice("bar1"));
ASSERT_OK(s);
s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
// ASSERT_OK(s);
s = txn0->Put(Slice("foo2"), Slice("bar2b"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo3"), Slice("bar3"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo3"), Slice("bar3"));
// ASSERT_OK(s);
s = txn0->Put(Slice("foo4"), Slice("bar4"));
ASSERT_OK(s);
s = txn0->Delete(Slice("foo4"));
ASSERT_OK(s);
s = txn0->SingleDelete(Slice("foo4"));
ASSERT_OK(s);
if (do_prepare) {
s = txn0->Prepare();
ASSERT_OK(s);
}
s = txn0->Commit();
ASSERT_OK(s);
if (!do_prepare) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
pdb->UnregisterTransaction(txn0);
}
delete txn0;
ReadOptions ropt;
PinnableSlice pinnable_val;
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar0b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar1"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar2b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar3"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
// Use large buffer to avoid memtable flush after 1024 insertions
options.write_buffer_size = 1024 * 1024;

View File

@ -7,6 +7,11 @@
#include "utilities/transactions/write_prepared_txn.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <map>
#include "db/column_family.h"
@ -55,7 +60,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
assert(db_iter);
return write_batch_.NewIteratorWithBase(db_iter);
return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
Status WritePreparedTxn::PrepareInternal() {
@ -75,10 +80,15 @@ Status WritePreparedTxn::PrepareInternal() {
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!DISABLE_MEMTABLE, &seq_used);
assert(seq_used != kMaxSequenceNumber);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used;
SetId(prepare_seq);
wpt_db_->AddPrepared(prepare_seq);
// TODO(myabandeh): AddPrepared better to be called in the pre-release
// callback otherwise there is a non-zero chance of max dvancing prepare_seq
// and readers assume the data as committed.
if (s.ok()) {
wpt_db_->AddPrepared(prepare_seq);
}
return s;
}
@ -92,8 +102,15 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
}
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal");
// TODO(myabandeh): handle the duplicate keys in the batch
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
bool sync = write_options_.sync;
if (!do_one_write) {
// No need to sync on the first write
write_options_.sync = false;
}
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch);
const bool DISABLE_MEMTABLE = true;
@ -105,14 +122,25 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
do_one_write ? &update_commit_map : nullptr);
assert(seq_used != kMaxSequenceNumber);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used;
SetId(prepare_seq);
if (!s.ok()) {
return s;
}
if (do_one_write) {
return s;
} // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used;
// Set the original value of sync
write_options_.sync = sync;
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq);
// Note: we skip AddPrepared here. This could be further optimized by skip
// erasing prepare_seq from prepared_txn_ in the following callback.
// TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and
// readers assume the prepared data as committed? Almost zero probability.
// Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
@ -124,11 +152,13 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used,
&update_commit_map_with_prepare);
assert(seq_used != kMaxSequenceNumber);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
return s;
}
Status WritePreparedTxn::CommitInternal() {
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitInternal prepare_seq: %" PRIu64, GetID());
// We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
@ -143,8 +173,6 @@ Status WritePreparedTxn::CommitInternal() {
WriteBatchInternal::SetAsLastestPersistentState(working_batch);
}
// TODO(myabandeh): Reject a commit request if AddCommitted cannot encode
// commit_seq. This happens if prep_seq <<< commit_seq.
auto prepare_seq = GetId();
const bool includes_data = !empty && !for_recovery;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
@ -158,11 +186,13 @@ Status WritePreparedTxn::CommitInternal() {
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used,
&update_commit_map);
assert(seq_used != kMaxSequenceNumber);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
return s;
}
Status WritePreparedTxn::RollbackInternal() {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"RollbackInternal prepare_seq: %" PRIu64, GetId());
WriteBatch rollback_batch;
assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0);
@ -242,18 +272,20 @@ Status WritePreparedTxn::RollbackInternal() {
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
do_one_write ? &update_commit_map : nullptr);
assert(seq_used != kMaxSequenceNumber);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (!s.ok()) {
return s;
}
if (do_one_write) {
// TODO(myabandeh): what if max has already advanced rollback_seq?
// Mark the txn as rolled back
uint64_t& rollback_seq = seq_used;
wpt_db_->RollbackPrepared(GetId(), rollback_seq);
return s;
} // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used;
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq);
// Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
@ -265,10 +297,12 @@ Status WritePreparedTxn::RollbackInternal() {
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used,
&update_commit_map_with_prepare);
assert(seq_used != kMaxSequenceNumber);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back
uint64_t& rollback_seq = seq_used;
wpt_db_->RollbackPrepared(GetId(), rollback_seq);
if (s.ok()) {
wpt_db_->RollbackPrepared(GetId(), rollback_seq);
}
return s;
}

View File

@ -22,6 +22,7 @@
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
@ -168,36 +169,31 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around.
INC_STEP_FOR_MAX_EVICTED =
std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast<size_t>(1));
std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
#define ROCKSDB_LOG_DETAILS(LGR, FMT, ...) \
; // due to overhead by default skip such lines
// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
// Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) const {
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): read your own writes
// TODO(myabandeh): optimize this. This sequence of checks must be correct but
// not necessary efficient
if (prep_seq == 0) {
// Compaction will output keys to bottom-level with sequence number 0 if
// it is visible to the earliest snapshot.
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
@ -207,7 +203,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
ReadLock rl(&prepared_mutex_);
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
@ -219,7 +215,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (exist && prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
return cached.commit_seq <= snapshot_seq;
@ -230,9 +226,10 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
// At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
// max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now
if (max_evicted_seq < prep_seq) {
// Not evicted from cache and also not present, so must be still prepared
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
@ -247,7 +244,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
@ -256,7 +253,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
// If there was no overlapping commit entry, then it is committed with a
// commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
@ -273,30 +270,34 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
found = std::binary_search(vec.begin(), vec.end(), prep_seq);
}
if (!found) {
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
}
// (ii) it the case: it is committed but after the snapshot_seq
ROCKSDB_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKSDB_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
// TODO(myabandeh): Add a runtime check to ensure the following assert.
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
assert(seq > max_evicted_seq_);
if (seq <= max_evicted_seq_) {
throw std::runtime_error(
"Added prepare_seq is larger than max_evicted_seq_: " + ToString(seq) +
" <= " + ToString(max_evicted_seq_.load()));
}
WriteLock wl(&prepared_mutex_);
prepared_txns_.push(seq);
}
void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
uint64_t rollback_seq) {
ROCKSDB_LOG_DETAILS(
ROCKS_LOG_DETAILS(
info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "",
prep_seq, rollback_seq);
std::vector<SequenceNumber> snapshots =
@ -322,10 +323,10 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
}
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) {
ROCKSDB_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
bool prepare_skipped, uint8_t loop_cnt) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
@ -334,9 +335,9 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
ROCKSDB_LOG_DETAILS(info_log_,
"Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
evicted.prep_seq, evicted.commit_seq, prev_max);
ROCKS_LOG_DETAILS(info_log_,
"Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
evicted.prep_seq, evicted.commit_seq, prev_max);
if (prev_max < evicted.commit_seq) {
// Inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
@ -351,11 +352,13 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
if (!succ) {
// A very rare event, in which the commit entry is updated before we do.
// Here we apply a very simple solution of retrying.
// TODO(myabandeh): do precautions to detect bugs that cause infinite loops
AddCommitted(prepare_seq, commit_seq);
if (loop_cnt > 100) {
throw std::runtime_error("Infinite loop in AddCommitted!");
}
AddCommitted(prepare_seq, commit_seq, prepare_skipped, ++loop_cnt);
return;
}
{
if (!prepare_skipped) {
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
bool was_empty = delayed_prepared_.empty();
@ -442,6 +445,7 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
SequenceNumber max) {
ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
InstrumentedMutex(db_impl_->mutex());
return db_impl_->snapshots().GetAll(nullptr, max);
}
@ -479,6 +483,8 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal(
void WritePreparedTxnDB::UpdateSnapshots(
const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version) {
ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
version);
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG

View File

@ -20,6 +20,7 @@
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_lock_mgr.h"
@ -27,6 +28,10 @@
namespace rocksdb {
#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
; // due to overhead by default skip such lines
// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
// mechanisms to tell such data apart from committed data.
@ -102,8 +107,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// be used to idenitfy the snapshots that overlap with the rolled back txn.
void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// commit_seq to the commit map
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq);
// commit_seq to the commit map. prepare_skipped is set if the prpeare phase
// is skipped for this commit. loop_cnt is to detect infinite loops.
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
bool prepare_skipped = false, uint8_t loop_cnt = 0);
struct CommitEntry {
uint64_t prep_seq;
@ -120,7 +127,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
: INDEX_BITS(index_bits),
PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)) {}
COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
// Number of higher bits of a sequence number that is not used. They are
// used to encode the value type, ...
const size_t PAD_BITS = static_cast<size_t>(8);
@ -133,6 +141,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const size_t COMMIT_BITS;
// Filter to encode/decode commit seq
const uint64_t COMMIT_FILTER;
// The value of commit_seq - prepare_seq + 1 must be less than this bound
const uint64_t DELTA_UPPERBOUND;
};
// Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ...
@ -157,7 +167,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
uint64_t delta = cs - ps + 1; // make initialized delta always >= 1
// zero is reserved for uninitialized entries
assert(0 < delta);
assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
assert(delta < format.DELTA_UPPERBOUND);
if (delta >= format.DELTA_UPPERBOUND) {
throw std::runtime_error(
"commit_seq >> prepare_seq. The allowed distance is " +
ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
ToString(cs) + " prepare_seq is " + ToString(ps));
}
rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
rep_ = rep_ | delta;
}
@ -332,7 +348,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_.
unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
// The largest evicted *commit* sequence number from the commit_cache_
// The largest evicted *commit* sequence number from the commit_cache_. If a
// seq is smaller than max_evicted_seq_ is might or might not be present in
// commit_cache_. So commit_cache_ must first be checked before consulting
// with max_evicted_seq_.
std::atomic<uint64_t> max_evicted_seq_ = {};
// Advance max_evicted_seq_ by this value each time it needs an update. The
// larger the value, the less frequent advances we would have. We do not want
@ -397,9 +416,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
} // else there was no prepare phase
if (includes_data_) {
// Commit the data that is accompnaied with the commit request
// TODO(myabandeh): skip AddPrepared
db_->AddPrepared(commit_seq);
db_->AddCommitted(commit_seq, commit_seq);
const bool PREPARE_SKIPPED = true;
db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED);
}
if (db_impl_->immutable_db_options().two_write_queues) {
// Publish the sequence number. We can do that here assuming the callback