diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 04274866a..26efd51b3 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -317,6 +317,12 @@ class TransactionBaseImpl : public Transaction { // Records writes pending in this transaction WriteBatchWithIndex write_batch_; + // Map from column_family_id to map of keys that are involved in this + // transaction. + // For Pessimistic Transactions this is the list of locked keys. + // Optimistic Transactions will wait till commit time to do conflict checking. + TransactionKeyMap tracked_keys_; + private: friend class WritePreparedTxn; // Extra data to be persisted with the commit. Note this is only used when @@ -327,12 +333,6 @@ class TransactionBaseImpl : public Transaction { // nullptr if there was no snapshot at the time SetSavePoint() was called. std::unique_ptr>> save_points_; - // Map from column_family_id to map of keys that are involved in this - // transaction. - // For Pessimistic Transactions this is the list of locked keys. - // Optimistic Transactions will wait till commit time to do conflict checking. - TransactionKeyMap tracked_keys_; - // If true, future Put/Merge/Deletes will be indexed in the // WriteBatchWithIndex. 
// If false, future Put/Merge/Deletes will be inserted directly into the diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 54d478c94..d127220e4 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -42,7 +42,9 @@ SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber( WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) { + : WritePreparedTxn(txn_db, write_options, txn_options), + wupt_db_(txn_db), + recovered_txn_(false) { max_write_batch_size_ = txn_options.max_write_batch_size; // We set max bytes to zero so that we don't get a memory limit error. // Instead of trying to keep write batch strictly under the size limit, we @@ -69,6 +71,12 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() { log_number_); } } + + // Call tracked_keys_.clear() so that ~PessimisticTransaction does not + // try to unlock keys for recovered transactions. + if (recovered_txn_) { + tracked_keys_.clear(); + } } void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { @@ -76,7 +84,7 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { max_write_batch_size_ = txn_options.max_write_batch_size; write_batch_.SetMaxBytes(0); unprep_seqs_.clear(); - write_set_keys_.clear(); + recovered_txn_ = false; } Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, @@ -148,6 +156,72 @@ Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked); } +// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. 
For +// WriteUnprepared, the write batches have already been written into the +// database during WAL replay, so all we have to do is "retrack" the keys +// so that rollbacks are possible. +// +// Calling TryLock instead of TrackKey is also possible, but as an optimization, +// recovered transactions do not hold locks on their keys. This follows the +// implementation in PessimisticTransactionDB::Initialize where we set +// skip_concurrency_control to true. +Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) { + struct TrackKeyHandler : public WriteBatch::Handler { + WriteUnpreparedTxn* txn_; + bool rollback_merge_operands_; + + TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands) + : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + return Status::OK(); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + return Status::OK(); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + return Status::OK(); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { + if (rollback_merge_operands_) { + txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, + false /* read_only */, true /* exclusive */); + } + return Status::OK(); + } + + // Recovered batches do not contain 2PC markers. 
+ Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } + + Status MarkEndPrepare(const Slice&) override { + return Status::InvalidArgument(); + } + + Status MarkNoop(bool) override { return Status::InvalidArgument(); } + + Status MarkCommit(const Slice&) override { + return Status::InvalidArgument(); + } + + Status MarkRollback(const Slice&) override { + return Status::InvalidArgument(); + } + }; + + TrackKeyHandler handler(this, + wupt_db_->txn_db_options_.rollback_merge_operands); + return wb->Iterate(&handler); +} + Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { const bool kPrepared = true; Status s; @@ -159,25 +233,11 @@ Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { return s; } -void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) { - // TODO(lth): write_set_keys_ can just be a std::string instead of a vector. - write_set_keys_[cfid].push_back(key.ToString()); -} - Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { if (name_.empty()) { return Status::InvalidArgument("Cannot write to DB without SetName."); } - // Update write_key_set_ for rollback purposes. - KeySetBuilder keyset_handler( - this, wupt_db_->txn_db_options_.rollback_merge_operands); - auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler); - assert(s.ok()); - if (!s.ok()) { - return s; - } - // TODO(lth): Reduce duplicate code with WritePrepared prepare logic. WriteOptions write_options = write_options_; write_options.disableWAL = false; @@ -204,10 +264,10 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { // WriteImpl should not overwrite that value, so set log_used to nullptr if // log_number_ is already set. uint64_t* log_used = log_number_ ? 
nullptr : &log_number_; - s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), - /*callback*/ nullptr, log_used, /*log ref*/ - 0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_, - &add_prepared_callback); + auto s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, log_used, /*log ref*/ + 0, !DISABLE_MEMTABLE, &seq_used, + prepare_batch_cnt_, &add_prepared_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); auto prepare_seq = seq_used; @@ -317,7 +377,6 @@ Status WriteUnpreparedTxn::CommitInternal() { wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } // else do the 2nd write to publish seq @@ -349,7 +408,6 @@ Status WriteUnpreparedTxn::CommitInternal() { wpt_db_->RemovePrepared(seq.first, seq.second); } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } @@ -359,19 +417,21 @@ Status WriteUnpreparedTxn::RollbackInternal() { wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0); assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); + Status s; const auto& cf_map = *wupt_db_->GetCFHandleMap(); auto read_at_seq = kMaxSequenceNumber; - Status s; ReadOptions roptions; // Note that we do not use WriteUnpreparedTxnReadCallback because we do not // need to read our own writes when reading prior versions of the key for // rollback. 
+ const auto& tracked_keys = GetTrackedKeys(); WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq); - for (const auto& cfkey : write_set_keys_) { + for (const auto& cfkey : tracked_keys) { const auto cfid = cfkey.first; const auto& keys = cfkey.second; - for (const auto& key : keys) { + for (const auto& pair : keys) { + const auto& key = pair.first; const auto& cf_handle = cf_map.at(cfid); PinnableSlice pinnable_val; bool not_used; @@ -426,7 +486,6 @@ Status WriteUnpreparedTxn::RollbackInternal() { wpt_db_->RemovePrepared(seq.first, seq.second); } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; @@ -453,10 +512,16 @@ Status WriteUnpreparedTxn::RollbackInternal() { } unprep_seqs_.clear(); - write_set_keys_.clear(); return s; } +void WriteUnpreparedTxn::Clear() { + if (!recovered_txn_) { + txn_db_impl_->UnLock(this, &GetTrackedKeys()); + } + TransactionBaseImpl::Clear(); +} + Status WriteUnpreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 751d36c23..15a76d134 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -94,20 +94,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn { const SliceParts& key, const bool assume_tracked = false) override; - virtual Status RebuildFromWriteBatch(WriteBatch*) override { - // This function was only useful for recovering prepared transactions, but - // is unused for write prepared because a transaction may consist of - // multiple write batches. - // - // If there are use cases outside of recovery that can make use of this, - // then support could be added. 
- return Status::NotSupported("Not supported for WriteUnprepared"); - } + virtual Status RebuildFromWriteBatch(WriteBatch*) override; const std::map& GetUnpreparedSequenceNumbers(); - void UpdateWriteKeySet(uint32_t cfid, const Slice& key); - protected: void Initialize(const TransactionOptions& txn_options) override; @@ -118,6 +108,8 @@ class WriteUnpreparedTxn : public WritePreparedTxn { Status RollbackInternal() override; + void Clear() override; + // Get and GetIterator needs to be overridden so that a ReadCallback to // handle read-your-own-write is used. using Transaction::Get; @@ -157,10 +149,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn { // commit callbacks. std::map unprep_seqs_; - // Set of keys that have written to that have already been written to DB - // (ie. not in write_batch_). - // - std::map> write_set_keys_; + // Recovered transactions have tracked_keys_ populated, but the keys are not + // actually locked, for efficiency reasons. For recovered transactions, skip + // unlocking keys when the transaction ends. 
+ bool recovered_txn_; }; } // namespace rocksdb diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 9382edfad..c4be058bb 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -252,12 +252,13 @@ Status WriteUnpreparedTxnDB::Initialize( assert(real_trx); auto wupt = static_cast_with_check(real_trx); + wupt->recovered_txn_ = true; real_trx->SetLogNumber(first_log_number); real_trx->SetId(first_seq); Status s = real_trx->SetName(recovered_trx->name_); if (!s.ok()) { - break; + return s; } wupt->prepare_batch_cnt_ = last_prepare_batch_cnt; @@ -270,12 +271,11 @@ Status WriteUnpreparedTxnDB::Initialize( ordered_seq_cnt[seq] = cnt; assert(wupt->unprep_seqs_.count(seq) == 0); wupt->unprep_seqs_[seq] = cnt; - KeySetBuilder keyset_handler(wupt, - txn_db_options_.rollback_merge_operands); - s = batch_info.batch_->Iterate(&keyset_handler); + + s = wupt->RebuildFromWriteBatch(batch_info.batch_); assert(s.ok()); if (!s.ok()) { - break; + return s; } } @@ -284,7 +284,7 @@ Status WriteUnpreparedTxnDB::Initialize( real_trx->SetState(Transaction::PREPARED); if (!s.ok()) { - break; + return s; } } // AddPrepared must be called in order @@ -397,29 +397,5 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, return db_iter; } -Status KeySetBuilder::PutCF(uint32_t cf, const Slice& key, - const Slice& /*val*/) { - txn_->UpdateWriteKeySet(cf, key); - return Status::OK(); -} - -Status KeySetBuilder::DeleteCF(uint32_t cf, const Slice& key) { - txn_->UpdateWriteKeySet(cf, key); - return Status::OK(); -} - -Status KeySetBuilder::SingleDeleteCF(uint32_t cf, const Slice& key) { - txn_->UpdateWriteKeySet(cf, key); - return Status::OK(); -} - -Status KeySetBuilder::MergeCF(uint32_t cf, const Slice& key, - const Slice& /*val*/) { - if (rollback_merge_operands_) { - txn_->UpdateWriteKeySet(cf, key); - } - return Status::OK(); -} - } // 
namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index 6405ba683..65cb4b919 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -144,32 +144,5 @@ class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback { SequenceNumber rollback_seq_; }; -struct KeySetBuilder : public WriteBatch::Handler { - WriteUnpreparedTxn* txn_; - bool rollback_merge_operands_; - - KeySetBuilder(WriteUnpreparedTxn* txn, bool rollback_merge_operands) - : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {} - - Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override; - - Status DeleteCF(uint32_t cf, const Slice& key) override; - - Status SingleDeleteCF(uint32_t cf, const Slice& key) override; - - Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override; - - // Recovered batches do not contain 2PC markers. - Status MarkNoop(bool) override { return Status::InvalidArgument(); } - Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } - Status MarkEndPrepare(const Slice&) override { - return Status::InvalidArgument(); - } - Status MarkCommit(const Slice&) override { return Status::InvalidArgument(); } - Status MarkRollback(const Slice&) override { - return Status::InvalidArgument(); - } -}; - } // namespace rocksdb #endif // ROCKSDB_LITE