From f622ca2c7c12ff13b24083b57d1279aaa38a2ccd Mon Sep 17 00:00:00 2001
From: Manuel Ung
Date: Wed, 31 Jul 2019 13:36:22 -0700
Subject: [PATCH] WriteUnPrepared: savepoint support (#5627)

Summary:
Add savepoint support when the current transaction has flushed unprepared batches.

Rolling back to a savepoint is similar to rolling back a transaction. It requires determining the set of keys that have changed since the savepoint, re-reading those keys at the snapshot taken at that savepoint, and restoring the old values by writing out another unprepared batch.

For this strategy to work, though, we must be able to read keys as of a savepoint. This does not work if keys were written out using the same sequence number before and after a savepoint. Therefore, when we flush out unprepared batches, we must split the batch by savepoint if any savepoints exist.

For example, if we have the following:
```
Put(A)
Put(B)
Put(C)
SetSavePoint()
Put(D)
Put(E)
SetSavePoint()
Put(F)
```

Then we will write out 3 separate unprepared batches:
```
Put(A) 1
Put(B) 1
Put(C) 1
Put(D) 2
Put(E) 2
Put(F) 3
```

This is so that when we roll back to, for example, the first savepoint, we can simply read keys at snapshot_seq = 1.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/5627

Differential Revision: D16584130

Pulled By: lth

fbshipit-source-id: 6d100dd548fb20c4b76661bd0f8a2647e64477fa
---
 db/write_batch.cc                             |  48 ++--
 db/write_batch_internal.h                     |   4 +
 .../utilities/write_batch_with_index.h        |   2 +
 include/rocksdb/write_batch.h                 |   2 +-
 utilities/transactions/transaction_base.cc    |   4 +-
 utilities/transactions/transaction_base.h     |  20 +-
 .../transactions/write_unprepared_txn.cc      | 258 +++++++++++++++++-
 utilities/transactions/write_unprepared_txn.h |  60 +++-
 .../transactions/write_unprepared_txn_db.cc   |   4 +-
 .../write_batch_with_index.cc                 |   5 +
 10 files changed, 378 insertions(+), 29 deletions(-)

diff --git a/db/write_batch.cc b/db/write_batch.cc
index 2c2d81e87..8a896644f 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -511,12 +511,25 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
 }
 
 Status WriteBatch::Iterate(Handler* handler) const {
-  Slice input(rep_);
-  if (input.size() < WriteBatchInternal::kHeader) {
+  if (rep_.size() < WriteBatchInternal::kHeader) {
     return Status::Corruption("malformed WriteBatch (too small)");
   }
 
-  input.remove_prefix(WriteBatchInternal::kHeader);
+  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
+                                     rep_.size());
+}
+
+Status WriteBatchInternal::Iterate(const WriteBatch* wb,
+                                   WriteBatch::Handler* handler, size_t begin,
+                                   size_t end) {
+  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
+    return Status::Corruption("Invalid start/end bounds for Iterate");
+  }
+  assert(begin <= end);
+  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
+  bool whole_batch =
+      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());
+
   Slice key, value, blob, xid;
   // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
   // the batch boundary symbols otherwise we would mis-count the number of
@@ -547,7 +560,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
       }
     } else {
       assert(s.IsTryAgain());
-      assert(!last_was_try_again); // to detect infinite loop bugs
+      assert(!last_was_try_again);  // to detect infinite loop bugs
       if (UNLIKELY(last_was_try_again)) {
         return Status::Corruption(
             "two consecutive TryAgain in WriteBatch handler; this is either a "
@@ -560,7 +573,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
     switch (tag) {
       case kTypeColumnFamilyValue:
       case kTypeValue:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
         s = handler->PutCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -570,7 +583,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
         s = handler->DeleteCF(column_family, key);
         if (LIKELY(s.ok())) {
@@ -580,7 +593,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilySingleDeletion:
       case kTypeSingleDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
         s = handler->SingleDeleteCF(column_family, key);
         if (LIKELY(s.ok())) {
@@ -590,7 +603,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyRangeDeletion:
       case kTypeRangeDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
         s = handler->DeleteRangeCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -600,7 +613,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyMerge:
       case kTypeMerge:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
         s = handler->MergeCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -610,7 +623,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyBlobIndex:
       case kTypeBlobIndex:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
         s = handler->PutBlobIndexCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -623,7 +636,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         empty_batch = false;
         break;
       case kTypeBeginPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
         handler->MarkBeginPrepare();
         empty_batch = false;
@@ -642,7 +655,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeBeginPersistedPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
         handler->MarkBeginPrepare();
         empty_batch = false;
         if (!handler->WriteAfterCommit()) {
           s = Status::NotSupported(
               "WriteCommitted txn tag when write_after_commit_ is disabled (in "
               "WritePrepared/WriteUnprepared mode). If it is not due to corruption, "
               "the WAL must be emptied before changing the WritePolicy.");
         }
         break;
       case kTypeBeginUnprepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
         handler->MarkBeginPrepare(true /* unprepared */);
         empty_batch = false;
@@ -674,19 +687,19 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeEndPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
         handler->MarkEndPrepare(xid);
         empty_batch = true;
         break;
       case kTypeCommitXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
         handler->MarkCommit(xid);
         empty_batch = true;
         break;
       case kTypeRollbackXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
         handler->MarkRollback(xid);
         empty_batch = true;
@@ -702,7 +715,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
   if (!s.ok()) {
     return s;
   }
-  if (handler_continue && found != WriteBatchInternal::Count(this)) {
+  if (handler_continue && whole_batch &&
+      found != WriteBatchInternal::Count(wb)) {
     return Status::Corruption("WriteBatch has wrong count");
   } else {
     return Status::OK();
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index bae62bf03..67136a847 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -192,6 +192,10 @@ class WriteBatchInternal {
   // leftByteSize and a WriteBatch with ByteSize rightByteSize
   static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
 
+  // Iterate over [begin, end) range of a write batch
+  static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
+                        size_t begin, size_t end);
+
   // This write batch includes the latest state that should be persisted. Such
   // state meant to be used only during recovery.
   static void SetAsLastestPersistentState(WriteBatch* b);
diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h
index 34e6c4689..586088d75 100644
--- a/include/rocksdb/utilities/write_batch_with_index.h
+++ b/include/rocksdb/utilities/write_batch_with_index.h
@@ -100,6 +100,8 @@ class WriteBatchWithIndex : public WriteBatchBase {
       size_t max_bytes = 0);
 
   ~WriteBatchWithIndex() override;
+  WriteBatchWithIndex(WriteBatchWithIndex&&);
+  WriteBatchWithIndex& operator=(WriteBatchWithIndex&&);
 
   using WriteBatchBase::Put;
   Status Put(ColumnFamilyHandle* column_family, const Slice& key,
diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h
index 393c5d9c6..b6b7c8bb8 100644
--- a/include/rocksdb/write_batch.h
+++ b/include/rocksdb/write_batch.h
@@ -271,7 +271,7 @@ class WriteBatch : public WriteBatchBase {
     virtual bool Continue();
 
    protected:
-    friend class WriteBatch;
+    friend class WriteBatchInternal;
     virtual bool WriteAfterCommit() const { return true; }
     virtual bool WriteBeforePrepare() const { return false; }
   };
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc
index bf59a1c40..30861f091 100644
--- a/utilities/transactions/transaction_base.cc
+++ b/utilities/transactions/transaction_base.cc
@@ -30,7 +30,7 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
   assert(dynamic_cast<DBImpl*>(db_) != nullptr);
   log_number_ = 0;
   if (dbimpl_->allow_2pc()) {
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    InitWriteBatch();
   }
 }
 
@@ -49,7 +49,7 @@ void TransactionBaseImpl::Clear() {
   num_merges_ = 0;
 
   if (dbimpl_->allow_2pc()) {
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    InitWriteBatch();
   }
 }
diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h
index 657e9c596..72fa9d26a 100644
--- a/utilities/transactions/transaction_base.h
+++ b/utilities/transactions/transaction_base.h
@@ -11,6 +11,7 @@
 #include <stack>
 #include <string>
 
+#include "db/write_batch_internal.h"
 #include "rocksdb/db.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/snapshot.h"
@@ -273,6 +274,15 @@ class TransactionBaseImpl : public Transaction {
   // Sets a snapshot if SetSnapshotOnNextOperation() has been called.
   void SetSnapshotIfNeeded();
 
+  // Initialize write_batch_ for 2PC by inserting Noop.
+  inline void InitWriteBatch(bool clear = false) {
+    if (clear) {
+      write_batch_.Clear();
+    }
+    assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
+    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+  }
+
   DB* db_;
   DBImpl* dbimpl_;
 
@@ -325,16 +335,18 @@ class TransactionBaseImpl : public Transaction {
   // Optimistic Transactions will wait till commit time to do conflict checking.
   TransactionKeyMap tracked_keys_;
 
+  // Stack of the Snapshot saved at each save point. Saved snapshots may be
+  // nullptr if there was no snapshot at the time SetSavePoint() was called.
+  std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
+                             autovector<TransactionBaseImpl::SavePoint>>>
+      save_points_;
+
  private:
   friend class WritePreparedTxn;
   // Extra data to be persisted with the commit. Note this is only used when
   // prepare phase is not skipped.
   WriteBatch commit_time_batch_;
 
-  // Stack of the Snapshot saved at each save point. Saved snapshots may be
-  // nullptr if there was no snapshot at the time SetSavePoint() was called.
-  std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
-                             autovector<TransactionBaseImpl::SavePoint>>>
-      save_points_;
-
   // If true, future Put/Merge/Deletes will be indexed in the
   // WriteBatchWithIndex.
   // If false, future Put/Merge/Deletes will be inserted directly into the
diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc
index c5f4db5bd..993c3b8b6 100644
--- a/utilities/transactions/write_unprepared_txn.cc
+++ b/utilities/transactions/write_unprepared_txn.cc
@@ -78,6 +78,8 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
   }
 
   unprep_seqs_.clear();
+  flushed_save_points_.reset(nullptr);
+  unflushed_save_points_.reset(nullptr);
   recovered_txn_ = false;
   largest_validated_seq_ = 0;
 }
@@ -236,6 +238,20 @@ Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
 }
 
 Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
+  // If the current write batch contains savepoints, then some special handling
+  // is required so that RollbackToSavepoint can work.
+  //
+  // RollbackToSavepoint is not supported after Prepare() is called, so only do
+  // this for unprepared batches.
+  if (!prepared && unflushed_save_points_ != nullptr &&
+      !unflushed_save_points_->empty()) {
+    return FlushWriteBatchWithSavePointToDB();
+  }
+
+  return FlushWriteBatchToDBInternal(prepared);
+}
+
+Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
   if (name_.empty()) {
     return Status::InvalidArgument("Cannot write to DB without SetName.");
   }
@@ -285,13 +301,118 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
   // Reset transaction state.
   if (!prepared) {
     prepare_batch_cnt_ = 0;
-    write_batch_.Clear();
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    const bool kClear = true;
+    TransactionBaseImpl::InitWriteBatch(kClear);
   }
 
   return s;
 }
 
+Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
+  assert(unflushed_save_points_ != nullptr &&
+         unflushed_save_points_->size() > 0);
+  assert(save_points_ != nullptr && save_points_->size() > 0);
+  assert(save_points_->size() >= unflushed_save_points_->size());
+
+  // Handler class for creating an unprepared batch from a savepoint.
+  struct SavePointBatchHandler : public WriteBatch::Handler {
+    WriteBatchWithIndex* wb_;
+    const std::map<uint32_t, ColumnFamilyHandle*>& handles_;
+
+    SavePointBatchHandler(
+        WriteBatchWithIndex* wb,
+        const std::map<uint32_t, ColumnFamilyHandle*>& handles)
+        : wb_(wb), handles_(handles) {}
+
+    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
+      return wb_->Put(handles_.at(cf), key, value);
+    }
+
+    Status DeleteCF(uint32_t cf, const Slice& key) override {
+      return wb_->Delete(handles_.at(cf), key);
+    }
+
+    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
+      return wb_->SingleDelete(handles_.at(cf), key);
+    }
+
+    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
+      return wb_->Merge(handles_.at(cf), key, value);
+    }
+
+    // The only expected 2PC marker is the initial Noop marker.
+    Status MarkNoop(bool empty_batch) override {
+      return empty_batch ? Status::OK() : Status::InvalidArgument();
+    }
+
+    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
+
+    Status MarkEndPrepare(const Slice&) override {
+      return Status::InvalidArgument();
+    }
+
+    Status MarkCommit(const Slice&) override {
+      return Status::InvalidArgument();
+    }
+
+    Status MarkRollback(const Slice&) override {
+      return Status::InvalidArgument();
+    }
+  };
+
+  // The comparator of the default cf is passed in, similar to the
+  // initialization of TransactionBaseImpl::write_batch_. This comparator is
+  // only used if the write batch encounters an invalid cf id, in which case
+  // it falls back to this comparator.
+  WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
+                         true, 0);
+  // Swap with write_batch_ so that wb contains the complete write batch. The
+  // actual write batch that will be flushed to DB will be built in
+  // write_batch_, and will be read by FlushWriteBatchToDBInternal.
+  std::swap(wb, write_batch_);
+  TransactionBaseImpl::InitWriteBatch();
+
+  size_t prev_boundary = WriteBatchInternal::kHeader;
+  const bool kPrepared = true;
+  for (size_t i = 0; i < unflushed_save_points_->size(); i++) {
+    SavePointBatchHandler sp_handler(&write_batch_,
+                                     *wupt_db_->GetCFHandleMap().get());
+    size_t curr_boundary = (*unflushed_save_points_)[i];
+
+    // Construct the partial write batch up to the savepoint.
+    //
+    // Theoretically, a memcpy between the write batches should be sufficient
+    // since the rewriting into the batch should produce the exact same byte
+    // representation. Rebuilding the WriteBatchWithIndex index is still
+    // necessary though, and would imply doing two passes over the batch.
+    Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
+                                           prev_boundary, curr_boundary);
+    if (!s.ok()) {
+      return s;
+    }
+
+    // Flush the write batch.
+    s = FlushWriteBatchToDBInternal(!kPrepared);
+    if (!s.ok()) {
+      return s;
+    }
+
+    if (flushed_save_points_ == nullptr) {
+      flushed_save_points_.reset(
+          new autovector<WriteUnpreparedTxn::SavePoint>());
+    }
+    flushed_save_points_->emplace_back(
+        unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
+
+    prev_boundary = curr_boundary;
+    const bool kClear = true;
+    TransactionBaseImpl::InitWriteBatch(kClear);
+  }
+
+  unflushed_save_points_->clear();
+  return Status::OK();
+}
+
 Status WriteUnpreparedTxn::PrepareInternal() {
   const bool kPrepared = true;
   return FlushWriteBatchToDB(kPrepared);
@@ -379,6 +500,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
       wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
     }
     unprep_seqs_.clear();
+    flushed_save_points_.reset(nullptr);
+    unflushed_save_points_.reset(nullptr);
     return s;
   }  // else do the 2nd write to publish seq
@@ -410,6 +533,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
     wpt_db_->RemovePrepared(seq.first, seq.second);
   }
   unprep_seqs_.clear();
+  flushed_save_points_.reset(nullptr);
+  unflushed_save_points_.reset(nullptr);
   return s;
 }
@@ -488,6 +613,8 @@ Status WriteUnpreparedTxn::RollbackInternal() {
       wpt_db_->RemovePrepared(seq.first, seq.second);
     }
     unprep_seqs_.clear();
+    flushed_save_points_.reset(nullptr);
+    unflushed_save_points_.reset(nullptr);
     return s;
   }  // else do the 2nd write for commit
   uint64_t& prepare_seq = seq_used;
@@ -514,6 +641,8 @@ Status WriteUnpreparedTxn::RollbackInternal() {
   }
 
   unprep_seqs_.clear();
+  flushed_save_points_.reset(nullptr);
+  unflushed_save_points_.reset(nullptr);
   return s;
 }
 
@@ -524,6 +653,131 @@ void WriteUnpreparedTxn::Clear() {
   TransactionBaseImpl::Clear();
 }
 
+void WriteUnpreparedTxn::SetSavePoint() {
+  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
+             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
+         (save_points_ ? save_points_->size() : 0));
+  PessimisticTransaction::SetSavePoint();
+  if (unflushed_save_points_ == nullptr) {
+    unflushed_save_points_.reset(new autovector<size_t>());
+  }
+  unflushed_save_points_->push_back(write_batch_.GetDataSize());
+}
+
+Status WriteUnpreparedTxn::RollbackToSavePoint() {
+  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
+             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
+         (save_points_ ? save_points_->size() : 0));
+  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
+    Status s = PessimisticTransaction::RollbackToSavePoint();
+    assert(!s.IsNotFound());
+    unflushed_save_points_->pop_back();
+    return s;
+  }
+
+  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
+    return RollbackToSavePointInternal();
+  }
+
+  return Status::NotFound();
+}
+
+Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
+  Status s;
+
+  const bool kClear = true;
+  TransactionBaseImpl::InitWriteBatch(kClear);
+
+  assert(flushed_save_points_->size() > 0);
+  WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();
+
+  assert(top.unprep_seqs_.size() > 0);
+  assert(save_points_ != nullptr && save_points_->size() > 0);
+  const TransactionKeyMap& tracked_keys = save_points_->top().new_keys_;
+
+  // TODO(lth): Reduce duplicate code with RollbackInternal logic.
+  ReadOptions roptions;
+  roptions.snapshot = top.snapshot_->snapshot();
+  SequenceNumber min_uncommitted =
+      static_cast_with_check<const SnapshotImpl, const Snapshot>(
+          roptions.snapshot)
+          ->min_uncommitted_;
+  SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
+  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
+                                          top.unprep_seqs_);
+  const auto& cf_map = *wupt_db_->GetCFHandleMap();
+  for (const auto& cfkey : tracked_keys) {
+    const auto cfid = cfkey.first;
+    const auto& keys = cfkey.second;
+
+    for (const auto& pair : keys) {
+      const auto& key = pair.first;
+      const auto& cf_handle = cf_map.at(cfid);
+      PinnableSlice pinnable_val;
+      bool not_used;
+      s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
+                            &callback);
+
+      if (s.ok()) {
+        s = write_batch_.Put(cf_handle, key, pinnable_val);
+        assert(s.ok());
+      } else if (s.IsNotFound()) {
+        s = write_batch_.Delete(cf_handle, key);
+        assert(s.ok());
+      } else {
+        return s;
+      }
+    }
+  }
+
+  const bool kPrepared = true;
+  s = FlushWriteBatchToDBInternal(!kPrepared);
+  assert(s.ok());
+  if (!s.ok()) {
+    return s;
+  }
+
+  // PessimisticTransaction::RollbackToSavePoint will also call
+  // RollbackToSavePoint on write_batch_. However, write_batch_ is empty and
+  // has no savepoints because this savepoint has already been flushed. Work
+  // around this by setting a fake savepoint.
+  write_batch_.SetSavePoint();
+  s = PessimisticTransaction::RollbackToSavePoint();
+  assert(s.ok());
+  if (!s.ok()) {
+    return s;
+  }
+
+  flushed_save_points_->pop_back();
+  return s;
+}
+
+Status WriteUnpreparedTxn::PopSavePoint() {
+  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
+             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
+         (save_points_ ? save_points_->size() : 0));
+  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
+    Status s = PessimisticTransaction::PopSavePoint();
+    assert(!s.IsNotFound());
+    unflushed_save_points_->pop_back();
+    return s;
+  }
+
+  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
+    // PessimisticTransaction::PopSavePoint will also call PopSavePoint on
+    // write_batch_. However, write_batch_ is empty and has no savepoints
+    // because this savepoint has already been flushed. Work around this by
+    // setting a fake savepoint.
+    write_batch_.SetSavePoint();
+    Status s = PessimisticTransaction::PopSavePoint();
+    assert(!s.IsNotFound());
+    flushed_save_points_->pop_back();
+    return s;
+  }
+
+  return Status::NotFound();
+}
+
 void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const size_t num_keys, const Slice* keys,
diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h
index 77c180338..774d90e8d 100644
--- a/utilities/transactions/write_unprepared_txn.h
+++ b/utilities/transactions/write_unprepared_txn.h
@@ -73,7 +73,6 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
     wup_snapshot_ = seq;
   }
 
- private:
   static SequenceNumber CalcMaxVisibleSeq(
       const std::map<SequenceNumber, size_t>& unprep_seqs,
       SequenceNumber snapshot_seq) {
@@ -84,6 +83,8 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
     }
     return std::max(max_unprepared, snapshot_seq);
   }
+
+ private:
   WritePreparedTxnDB* db_;
   const std::map<SequenceNumber, size_t>& unprep_seqs_;
   SequenceNumber wup_snapshot_;
@@ -139,6 +140,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
 
   void Clear() override;
 
+  void SetSavePoint() override;
+  Status RollbackToSavePoint() override;
+  Status PopSavePoint() override;
+
   // Get and GetIterator needs to be overridden so that a ReadCallback to
   // handle read-your-own-write is used.
   using Transaction::Get;
@@ -172,6 +177,9 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
 
   Status MaybeFlushWriteBatchToDB();
   Status FlushWriteBatchToDB(bool prepared);
+  Status FlushWriteBatchToDBInternal(bool prepared);
+  Status FlushWriteBatchWithSavePointToDB();
+  Status RollbackToSavePointInternal();
   Status HandleWrite(std::function<Status()> do_write);
 
   // For write unprepared, we check on every writebatch append to see if
@@ -210,6 +218,56 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
   // but in some cases, we should be able to restore the previously largest
   // value when calling RollbackToSavepoint.
   SequenceNumber largest_validated_seq_;
+
+  struct SavePoint {
+    // Record of unprep_seqs_ at this savepoint. The set of unprep_seqs is
+    // used during RollbackToSavepoint to determine visibility when restoring
+    // old values.
+    //
+    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
+    // subsets, this can potentially be deduplicated by just storing set
+    // difference. Investigate if this is worth it.
+    std::map<SequenceNumber, size_t> unprep_seqs_;
+
+    // This snapshot will be used to read keys at this savepoint if we call
+    // RollbackToSavePoint.
+    std::unique_ptr<ManagedSnapshot> snapshot_;
+
+    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
+              ManagedSnapshot* snapshot)
+        : unprep_seqs_(seqs), snapshot_(snapshot) {}
+  };
+
+  // We have 3 data structures holding savepoint information:
+  // 1. TransactionBaseImpl::save_points_
+  // 2. WriteUnpreparedTxn::flushed_save_points_
+  // 3. WriteUnpreparedTxn::unflushed_save_points_
+  //
+  // TransactionBaseImpl::save_points_ holds information about all write
+  // batches, including the current in-memory write_batch_, or unprepared
+  // batches that have been written out. Its responsibility is just to track
+  // which keys have been modified in every savepoint.
+  //
+  // WriteUnpreparedTxn::flushed_save_points_ holds information about
+  // savepoints set on unprepared batches that have already flushed. It holds
+  // the snapshot and unprep_seqs at that savepoint, so that the rollback
+  // process can determine which keys were visible at that point in time.
+  //
+  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
+  // savepoints on the current in-memory write_batch_. It simply records the
+  // size of the write batch at every savepoint.
+  //
+  // TODO(lth): Remove the redundancy between unflushed_save_points_ and
+  // write_batch_.save_points_.
+  //
+  // Based on this information, here are some invariants:
+  // size(unflushed_save_points_) = size(write_batch_.save_points_)
+  // size(flushed_save_points_) + size(unflushed_save_points_)
+  //    = size(save_points_)
+  //
+  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
+      flushed_save_points_;
+  std::unique_ptr<autovector<size_t>> unflushed_save_points_;
 };
 
 }  // namespace rocksdb
diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc
index 875d54167..4381619e7 100644
--- a/utilities/transactions/write_unprepared_txn_db.cc
+++ b/utilities/transactions/write_unprepared_txn_db.cc
@@ -279,8 +279,8 @@ Status WriteUnpreparedTxnDB::Initialize(
     }
   }
 
-  wupt->write_batch_.Clear();
-  WriteBatchInternal::InsertNoop(wupt->write_batch_.GetWriteBatch());
+  const bool kClear = true;
+  wupt->InitWriteBatch(kClear);
 
   real_trx->SetState(Transaction::PREPARED);
   if (!s.ok()) {
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index cf17abf22..3ffa2e0c6 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -627,6 +627,11 @@ WriteBatchWithIndex::WriteBatchWithIndex(
 
 WriteBatchWithIndex::~WriteBatchWithIndex() {}
 
+WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;
+
+WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
+    default;
+
 WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
 
 size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
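Editor's note (not part of the patch): below is a minimal, self-contained sketch of how the savepoint support added here is exercised through the public API. It assumes a write-unprepared TransactionDB; the `write_batch_flush_threshold` option (used to force unprepared batches to flush early, so the flushed-savepoint path runs) and the demo database path are illustrative assumptions, not taken from this diff.
```
// Sketch only: exercises SetSavePoint()/RollbackToSavePoint() on a
// write-unprepared transaction. Option and field names follow the RocksDB
// headers of this era; treat them as assumptions.
#include <cassert>

#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  rocksdb::TransactionDBOptions txn_db_options;
  // WRITE_UNPREPARED was still experimental when this patch landed.
  txn_db_options.write_policy = rocksdb::TxnDBWritePolicy::WRITE_UNPREPARED;

  rocksdb::TransactionDB* db = nullptr;
  rocksdb::Status s = rocksdb::TransactionDB::Open(
      options, txn_db_options, "/tmp/wup_savepoint_demo", &db);
  assert(s.ok());

  rocksdb::TransactionOptions txn_options;
  // Assumption: a tiny threshold makes each write flush out an unprepared
  // batch, so the savepoint below lands in flushed_save_points_ and rollback
  // takes the RollbackToSavePointInternal() path rather than the in-memory one.
  txn_options.write_batch_flush_threshold = 1;

  rocksdb::Transaction* txn =
      db->BeginTransaction(rocksdb::WriteOptions(), txn_options);
  s = txn->SetName("demo");  // flushing unprepared batches requires a named txn
  assert(s.ok());

  s = txn->Put("A", "a1");
  assert(s.ok());
  // Batch boundary: writes after this point get a later sequence number,
  // which is what lets rollback read the pre-savepoint state.
  txn->SetSavePoint();
  s = txn->Put("B", "b1");
  assert(s.ok());

  // Re-reads the keys changed since the savepoint at that savepoint's
  // snapshot and writes their old values out as one more unprepared batch.
  s = txn->RollbackToSavePoint();
  assert(s.ok());

  s = txn->Commit();  // only Put(A) takes effect
  assert(s.ok());

  delete txn;
  delete db;
  return 0;
}
```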