diff --git a/db/write_batch.cc b/db/write_batch.cc
index 2c2d81e87..8a896644f 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -511,12 +511,25 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
 }
 
 Status WriteBatch::Iterate(Handler* handler) const {
-  Slice input(rep_);
-  if (input.size() < WriteBatchInternal::kHeader) {
+  if (rep_.size() < WriteBatchInternal::kHeader) {
     return Status::Corruption("malformed WriteBatch (too small)");
   }
 
-  input.remove_prefix(WriteBatchInternal::kHeader);
+  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
+                                     rep_.size());
+}
+
+Status WriteBatchInternal::Iterate(const WriteBatch* wb,
+                                   WriteBatch::Handler* handler, size_t begin,
+                                   size_t end) {
+  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
+    return Status::Corruption("Invalid start/end bounds for Iterate");
+  }
+  assert(begin <= end);
+  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
+  bool whole_batch =
+      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());
+
   Slice key, value, blob, xid;
   // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
   // the batch boundary symbols otherwise we would mis-count the number of
@@ -547,7 +560,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
       }
     } else {
       assert(s.IsTryAgain());
-      assert(!last_was_try_again); // to detect infinite loop bugs
+      assert(!last_was_try_again);  // to detect infinite loop bugs
       if (UNLIKELY(last_was_try_again)) {
         return Status::Corruption(
             "two consecutive TryAgain in WriteBatch handler; this is either a "
@@ -560,7 +573,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
     switch (tag) {
       case kTypeColumnFamilyValue:
       case kTypeValue:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
         s = handler->PutCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -570,7 +583,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
         s = handler->DeleteCF(column_family, key);
         if (LIKELY(s.ok())) {
@@ -580,7 +593,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilySingleDeletion:
       case kTypeSingleDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
         s = handler->SingleDeleteCF(column_family, key);
         if (LIKELY(s.ok())) {
@@ -590,7 +603,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyRangeDeletion:
       case kTypeRangeDeletion:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
         s = handler->DeleteRangeCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -600,7 +613,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyMerge:
       case kTypeMerge:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
         s = handler->MergeCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -610,7 +623,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         break;
       case kTypeColumnFamilyBlobIndex:
       case kTypeBlobIndex:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
         s = handler->PutBlobIndexCF(column_family, key, value);
         if (LIKELY(s.ok())) {
@@ -623,7 +636,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         empty_batch = false;
         break;
       case kTypeBeginPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
         handler->MarkBeginPrepare();
         empty_batch = false;
@@ -642,7 +655,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeBeginPersistedPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
         handler->MarkBeginPrepare();
         empty_batch = false;
@@ -655,7 +668,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeBeginUnprepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
         handler->MarkBeginPrepare(true /* unprepared */);
         empty_batch = false;
@@ -674,19 +687,19 @@ Status WriteBatch::Iterate(Handler* handler) const {
         }
         break;
       case kTypeEndPrepareXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
         handler->MarkEndPrepare(xid);
         empty_batch = true;
         break;
       case kTypeCommitXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
         handler->MarkCommit(xid);
         empty_batch = true;
         break;
       case kTypeRollbackXID:
-        assert(content_flags_.load(std::memory_order_relaxed) &
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
         handler->MarkRollback(xid);
         empty_batch = true;
@@ -702,7 +715,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
   if (!s.ok()) {
     return s;
   }
-  if (handler_continue && found != WriteBatchInternal::Count(this)) {
+  if (handler_continue && whole_batch &&
+      found != WriteBatchInternal::Count(wb)) {
     return Status::Corruption("WriteBatch has wrong count");
   } else {
     return Status::OK();
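For context, WriteBatch::Iterate is the dispatch loop behind replay and recovery: it walks the serialized records and invokes the matching WriteBatch::Handler callback for each. A minimal sketch against the public API (not part of this diff; CountingHandler is an illustrative name) of a handler that counts Put records:

    #include <cstdint>
    #include <iostream>

    #include "rocksdb/write_batch.h"

    // Counts Put records; every other record type falls through to the
    // default Handler implementations.
    class CountingHandler : public rocksdb::WriteBatch::Handler {
     public:
      uint64_t puts = 0;
      rocksdb::Status PutCF(uint32_t /*cf_id*/, const rocksdb::Slice& /*key*/,
                            const rocksdb::Slice& /*value*/) override {
        ++puts;
        return rocksdb::Status::OK();
      }
    };

    int main() {
      rocksdb::WriteBatch batch;
      batch.Put("k1", "v1");
      batch.Put("k2", "v2");
      CountingHandler handler;
      // Public entry point; after this diff it forwards to
      // WriteBatchInternal::Iterate over the whole batch, which still
      // cross-checks the header's record count.
      rocksdb::Status s = batch.Iterate(&handler);
      std::cout << handler.puts << " puts, ok=" << s.ok() << std::endl;
      return 0;
    }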
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index bae62bf03..67136a847 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -192,6 +192,10 @@ class WriteBatchInternal {
   // leftByteSize and a WriteBatch with ByteSize rightByteSize
   static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
 
+  // Iterate over [begin, end) range of a write batch
+  static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
+                        size_t begin, size_t end);
+
   // This write batch includes the latest state that should be persisted. Such
   // state meant to be used only during recovery.
   static void SetAsLastestPersistentState(WriteBatch* b);
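The new overload makes the record cursor restartable at arbitrary byte offsets. A hedged sketch of how an internal caller (such as the sub-batch flushing further below) can use it to replay one batch in two halves, split at an offset recorded earlier; ReplayInHalves is an illustrative name, not part of the diff:

    #include <cstddef>

    #include "db/write_batch_internal.h"

    // Replays [kHeader, split) and then [split, size) through the same
    // handler. `split` must be a record boundary previously obtained from
    // GetDataSize(), e.g. at the moment a savepoint was set.
    rocksdb::Status ReplayInHalves(const rocksdb::WriteBatch& wb,
                                   rocksdb::WriteBatch::Handler* handler,
                                   size_t split) {
      using rocksdb::WriteBatchInternal;
      rocksdb::Status s = WriteBatchInternal::Iterate(
          &wb, handler, WriteBatchInternal::kHeader, split);
      if (s.ok()) {
        s = WriteBatchInternal::Iterate(&wb, handler, split, wb.GetDataSize());
      }
      return s;
    }

Note that only a full-range pass (whole_batch == true) cross-checks the header's record count; partial ranges skip that check by design.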
diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h
index 34e6c4689..586088d75 100644
--- a/include/rocksdb/utilities/write_batch_with_index.h
+++ b/include/rocksdb/utilities/write_batch_with_index.h
@@ -100,6 +100,8 @@ class WriteBatchWithIndex : public WriteBatchBase {
                       size_t max_bytes = 0);
 
   ~WriteBatchWithIndex() override;
+  WriteBatchWithIndex(WriteBatchWithIndex&&);
+  WriteBatchWithIndex& operator=(WriteBatchWithIndex&&);
 
   using WriteBatchBase::Put;
   Status Put(ColumnFamilyHandle* column_family, const Slice& key,
diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h
index 393c5d9c6..b6b7c8bb8 100644
--- a/include/rocksdb/write_batch.h
+++ b/include/rocksdb/write_batch.h
@@ -271,7 +271,7 @@ class WriteBatch : public WriteBatchBase {
     virtual bool Continue();
 
    protected:
-    friend class WriteBatch;
+    friend class WriteBatchInternal;
     virtual bool WriteAfterCommit() const { return true; }
     virtual bool WriteBeforePrepare() const { return false; }
   };
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc
index bf59a1c40..30861f091 100644
--- a/utilities/transactions/transaction_base.cc
+++ b/utilities/transactions/transaction_base.cc
@@ -30,7 +30,7 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
   assert(dynamic_cast<DBImpl*>(db_) != nullptr);
   log_number_ = 0;
   if (dbimpl_->allow_2pc()) {
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    InitWriteBatch();
   }
 }
 
@@ -49,7 +49,7 @@ void TransactionBaseImpl::Clear() {
   num_merges_ = 0;
 
   if (dbimpl_->allow_2pc()) {
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    InitWriteBatch();
   }
 }
diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h
index 657e9c596..72fa9d26a 100644
--- a/utilities/transactions/transaction_base.h
+++ b/utilities/transactions/transaction_base.h
@@ -11,6 +11,7 @@
 #include <stack>
 #include <string>
 
+#include "db/write_batch_internal.h"
 #include "rocksdb/db.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/snapshot.h"
@@ -273,6 +274,15 @@ class TransactionBaseImpl : public Transaction {
   // Sets a snapshot if SetSnapshotOnNextOperation() has been called.
   void SetSnapshotIfNeeded();
 
+  // Initialize write_batch_ for 2PC by inserting Noop.
+  inline void InitWriteBatch(bool clear = false) {
+    if (clear) {
+      write_batch_.Clear();
+    }
+    assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
+    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+  }
+
   DB* db_;
   DBImpl* dbimpl_;
 
@@ -325,16 +335,18 @@ class TransactionBaseImpl : public Transaction {
   // Optimistic Transactions will wait till commit time to do conflict checking.
   TransactionKeyMap tracked_keys_;
 
+  // Stack of the Snapshot saved at each save point. Saved snapshots may be
+  // nullptr if there was no snapshot at the time SetSavePoint() was called.
+  std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
+                             autovector<TransactionBaseImpl::SavePoint>>>
+      save_points_;
+
  private:
   friend class WritePreparedTxn;
 
   // Extra data to be persisted with the commit. Note this is only used when
   // prepare phase is not skipped.
   WriteBatch commit_time_batch_;
 
-  // Stack of the Snapshot saved at each save point. Saved snapshots may be
-  // nullptr if there was no snapshot at the time SetSavePoint() was called.
-  std::unique_ptr<std::stack<SavePoint, autovector<SavePoint>>> save_points_;
-
   // If true, future Put/Merge/Deletes will be indexed in the
   // WriteBatchWithIndex.
   // If false, future Put/Merge/Deletes will be inserted directly into the
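The helper centralizes the Clear-then-Noop idiom used at the call sites above and below. Restated as a hedged standalone paraphrase (InitWriteBatchFor2PC is an illustrative name; the real helper is the member function above):

    #include <cassert>

    #include "db/write_batch_internal.h"
    #include "rocksdb/utilities/write_batch_with_index.h"

    void InitWriteBatchFor2PC(rocksdb::WriteBatchWithIndex* write_batch,
                              bool clear = false) {
      if (clear) {
        write_batch->Clear();
      }
      // A fresh or freshly cleared batch holds only the fixed-size header...
      assert(write_batch->GetDataSize() ==
             rocksdb::WriteBatchInternal::kHeader);
      // ...and every 2PC sub-batch must begin with a Noop marker record.
      rocksdb::WriteBatchInternal::InsertNoop(write_batch->GetWriteBatch());
    }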
diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc
index c5f4db5bd..993c3b8b6 100644
--- a/utilities/transactions/write_unprepared_txn.cc
+++ b/utilities/transactions/write_unprepared_txn.cc
@@ -78,6 +78,8 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
   }
 
   unprep_seqs_.clear();
+  flushed_save_points_.reset(nullptr);
+  unflushed_save_points_.reset(nullptr);
   recovered_txn_ = false;
   largest_validated_seq_ = 0;
 }
@@ -236,6 +238,20 @@ Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
 }
 
 Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
+  // If the current write batch contains savepoints, then some special handling
+  // is required so that RollbackToSavepoint can work.
+  //
+  // RollbackToSavepoint is not supported after Prepare() is called, so only do
+  // this for unprepared batches.
+  if (!prepared && unflushed_save_points_ != nullptr &&
+      !unflushed_save_points_->empty()) {
+    return FlushWriteBatchWithSavePointToDB();
+  }
+
+  return FlushWriteBatchToDBInternal(prepared);
+}
+
+Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
   if (name_.empty()) {
     return Status::InvalidArgument("Cannot write to DB without SetName.");
   }
@@ -285,13 +301,118 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
   // Reset transaction state.
   if (!prepared) {
     prepare_batch_cnt_ = 0;
-    write_batch_.Clear();
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    const bool kClear = true;
+    TransactionBaseImpl::InitWriteBatch(kClear);
   }
 
   return s;
 }
 
+Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
+  assert(unflushed_save_points_ != nullptr &&
+         unflushed_save_points_->size() > 0);
+  assert(save_points_ != nullptr && save_points_->size() > 0);
+  assert(save_points_->size() >= unflushed_save_points_->size());
+
+  // Handler class for creating an unprepared batch from a savepoint.
+  struct SavePointBatchHandler : public WriteBatch::Handler {
+    WriteBatchWithIndex* wb_;
+    const std::map<uint32_t, ColumnFamilyHandle*>& handles_;
+
+    SavePointBatchHandler(
+        WriteBatchWithIndex* wb,
+        const std::map<uint32_t, ColumnFamilyHandle*>& handles)
+        : wb_(wb), handles_(handles) {}
+
+    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
+      return wb_->Put(handles_.at(cf), key, value);
+    }
+
+    Status DeleteCF(uint32_t cf, const Slice& key) override {
+      return wb_->Delete(handles_.at(cf), key);
+    }
+
+    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
+      return wb_->SingleDelete(handles_.at(cf), key);
+    }
+
+    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
+      return wb_->Merge(handles_.at(cf), key, value);
+    }
+
+    // The only expected 2PC marker is the initial Noop marker.
+    Status MarkNoop(bool empty_batch) override {
+      return empty_batch ? Status::OK() : Status::InvalidArgument();
+    }
+
+    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
+
+    Status MarkEndPrepare(const Slice&) override {
+      return Status::InvalidArgument();
+    }
+
+    Status MarkCommit(const Slice&) override {
+      return Status::InvalidArgument();
+    }
+
+    Status MarkRollback(const Slice&) override {
+      return Status::InvalidArgument();
+    }
+  };
+
+  // The comparator of the default cf is passed in, similar to the
+  // initialization of TransactionBaseImpl::write_batch_. This comparator is
+  // only used if the write batch encounters an invalid cf id, in which case
+  // it falls back to this default comparator.
+  WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
+                         true, 0);
+  // Swap with write_batch_ so that wb contains the complete write batch. The
+  // actual write batch that will be flushed to DB will be built in
+  // write_batch_, and will be read by FlushWriteBatchToDBInternal.
+  std::swap(wb, write_batch_);
+  TransactionBaseImpl::InitWriteBatch();
+
+  size_t prev_boundary = WriteBatchInternal::kHeader;
+  const bool kPrepared = true;
+  for (size_t i = 0; i < unflushed_save_points_->size(); i++) {
+    SavePointBatchHandler sp_handler(&write_batch_,
+                                     *wupt_db_->GetCFHandleMap().get());
+    size_t curr_boundary = (*unflushed_save_points_)[i];
+
+    // Construct the partial write batch up to the savepoint.
+    //
+    // Theoretically, a memcpy between the write batches should be sufficient
+    // since the rewriting into the batch should produce the exact same byte
+    // representation. Rebuilding the WriteBatchWithIndex index is still
+    // necessary, though, and would imply doing two passes over the batch.
+    Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
+                                           prev_boundary, curr_boundary);
+    if (!s.ok()) {
+      return s;
+    }
+
+    // Flush the write batch.
+    s = FlushWriteBatchToDBInternal(!kPrepared);
+    if (!s.ok()) {
+      return s;
+    }
+
+    if (flushed_save_points_ == nullptr) {
+      flushed_save_points_.reset(
+          new autovector<WriteUnpreparedTxn::SavePoint>());
+    }
+    flushed_save_points_->emplace_back(
+        unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
+
+    prev_boundary = curr_boundary;
+    const bool kClear = true;
+    TransactionBaseImpl::InitWriteBatch(kClear);
+  }
+
+  unflushed_save_points_->clear();
+  return Status::OK();
+}
+
 Status WriteUnpreparedTxn::PrepareInternal() {
   const bool kPrepared = true;
   return FlushWriteBatchToDB(kPrepared);
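In other words, the GetDataSize() offsets recorded at each SetSavePoint partition the swapped-out batch into per-savepoint sub-batches. A self-contained sketch of the bound computation only (illustrative names; the real loop above also replays, flushes, and snapshots each sub-batch):

    #include <cstddef>
    #include <utility>
    #include <vector>

    // Given byte offsets recorded at each SetSavePoint, return the
    // [begin, end) ranges, one per savepoint, that the loop above replays
    // and flushes as separate unprepared sub-batches.
    std::vector<std::pair<size_t, size_t>> SubBatchBounds(
        const std::vector<size_t>& save_point_offsets, size_t header_size) {
      std::vector<std::pair<size_t, size_t>> bounds;
      size_t prev = header_size;  // WriteBatchInternal::kHeader
      for (size_t curr : save_point_offsets) {
        bounds.emplace_back(prev, curr);
        prev = curr;
      }
      return bounds;
    }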
@@ -379,6 +500,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
       wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
     }
     unprep_seqs_.clear();
+    flushed_save_points_.reset(nullptr);
+    unflushed_save_points_.reset(nullptr);
     return s;
   }  // else do the 2nd write to publish seq
@@ -410,6 +533,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
     wpt_db_->RemovePrepared(seq.first, seq.second);
   }
   unprep_seqs_.clear();
+  flushed_save_points_.reset(nullptr);
+  unflushed_save_points_.reset(nullptr);
   return s;
 }
@@ -488,6 +613,8 @@ Status WriteUnpreparedTxn::RollbackInternal() {
       wpt_db_->RemovePrepared(seq.first, seq.second);
     }
     unprep_seqs_.clear();
+    flushed_save_points_.reset(nullptr);
+    unflushed_save_points_.reset(nullptr);
     return s;
   }  // else do the 2nd write for commit
   uint64_t& prepare_seq = seq_used;
@@ -514,6 +641,8 @@ Status WriteUnpreparedTxn::RollbackInternal() {
   }
 
   unprep_seqs_.clear();
+  flushed_save_points_.reset(nullptr);
+  unflushed_save_points_.reset(nullptr);
   return s;
 }
 
@@ -524,6 +653,131 @@ void WriteUnpreparedTxn::Clear() {
   TransactionBaseImpl::Clear();
 }
 
+void WriteUnpreparedTxn::SetSavePoint() {
+  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
+             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
+         (save_points_ ? save_points_->size() : 0));
+  PessimisticTransaction::SetSavePoint();
+  if (unflushed_save_points_ == nullptr) {
+    unflushed_save_points_.reset(new autovector<size_t>());
+  }
+  unflushed_save_points_->push_back(write_batch_.GetDataSize());
+}
+
+Status WriteUnpreparedTxn::RollbackToSavePoint() {
+  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
+             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
+         (save_points_ ? save_points_->size() : 0));
+  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
+    Status s = PessimisticTransaction::RollbackToSavePoint();
+    assert(!s.IsNotFound());
+    unflushed_save_points_->pop_back();
+    return s;
+  }
+
+  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
+    return RollbackToSavePointInternal();
+  }
+
+  return Status::NotFound();
+}
+
+Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
+  Status s;
+
+  const bool kClear = true;
+  TransactionBaseImpl::InitWriteBatch(kClear);
+
+  assert(flushed_save_points_->size() > 0);
+  WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();
+
+  assert(top.unprep_seqs_.size() > 0);
+  assert(save_points_ != nullptr && save_points_->size() > 0);
+  const TransactionKeyMap& tracked_keys = save_points_->top().new_keys_;
+
+  // TODO(lth): Reduce duplicate code with RollbackInternal logic.
+  ReadOptions roptions;
+  roptions.snapshot = top.snapshot_->snapshot();
+  SequenceNumber min_uncommitted =
+      static_cast_with_check<const SnapshotImpl, const Snapshot>(
+          roptions.snapshot)
+          ->min_uncommitted_;
+  SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
+  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
+                                          top.unprep_seqs_);
+  const auto& cf_map = *wupt_db_->GetCFHandleMap();
+  for (const auto& cfkey : tracked_keys) {
+    const auto cfid = cfkey.first;
+    const auto& keys = cfkey.second;
+
+    for (const auto& pair : keys) {
+      const auto& key = pair.first;
+      const auto& cf_handle = cf_map.at(cfid);
+      PinnableSlice pinnable_val;
+      bool not_used;
+      s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
+                            &callback);
+
+      if (s.ok()) {
+        s = write_batch_.Put(cf_handle, key, pinnable_val);
+        assert(s.ok());
+      } else if (s.IsNotFound()) {
+        s = write_batch_.Delete(cf_handle, key);
+        assert(s.ok());
+      } else {
+        return s;
+      }
+    }
+  }
+
+  const bool kPrepared = true;
+  s = FlushWriteBatchToDBInternal(!kPrepared);
+  assert(s.ok());
+  if (!s.ok()) {
+    return s;
+  }
+
+  // PessimisticTransaction::RollbackToSavePoint will also call
+  // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and
+  // has no savepoints because this savepoint has already been flushed. Work
+  // around this by setting a fake savepoint.
+  write_batch_.SetSavePoint();
+  s = PessimisticTransaction::RollbackToSavePoint();
+  assert(s.ok());
+  if (!s.ok()) {
+    return s;
+  }
+
+  flushed_save_points_->pop_back();
+  return s;
+}
+
+Status WriteUnpreparedTxn::PopSavePoint() {
+  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
+             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
+         (save_points_ ? save_points_->size() : 0));
+  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
+    Status s = PessimisticTransaction::PopSavePoint();
+    assert(!s.IsNotFound());
+    unflushed_save_points_->pop_back();
+    return s;
+  }
+
+  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
+    // PessimisticTransaction::PopSavePoint will also call PopSavePoint on
+    // write_batch_. However, write_batch_ is empty and has no savepoints
+    // because this savepoint has already been flushed. Work around this by
+    // setting a fake savepoint.
+    write_batch_.SetSavePoint();
+    Status s = PessimisticTransaction::PopSavePoint();
+    assert(!s.IsNotFound());
+    flushed_save_points_->pop_back();
+    return s;
+  }
+
+  return Status::NotFound();
+}
+
 void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const size_t num_keys, const Slice* keys,
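The fake-savepoint workaround above is subtle enough to restate in isolation: the base class unconditionally pops a savepoint from the index batch, so a dummy one is pushed onto the freshly re-initialized (hence empty) batch to keep the two stacks in step. A hedged sketch:

    #include <cassert>

    #include "rocksdb/utilities/write_batch_with_index.h"

    // Mirrors the workaround in RollbackToSavePointInternal / PopSavePoint:
    // push a savepoint that guards no writes, then let the base-class path
    // pop it again.
    void PopWithFakeSavePoint(rocksdb::WriteBatchWithIndex* write_batch) {
      write_batch->SetSavePoint();  // fake savepoint on an empty batch
      rocksdb::Status s = write_batch->RollbackToSavePoint();
      assert(s.ok());  // nothing to undo, so this only pops the marker
    }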
diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h
index 77c180338..774d90e8d 100644
--- a/utilities/transactions/write_unprepared_txn.h
+++ b/utilities/transactions/write_unprepared_txn.h
@@ -73,7 +73,6 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
     wup_snapshot_ = seq;
   }
 
- private:
   static SequenceNumber CalcMaxVisibleSeq(
       const std::map<SequenceNumber, size_t>& unprep_seqs,
       SequenceNumber snapshot_seq) {
@@ -84,6 +83,8 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
     }
     return std::max(max_unprepared, snapshot_seq);
   }
+
+ private:
   WritePreparedTxnDB* db_;
   const std::map<SequenceNumber, size_t>& unprep_seqs_;
   SequenceNumber wup_snapshot_;
@@ -139,6 +140,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
 
   void Clear() override;
 
+  void SetSavePoint() override;
+  Status RollbackToSavePoint() override;
+  Status PopSavePoint() override;
+
   // Get and GetIterator needs to be overridden so that a ReadCallback to
   // handle read-your-own-write is used.
   using Transaction::Get;
@@ -172,6 +177,9 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
 
   Status MaybeFlushWriteBatchToDB();
   Status FlushWriteBatchToDB(bool prepared);
+  Status FlushWriteBatchToDBInternal(bool prepared);
+  Status FlushWriteBatchWithSavePointToDB();
+  Status RollbackToSavePointInternal();
   Status HandleWrite(std::function<Status()> do_write);
 
   // For write unprepared, we check on every writebatch append to see if
@@ -210,6 +218,56 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
   // but in some cases, we should be able to restore the previously largest
   // value when calling RollbackToSavepoint.
   SequenceNumber largest_validated_seq_;
+
+  struct SavePoint {
+    // Record of unprep_seqs_ at this savepoint. The set of unprep_seqs is
+    // used during RollbackToSavepoint to determine visibility when restoring
+    // old values.
+    //
+    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
+    // subsets, this can potentially be deduplicated by just storing set
+    // difference. Investigate if this is worth it.
+    std::map<SequenceNumber, size_t> unprep_seqs_;
+
+    // This snapshot will be used to read keys at this savepoint if we call
+    // RollbackToSavePoint.
+    std::unique_ptr<ManagedSnapshot> snapshot_;
+
+    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
+              ManagedSnapshot* snapshot)
+        : unprep_seqs_(seqs), snapshot_(snapshot){};
+  };
+
+  // We have 3 data structures holding savepoint information:
+  // 1. TransactionBaseImpl::save_points_
+  // 2. WriteUnpreparedTxn::flushed_save_points_
+  // 3. WriteUnpreparedTxn::unflushed_save_points_
+  //
+  // TransactionBaseImpl::save_points_ holds information about all write
+  // batches, including the current in-memory write_batch_, or unprepared
+  // batches that have been written out. Its responsibility is just to track
+  // which keys have been modified in every savepoint.
+  //
+  // WriteUnpreparedTxn::flushed_save_points_ holds information about
+  // savepoints set on unprepared batches that have already been flushed. It
+  // holds the snapshot and unprep_seqs at that savepoint, so that the
+  // rollback process can determine which keys were visible at that point in
+  // time.
+  //
+  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
+  // savepoints on the current in-memory write_batch_. It simply records the
+  // size of the write batch at every savepoint.
+  //
+  // TODO(lth): Remove the redundancy between unflushed_save_points_ and
+  // write_batch_.save_points_.
+  //
+  // Based on this information, here are some invariants:
+  // size(unflushed_save_points_) = size(write_batch_.save_points_)
+  // size(flushed_save_points_) + size(unflushed_save_points_)
+  //    = size(save_points_)
+  //
+  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
+      flushed_save_points_;
+  std::unique_ptr<autovector<size_t>> unflushed_save_points_;
 };
 
 }  // namespace rocksdb
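The two invariants at the end of the comment block, restated as a hypothetical debug helper (illustrative name, not part of the diff):

    #include <cstddef>

    // unflushed/flushed/base/wbwi are the sizes of unflushed_save_points_,
    // flushed_save_points_, TransactionBaseImpl::save_points_, and
    // write_batch_.save_points_ respectively.
    bool SavePointInvariantsHold(std::size_t unflushed, std::size_t flushed,
                                 std::size_t base, std::size_t wbwi) {
      return unflushed == wbwi && flushed + unflushed == base;
    }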
diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc
index 875d54167..4381619e7 100644
--- a/utilities/transactions/write_unprepared_txn_db.cc
+++ b/utilities/transactions/write_unprepared_txn_db.cc
@@ -279,8 +279,8 @@ Status WriteUnpreparedTxnDB::Initialize(
       }
     }
 
-    wupt->write_batch_.Clear();
-    WriteBatchInternal::InsertNoop(wupt->write_batch_.GetWriteBatch());
+    const bool kClear = true;
+    wupt->InitWriteBatch(kClear);
 
     real_trx->SetState(Transaction::PREPARED);
     if (!s.ok()) {
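Putting the feature together from the public API: a hedged end-to-end sketch, assuming a TransactionDB opened with the write_unprepared write policy and a small write-batch flush threshold (option names hedged) so that batches flush mid-transaction:

    #include <cassert>

    #include "rocksdb/utilities/transaction.h"
    #include "rocksdb/utilities/transaction_db.h"

    void SavePointAcrossFlushes(rocksdb::TransactionDB* txn_db) {
      rocksdb::WriteOptions write_options;
      rocksdb::Transaction* txn = txn_db->BeginTransaction(write_options);
      assert(txn != nullptr);
      rocksdb::Status s = txn->SetName("example");
      assert(s.ok());

      s = txn->Put("a", "1");
      assert(s.ok());
      txn->SetSavePoint();  // tracked in unflushed_save_points_
      s = txn->Put("b", "2");
      assert(s.ok());
      // If the batch crosses the flush threshold here, the savepoint migrates
      // to flushed_save_points_ together with a snapshot and the unprep_seqs_
      // recorded at that point.
      s = txn->RollbackToSavePoint();  // undoes the write to "b"
      assert(s.ok());
      s = txn->Commit();  // commits only "a"
      assert(s.ok());
      delete txn;
    }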
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index cf17abf22..3ffa2e0c6 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -627,6 +627,11 @@ WriteBatchWithIndex::WriteBatchWithIndex(
 
 WriteBatchWithIndex::~WriteBatchWithIndex() {}
 
+WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;
+
+WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
+    default;
+
 WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
 
 size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
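The defaulted move operations exist so that WriteBatchWithIndex objects can be swapped, as FlushWriteBatchWithSavePointToDB does; previously the user-declared destructor suppressed the implicit moves. A small illustration:

    #include <utility>

    #include "rocksdb/utilities/write_batch_with_index.h"

    int main() {
      rocksdb::WriteBatchWithIndex a;
      rocksdb::WriteBatchWithIndex b;
      a.Put("key", "value");
      // std::swap requires move construction and move assignment, which the
      // declarations added in this diff now provide.
      std::swap(a, b);
      return 0;  // the "key" entry now lives in b
    }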