WritePrepared: Add rollback batch to PreparedHeap (#5026)

Summary:
This patch adds the sequence number of the rollback batch to the PreparedHeap when two_write_queues is enabled. Although the current behavior is still correct, the change simplifies reasoning about the code by having all uncommitted batches registered with the PreparedHeap.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5026

Differential Revision: D14249401

Pulled By: maysamyabandeh

fbshipit-source-id: 1e3424edee5cd14e56ee35931ad3c93ed997cd5a
This commit is contained in:
Maysam Yabandeh 2019-03-07 07:26:36 -08:00 committed by Facebook Github Bot
parent 186b3afaa8
commit 703f1375c2
4 changed files with 25 additions and 10 deletions

View File

@ -7,6 +7,12 @@
#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_base.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"

View File

@ -1285,7 +1285,7 @@ TEST_P(WritePreparedTransactionTest, TxnInitialize) {
// This tests that transactions with duplicate keys perform correctly after max // This tests that transactions with duplicate keys perform correctly after max
// is advancing their prepared sequence numbers. This will not be the case if // is advancing their prepared sequence numbers. This will not be the case if
// for example the txn does not add the prepared seq for the second sub-batch to // for example the txn does not add the prepared seq for the second sub-batch to
// the PrepareHeap structure. // the PreparedHeap structure.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) { TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
WriteOptions write_options; WriteOptions write_options;
TransactionOptions txn_options; TransactionOptions txn_options;
@ -1297,7 +1297,7 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Ensure that all the prepared sequence numbers will be removed from the // Ensure that all the prepared sequence numbers will be removed from the
// PrepareHeap. // PreparedHeap.
SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE; SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
wp_db->AdvanceMaxEvictedSeq(0, new_max); wp_db->AdvanceMaxEvictedSeq(0, new_max);

View File

@ -88,7 +88,7 @@ Status WritePreparedTxn::PrepareInternal() {
// For each duplicate key we account for a new sub-batch // For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
// Having AddPrepared in the PreReleaseCallback allows in-order addition of // Having AddPrepared in the PreReleaseCallback allows in-order addition of
// prepared entries to PrepareHeap and hence enables an optimization. Refer to // prepared entries to PreparedHeap and hence enables an optimization. Refer to
// SmallestUnCommittedSeq for more details. // SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback( AddPreparedCallback add_prepared_callback(
wpt_db_, prepare_batch_cnt_, wpt_db_, prepare_batch_cnt_,
@ -328,15 +328,23 @@ Status WritePreparedTxn::RollbackInternal() {
// CommitCache that keeps an entry evicted due to max advance and yet overlaps // CommitCache that keeps an entry evicted due to max advance and yet overlaps
// with a live snapshot around so that the live snapshot properly skips the // with a live snapshot around so that the live snapshot properly skips the
// entry even if its prepare seq is lower than max_evicted_seq_. // entry even if its prepare seq is lower than max_evicted_seq_.
AddPreparedCallback add_prepared_callback(
wpt_db_, ONE_BATCH, db_impl_->immutable_db_options().two_write_queues);
WritePreparedCommitEntryPreReleaseCallback update_commit_map( WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH); wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
PreReleaseCallback* pre_release_callback;
if (do_one_write) {
pre_release_callback = &update_commit_map;
} else {
pre_release_callback = &add_prepared_callback;
}
// Note: the rollback batch does not need AddPrepared since it is written to // Note: the rollback batch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing // DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while // data that is written to DB but not yet committed, while
// the rollback batch commits with PreReleaseCallback. // the rollback batch commits with PreReleaseCallback.
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
do_one_write ? &update_commit_map : nullptr); pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -345,14 +353,14 @@ Status WritePreparedTxn::RollbackInternal() {
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
return s; return s;
} // else do the 2nd write for commit } // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used; uint64_t rollback_seq = seq_used;
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal 2nd write prepare_seq: %" PRIu64, "RollbackInternal 2nd write rollback_seq: %" PRIu64,
prepare_seq); rollback_seq);
// Commit the batch by writing an empty batch to the queue that will release // Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers. // the commit sequence number to readers.
WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare( WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, GetId(), prepare_seq, prepare_batch_cnt_); wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator
@ -367,6 +375,7 @@ Status WritePreparedTxn::RollbackInternal() {
if (s.ok()) { if (s.ok()) {
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
} }
wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
return s; return s;
} }

View File

@ -207,8 +207,8 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
// is a non-zero chance of max advancing prepare_seq and readers assume the // is a non-zero chance of max advancing prepare_seq and readers assume the
// data as committed. // data as committed.
// Also having it in the PreReleaseCallback allows in-order addition of // Also having it in the PreReleaseCallback allows in-order addition of
// prepared entries to PrepareHeap and hence enables an optimization. Refer to // prepared entries to PreparedHeap and hence enables an optimization. Refer
// SmallestUnCommittedSeq for more details. // to SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback( AddPreparedCallback add_prepared_callback(
wpt_db_, prepare_batch_cnt_, wpt_db_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues); db_impl_->immutable_db_options().two_write_queues);