From bdc056f8aa21b3bdae5f91821b273d80627f8392 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 7 Aug 2017 16:07:40 -0700 Subject: [PATCH] Refactor PessimisticTransaction Summary: This patch splits Commit and Prepare into lock-related logic and db-write-related logic. It moves lock-related logic to PessimisticTransaction to be reused by all children classes and moves the existing impl of db-write-related logic to PrepareInternal, CommitWithoutPrepareInternal, and CommitInternal in WriteCommittedTxn. Closes https://github.com/facebook/rocksdb/pull/2691 Differential Revision: D5569464 Pulled By: maysamyabandeh fbshipit-source-id: d1b8698e69801a4126c7bc211745d05c636f5325 --- CMakeLists.txt | 6 +- TARGETS | 6 +- db/db_impl.h | 6 +- src.mk | 6 +- utilities/blob_db/blob_db_impl.cc | 2 +- ...tion_impl.cc => optimistic_transaction.cc} | 26 ++-- ...action_impl.h => optimistic_transaction.h} | 18 +-- .../optimistic_transaction_db_impl.cc | 8 +- ...ion_impl.cc => pessimistic_transaction.cc} | 120 ++++++++++-------- ...ction_impl.h => pessimistic_transaction.h} | 47 ++++--- .../pessimistic_transaction_db.cc | 20 +-- .../transactions/pessimistic_transaction_db.h | 14 +- .../transactions/transaction_lock_mgr.cc | 16 +-- utilities/transactions/transaction_lock_mgr.h | 18 +-- ...nsaction_impl.cc => write_prepared_txn.cc} | 32 +++-- ...ransaction_impl.h => write_prepared_txn.h} | 24 ++-- 16 files changed, 197 insertions(+), 172 deletions(-) rename utilities/transactions/{optimistic_transaction_impl.cc => optimistic_transaction.cc} (83%) rename utilities/transactions/{optimistic_transaction_impl.h => optimistic_transaction.h} (82%) rename utilities/transactions/{transaction_impl.cc => pessimistic_transaction.cc} (82%) rename utilities/transactions/{transaction_impl.h => pessimistic_transaction.h} (86%) rename utilities/transactions/{write_prepared_transaction_impl.cc => write_prepared_txn.cc} (63%) rename utilities/transactions/{write_prepared_transaction_impl.h => 
write_prepared_txn.h} (77%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8209f30fe..b4b568b74 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -518,14 +518,14 @@ set(SOURCES utilities/spatialdb/spatial_db.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/transactions/optimistic_transaction_db_impl.cc - utilities/transactions/optimistic_transaction_impl.cc + utilities/transactions/optimistic_transaction.cc utilities/transactions/transaction_base.cc utilities/transactions/pessimistic_transaction_db.cc utilities/transactions/transaction_db_mutex_impl.cc - utilities/transactions/transaction_impl.cc + utilities/transactions/pessimistic_transaction.cc utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc - utilities/transactions/write_prepared_transaction_impl.cc + utilities/transactions/write_prepared_txn.cc utilities/ttl/db_ttl_impl.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc diff --git a/TARGETS b/TARGETS index e52f50707..dcf2729e7 100644 --- a/TARGETS +++ b/TARGETS @@ -245,14 +245,14 @@ cpp_library( "utilities/spatialdb/spatial_db.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/transactions/optimistic_transaction_db_impl.cc", - "utilities/transactions/optimistic_transaction_impl.cc", + "utilities/transactions/optimistic_transaction.cc", "utilities/transactions/transaction_base.cc", "utilities/transactions/pessimistic_transaction_db.cc", "utilities/transactions/transaction_db_mutex_impl.cc", - "utilities/transactions/transaction_impl.cc", + "utilities/transactions/pessimistic_transaction.cc", "utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_util.cc", - "utilities/transactions/write_prepared_transaction_impl.cc", + "utilities/transactions/write_prepared_txn.cc", "utilities/ttl/db_ttl_impl.cc", 
"utilities/write_batch_with_index/write_batch_with_index.cc", "utilities/write_batch_with_index/write_batch_with_index_internal.cc", diff --git a/db/db_impl.h b/db/db_impl.h index d89ea50ca..d057f9345 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -631,9 +631,9 @@ class DBImpl : public DB { private: friend class DB; friend class InternalStats; - friend class PessimisticTxn; - friend class WriteCommittedTxnImpl; - friend class WritePreparedTxnImpl; + friend class PessimisticTransaction; + friend class WriteCommittedTxn; + friend class WritePreparedTxn; #ifndef ROCKSDB_LITE friend class ForwardIterator; #endif diff --git a/src.mk b/src.mk index 44c59fea7..30012d11f 100644 --- a/src.mk +++ b/src.mk @@ -193,14 +193,14 @@ LIB_SOURCES = \ utilities/spatialdb/spatial_db.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ - utilities/transactions/optimistic_transaction_impl.cc \ + utilities/transactions/optimistic_transaction.cc \ utilities/transactions/transaction_base.cc \ utilities/transactions/pessimistic_transaction_db.cc \ utilities/transactions/transaction_db_mutex_impl.cc \ - utilities/transactions/transaction_impl.cc \ + utilities/transactions/pessimistic_transaction.cc \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_util.cc \ - utilities/transactions/write_prepared_transaction_impl.cc \ + utilities/transactions/write_prepared_txn.cc \ utilities/ttl/db_ttl_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \ diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 783c9d4ef..9e1623eb5 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -32,7 +32,7 @@ #include "util/random.h" #include "util/timer_queue.h" #include "utilities/transactions/optimistic_transaction_db_impl.h" -#include 
"utilities/transactions/optimistic_transaction_impl.h" +#include "utilities/transactions/optimistic_transaction.h" namespace { int kBlockBasedTableVersionFormat = 2; diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction.cc similarity index 83% rename from utilities/transactions/optimistic_transaction_impl.cc rename to utilities/transactions/optimistic_transaction.cc index 044dded23..882fbec4a 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction.cc @@ -5,11 +5,9 @@ #ifndef ROCKSDB_LITE -#include "utilities/transactions/optimistic_transaction_impl.h" +#include "utilities/transactions/optimistic_transaction.h" -#include #include -#include #include "db/column_family.h" #include "db/db_impl.h" @@ -25,40 +23,40 @@ namespace rocksdb { struct WriteOptions; -OptimisticTransactionImpl::OptimisticTransactionImpl( +OptimisticTransaction::OptimisticTransaction( OptimisticTransactionDB* txn_db, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options) : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) { Initialize(txn_options); } -void OptimisticTransactionImpl::Initialize( +void OptimisticTransaction::Initialize( const OptimisticTransactionOptions& txn_options) { if (txn_options.set_snapshot) { SetSnapshot(); } } -void OptimisticTransactionImpl::Reinitialize( +void OptimisticTransaction::Reinitialize( OptimisticTransactionDB* txn_db, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options) { TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); Initialize(txn_options); } -OptimisticTransactionImpl::~OptimisticTransactionImpl() { +OptimisticTransaction::~OptimisticTransaction() { } -void OptimisticTransactionImpl::Clear() { +void OptimisticTransaction::Clear() { TransactionBaseImpl::Clear(); } -Status OptimisticTransactionImpl::Prepare() { +Status 
OptimisticTransaction::Prepare() { return Status::InvalidArgument( "Two phase commit not supported for optimistic transactions."); } -Status OptimisticTransactionImpl::Commit() { +Status OptimisticTransaction::Commit() { // Set up callback which will call CheckTransactionForConflicts() to // check whether this transaction is safe to be committed. OptimisticTransactionCallback callback(this); @@ -75,7 +73,7 @@ Status OptimisticTransactionImpl::Commit() { return s; } -Status OptimisticTransactionImpl::Rollback() { +Status OptimisticTransaction::Rollback() { Clear(); return Status::OK(); } @@ -83,7 +81,7 @@ Status OptimisticTransactionImpl::Rollback() { // Record this key so that we can check it for conflicts at commit time. // // 'exclusive' is unused for OptimisticTransaction. -Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, +Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, bool untracked) { if (untracked) { @@ -114,7 +112,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, // // Should only be called on writer thread in order to avoid any race conditions // in detecting write conflicts. 
-Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) { +Status OptimisticTransaction::CheckTransactionForConflicts(DB* db) { Status result; auto db_impl = static_cast_with_check(db); @@ -127,7 +125,7 @@ Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) { true /* cache_only */); } -Status OptimisticTransactionImpl::SetName(const TransactionName& name) { +Status OptimisticTransaction::SetName(const TransactionName& /* unused */) { return Status::InvalidArgument("Optimistic transactions cannot be named."); } diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction.h similarity index 82% rename from utilities/transactions/optimistic_transaction_impl.h rename to utilities/transactions/optimistic_transaction.h index 6baec6962..b49bd6ab9 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction.h @@ -26,13 +26,13 @@ namespace rocksdb { -class OptimisticTransactionImpl : public TransactionBaseImpl { +class OptimisticTransaction : public TransactionBaseImpl { public: - OptimisticTransactionImpl(OptimisticTransactionDB* db, + OptimisticTransaction(OptimisticTransactionDB* db, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options); - virtual ~OptimisticTransactionImpl(); + virtual ~OptimisticTransaction(); void Reinitialize(OptimisticTransactionDB* txn_db, const WriteOptions& write_options, @@ -67,20 +67,20 @@ class OptimisticTransactionImpl : public TransactionBaseImpl { void Clear() override; - void UnlockGetForUpdate(ColumnFamilyHandle* column_family, - const Slice& key) override { + void UnlockGetForUpdate(ColumnFamilyHandle* /* unused */, + const Slice& /* unused */) override { // Nothing to unlock. 
} // No copying allowed - OptimisticTransactionImpl(const OptimisticTransactionImpl&); - void operator=(const OptimisticTransactionImpl&); + OptimisticTransaction(const OptimisticTransaction&); + void operator=(const OptimisticTransaction&); }; // Used at commit time to trigger transaction validation class OptimisticTransactionCallback : public WriteCallback { public: - explicit OptimisticTransactionCallback(OptimisticTransactionImpl* txn) + explicit OptimisticTransactionCallback(OptimisticTransaction* txn) : txn_(txn) {} Status Callback(DB* db) override { @@ -90,7 +90,7 @@ class OptimisticTransactionCallback : public WriteCallback { bool AllowWriteBatching() override { return false; } private: - OptimisticTransactionImpl* txn_; + OptimisticTransaction* txn_; }; } // namespace rocksdb diff --git a/utilities/transactions/optimistic_transaction_db_impl.cc b/utilities/transactions/optimistic_transaction_db_impl.cc index 001ebefe1..d9db6fde0 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.cc +++ b/utilities/transactions/optimistic_transaction_db_impl.cc @@ -14,7 +14,7 @@ #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/optimistic_transaction_db.h" -#include "utilities/transactions/optimistic_transaction_impl.h" +#include "utilities/transactions/optimistic_transaction.h" namespace rocksdb { @@ -25,7 +25,7 @@ Transaction* OptimisticTransactionDBImpl::BeginTransaction( ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; } else { - return new OptimisticTransactionImpl(this, write_options, txn_options); + return new OptimisticTransaction(this, write_options, txn_options); } } @@ -81,8 +81,8 @@ Status OptimisticTransactionDB::Open( void OptimisticTransactionDBImpl::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options) { - assert(dynamic_cast(txn) != nullptr); - auto txn_impl = reinterpret_cast(txn); + assert(dynamic_cast(txn) 
!= nullptr); + auto txn_impl = reinterpret_cast(txn); txn_impl->Reinitialize(this, write_options, txn_options); } diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/pessimistic_transaction.cc similarity index 82% rename from utilities/transactions/transaction_impl.cc rename to utilities/transactions/pessimistic_transaction.cc index a2219e1a3..092b7132c 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -5,7 +5,7 @@ #ifndef ROCKSDB_LITE -#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/pessimistic_transaction.h" #include #include @@ -29,13 +29,13 @@ namespace rocksdb { struct WriteOptions; -std::atomic PessimisticTxn::txn_id_counter_(1); +std::atomic PessimisticTransaction::txn_id_counter_(1); -TransactionID PessimisticTxn::GenTxnID() { +TransactionID PessimisticTransaction::GenTxnID() { return txn_id_counter_.fetch_add(1); } -PessimisticTxn::PessimisticTxn(TransactionDB* txn_db, +PessimisticTransaction::PessimisticTransaction(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) : TransactionBaseImpl(txn_db->GetRootDB(), write_options), @@ -53,7 +53,7 @@ PessimisticTxn::PessimisticTxn(TransactionDB* txn_db, Initialize(txn_options); } -void PessimisticTxn::Initialize(const TransactionOptions& txn_options) { +void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { txn_id_ = GenTxnID(); txn_state_ = STARTED; @@ -84,7 +84,7 @@ void PessimisticTxn::Initialize(const TransactionOptions& txn_options) { } } -PessimisticTxn::~PessimisticTxn() { +PessimisticTransaction::~PessimisticTransaction() { txn_db_impl_->UnLock(this, &GetTrackedKeys()); if (expiration_time_ > 0) { txn_db_impl_->RemoveExpirableTransaction(txn_id_); @@ -94,12 +94,12 @@ PessimisticTxn::~PessimisticTxn() { } } -void PessimisticTxn::Clear() { +void PessimisticTransaction::Clear() { txn_db_impl_->UnLock(this, 
&GetTrackedKeys()); TransactionBaseImpl::Clear(); } -void PessimisticTxn::Reinitialize(TransactionDB* txn_db, +void PessimisticTransaction::Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) { if (!name_.empty() && txn_state_ != COMMITED) { @@ -109,7 +109,7 @@ void PessimisticTxn::Reinitialize(TransactionDB* txn_db, Initialize(txn_options); } -bool PessimisticTxn::IsExpired() const { +bool PessimisticTransaction::IsExpired() const { if (expiration_time_ > 0) { if (db_->GetEnv()->NowMicros() >= expiration_time_) { // Transaction is expired. @@ -120,12 +120,12 @@ bool PessimisticTxn::IsExpired() const { return false; } -WriteCommittedTxnImpl::WriteCommittedTxnImpl( +WriteCommittedTxn::WriteCommittedTxn( TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : PessimisticTxn(txn_db, write_options, txn_options){}; + : PessimisticTransaction(txn_db, write_options, txn_options){}; -Status WriteCommittedTxnImpl::CommitBatch(WriteBatch* batch) { +Status WriteCommittedTxn::CommitBatch(WriteBatch* batch) { TransactionKeyMap keys_to_unlock; Status s = LockBatch(batch, &keys_to_unlock); @@ -163,7 +163,7 @@ Status WriteCommittedTxnImpl::CommitBatch(WriteBatch* batch) { return s; } -Status WriteCommittedTxnImpl::Prepare() { +Status PessimisticTransaction::Prepare() { Status s; if (name_.empty()) { @@ -192,12 +192,7 @@ Status WriteCommittedTxnImpl::Prepare() { txn_state_.store(AWAITING_PREPARE); // transaction can't expire after preparation expiration_time_ = 0; - WriteOptions write_options = write_options_; - write_options.disableWAL = false; - WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); - s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), - /*callback*/ nullptr, &log_number_, /*log ref*/ 0, - /* disable_memtable*/ true); + s = PrepareInternal(); if (s.ok()) { assert(log_number_ != 0); 
dbimpl_->MarkLogAsContainingPrepSection(log_number_); @@ -218,9 +213,20 @@ Status WriteCommittedTxnImpl::Prepare() { return s; } -Status WriteCommittedTxnImpl::Commit() { +Status WriteCommittedTxn::PrepareInternal() { + WriteOptions write_options = write_options_; + write_options.disableWAL = false; + WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); + Status s = + db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, &log_number_, /*log ref*/ 0, + /* disable_memtable*/ true); + return s; +} + +Status PessimisticTransaction::Commit() { Status s; - bool commit_single = false; + bool commit_without_prepare = false; bool commit_prepared = false; if (IsExpired()) { @@ -234,25 +240,28 @@ Status WriteCommittedTxnImpl::Commit() { // our locks stolen. In this case the only valid state is STARTED because // a state of PREPARED would have a cleared expiration_time_. TransactionState expected = STARTED; - commit_single = std::atomic_compare_exchange_strong(&txn_state_, &expected, - AWAITING_COMMIT); + commit_without_prepare = std::atomic_compare_exchange_strong( + &txn_state_, &expected, AWAITING_COMMIT); TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); } else if (txn_state_ == PREPARED) { // expiration and lock stealing is not a concern commit_prepared = true; } else if (txn_state_ == STARTED) { // expiration and lock stealing is not a concern - commit_single = true; + commit_without_prepare = true; + // TODO(myabandeh): what if the user mistakenly forgets prepare? We should + // add an option so that the user explicitly expresses the intention of + // skipping the prepare phase. 
} - if (commit_single) { + if (commit_without_prepare) { assert(!commit_prepared); if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) { s = Status::InvalidArgument( "Commit-time batch contains values that will not be committed."); } else { txn_state_.store(AWAITING_COMMIT); - s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch()); + s = CommitWithoutPrepareInternal(); Clear(); if (s.ok()) { txn_state_.store(COMMITED); @@ -261,21 +270,8 @@ Status WriteCommittedTxnImpl::Commit() { } else if (commit_prepared) { txn_state_.store(AWAITING_COMMIT); - // We take the commit-time batch and append the Commit marker. - // The Memtable will ignore the Commit marker in non-recovery mode - WriteBatch* working_batch = GetCommitTimeWriteBatch(); - WriteBatchInternal::MarkCommit(working_batch, name_); + s = CommitInternal(); - // any operations appended to this working_batch will be ignored from WAL - working_batch->MarkWalTerminationPoint(); - - // insert prepared batch into Memtable only skipping WAL. - // Memtable will ignore BeginPrepare/EndPrepare markers - // in non recovery mode and simply insert the values - WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch()); - - s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, - log_number_); if (!s.ok()) { ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, "Commit write failed"); @@ -304,7 +300,31 @@ Status WriteCommittedTxnImpl::Commit() { return s; } -Status WriteCommittedTxnImpl::Rollback() { +Status WriteCommittedTxn::CommitWithoutPrepareInternal() { + Status s = db_->Write(write_options_, GetWriteBatch()->GetWriteBatch()); + return s; +} + +Status WriteCommittedTxn::CommitInternal() { + // We take the commit-time batch and append the Commit marker. 
+ // The Memtable will ignore the Commit marker in non-recovery mode + WriteBatch* working_batch = GetCommitTimeWriteBatch(); + WriteBatchInternal::MarkCommit(working_batch, name_); + + // any operations appended to this working_batch will be ignored from WAL + working_batch->MarkWalTerminationPoint(); + + // insert prepared batch into Memtable only skipping WAL. + // Memtable will ignore BeginPrepare/EndPrepare markers + // in non recovery mode and simply insert the values + WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch()); + + auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, + log_number_); + return s; +} + +Status WriteCommittedTxn::Rollback() { Status s; if (txn_state_ == PREPARED) { WriteBatch rollback_marker; @@ -331,7 +351,7 @@ Status WriteCommittedTxnImpl::Rollback() { return s; } -Status PessimisticTxn::RollbackToSavePoint() { +Status PessimisticTransaction::RollbackToSavePoint() { if (txn_state_ != STARTED) { return Status::InvalidArgument("Transaction is beyond state for rollback."); } @@ -349,7 +369,7 @@ Status PessimisticTxn::RollbackToSavePoint() { // Lock all keys in this batch. 
// On success, caller should unlock keys_to_unlock -Status PessimisticTxn::LockBatch(WriteBatch* batch, +Status PessimisticTransaction::LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock) { class Handler : public WriteBatch::Handler { public: @@ -372,12 +392,12 @@ Status PessimisticTxn::LockBatch(WriteBatch* batch, } virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { + const Slice& /* unused */) override { RecordKey(column_family_id, key); return Status::OK(); } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { + const Slice& /* unused */) override { RecordKey(column_family_id, key); return Status::OK(); } @@ -427,7 +447,7 @@ Status PessimisticTxn::LockBatch(WriteBatch* batch, // If check_shapshot is true and this transaction has a snapshot set, // this key will only be locked if there have been no writes to this key since // the snapshot time. -Status PessimisticTxn::TryLock(ColumnFamilyHandle* column_family, +Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); @@ -515,7 +535,7 @@ Status PessimisticTxn::TryLock(ColumnFamilyHandle* column_family, // Return OK() if this key has not been modified more recently than the // transaction snapshot_. 
-Status PessimisticTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, +Status PessimisticTransaction::ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber prev_seqno, SequenceNumber* new_seqno) { @@ -539,19 +559,19 @@ Status PessimisticTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, false /* cache_only */); } -bool PessimisticTxn::TryStealingLocks() { +bool PessimisticTransaction::TryStealingLocks() { assert(IsExpired()); TransactionState expected = STARTED; return std::atomic_compare_exchange_strong(&txn_state_, &expected, LOCKS_STOLEN); } -void PessimisticTxn::UnlockGetForUpdate(ColumnFamilyHandle* column_family, +void PessimisticTransaction::UnlockGetForUpdate(ColumnFamilyHandle* column_family, const Slice& key) { txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); } -Status PessimisticTxn::SetName(const TransactionName& name) { +Status PessimisticTransaction::SetName(const TransactionName& name) { Status s; if (txn_state_ == STARTED) { if (name_.length()) { diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/pessimistic_transaction.h similarity index 86% rename from utilities/transactions/transaction_impl.h rename to utilities/transactions/pessimistic_transaction.h index dce5c7b97..a0162fa27 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -31,24 +31,23 @@ namespace rocksdb { class PessimisticTransactionDB; -class PessimisticTxn; // A transaction under pessimistic concurrency control. This class implements // the locking API and interfaces with the lock manager as well as the // pessimistic transactional db. 
-class PessimisticTxn : public TransactionBaseImpl { +class PessimisticTransaction : public TransactionBaseImpl { public: - PessimisticTxn(TransactionDB* db, const WriteOptions& write_options, + PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options, const TransactionOptions& txn_options); - virtual ~PessimisticTxn(); + virtual ~PessimisticTransaction(); void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options); - Status Prepare() override = 0; + Status Prepare() override; - Status Commit() override = 0; + Status Commit() override; virtual Status CommitBatch(WriteBatch* batch) = 0; @@ -111,6 +110,12 @@ class PessimisticTxn : public TransactionBaseImpl { int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } protected: + virtual Status PrepareInternal() = 0; + + virtual Status CommitWithoutPrepareInternal() = 0; + + virtual Status CommitInternal() = 0; + void Initialize(const TransactionOptions& txn_options); Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); @@ -170,41 +175,43 @@ class PessimisticTxn : public TransactionBaseImpl { const Slice& key) override; // No copying allowed - PessimisticTxn(const PessimisticTxn&); - void operator=(const PessimisticTxn&); + PessimisticTransaction(const PessimisticTransaction&); + void operator=(const PessimisticTransaction&); }; -class WriteCommittedTxnImpl : public PessimisticTxn { +class WriteCommittedTxn : public PessimisticTransaction { public: - WriteCommittedTxnImpl(TransactionDB* db, const WriteOptions& write_options, + WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options, const TransactionOptions& txn_options); - virtual ~WriteCommittedTxnImpl() {} - - Status Prepare() override; - - Status Commit() override; + virtual ~WriteCommittedTxn() {} Status CommitBatch(WriteBatch* batch) override; Status Rollback() override; private: + Status PrepareInternal() override; + + Status 
CommitWithoutPrepareInternal() override; + + Status CommitInternal() override; + Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber prev_seqno, SequenceNumber* new_seqno); // No copying allowed - WriteCommittedTxnImpl(const WriteCommittedTxnImpl&); - void operator=(const WriteCommittedTxnImpl&); + WriteCommittedTxn(const WriteCommittedTxn&); + void operator=(const WriteCommittedTxn&); }; // Used at commit time to check whether transaction is committing before its // expiration time. class TransactionCallback : public WriteCallback { public: - explicit TransactionCallback(PessimisticTxn* txn) : txn_(txn) {} + explicit TransactionCallback(PessimisticTransaction* txn) : txn_(txn) {} - Status Callback(DB* db) override { + Status Callback(DB* /* unused */) override { if (txn_->IsExpired()) { return Status::Expired(); } else { @@ -215,7 +222,7 @@ class TransactionCallback : public WriteCallback { bool AllowWriteBatching() override { return true; } private: - PessimisticTxn* txn_; + PessimisticTransaction* txn_; }; } // namespace rocksdb diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 052dc80f7..9787d76df 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -17,7 +17,7 @@ #include "rocksdb/utilities/transaction_db.h" #include "util/cast_util.h" #include "utilities/transactions/transaction_db_mutex_impl.h" -#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/pessimistic_transaction.h" namespace rocksdb { @@ -128,7 +128,7 @@ Transaction* WriteCommittedTxnDB::BeginTransaction( ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; } else { - return new WriteCommittedTxnImpl(this, write_options, txn_options); + return new WriteCommittedTxn(this, write_options, txn_options); } } @@ -139,7 +139,7 @@ Transaction* 
WritePreparedTxnDB::BeginTransaction( ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; } else { - return new WritePreparedTxnImpl(this, write_options, txn_options); + return new WritePreparedTxn(this, write_options, txn_options); } } @@ -301,18 +301,18 @@ Status PessimisticTransactionDB::DropColumnFamily( return s; } -Status PessimisticTransactionDB::TryLock(PessimisticTxn* txn, uint32_t cfh_id, +Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive) { return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); } -void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, +void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys) { lock_mgr_.UnLock(txn, keys, GetEnv()); } -void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, uint32_t cfh_id, +void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key) { lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); } @@ -409,7 +409,7 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts, Transaction* txn = BeginInternalTransaction(opts); txn->DisableIndexing(); - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = static_cast_with_check(txn); // Since commitBatch sorts the keys before locking, concurrent Write() // operations will not cause a deadlock. 
@@ -423,7 +423,7 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts, } void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id, - PessimisticTxn* tx) { + PessimisticTransaction* tx) { assert(tx->GetExpirationTime() > 0); std::lock_guard lock(map_mutex_); expirable_transactions_map_.insert({tx_id, tx}); @@ -442,14 +442,14 @@ bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( if (tx_it == expirable_transactions_map_.end()) { return true; } - PessimisticTxn& tx = *(tx_it->second); + PessimisticTransaction& tx = *(tx_it->second); return tx.TryStealingLocks(); } void PessimisticTransactionDB::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = static_cast_with_check(txn); txn_impl->Reinitialize(this, write_options, txn_options); } diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index d9cf7d558..6ff1d015a 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -15,9 +15,9 @@ #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" -#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_lock_mgr.h" -#include "utilities/transactions/write_prepared_transaction_impl.h" +#include "utilities/transactions/write_prepared_txn.h" namespace rocksdb { @@ -64,11 +64,11 @@ class PessimisticTransactionDB : public TransactionDB { using StackableDB::DropColumnFamily; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; - Status TryLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key, + Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive); - 
void UnLock(PessimisticTxn* txn, const TransactionKeyMap* keys); - void UnLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key); + void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys); + void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key); void AddColumnFamily(const ColumnFamilyHandle* handle); @@ -79,7 +79,7 @@ class PessimisticTransactionDB : public TransactionDB { return txn_db_options_; } - void InsertExpirableTransaction(TransactionID tx_id, PessimisticTxn* tx); + void InsertExpirableTransaction(TransactionID tx_id, PessimisticTransaction* tx); void RemoveExpirableTransaction(TransactionID tx_id); // If transaction is no longer available, locks can be stolen @@ -116,7 +116,7 @@ class PessimisticTransactionDB : public TransactionDB { // that has started a commit. Only transactions with an expiration time // should be in this map. std::mutex map_mutex_; - std::unordered_map + std::unordered_map expirable_transactions_map_; // map from name to two phase transaction instance diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 95612cd39..d93d5bcde 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -227,7 +227,7 @@ bool TransactionLockMgr::IsLockExpired(TransactionID txn_id, return expired; } -Status TransactionLockMgr::TryLock(PessimisticTxn* txn, +Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env, bool exclusive) { @@ -256,7 +256,7 @@ Status TransactionLockMgr::TryLock(PessimisticTxn* txn, // Helper function for TryLock(). 
Status TransactionLockMgr::AcquireWithTimeout( - PessimisticTxn* txn, LockMap* lock_map, LockMapStripe* stripe, + PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe, uint32_t column_family_id, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info) { Status result; @@ -357,13 +357,13 @@ Status TransactionLockMgr::AcquireWithTimeout( } void TransactionLockMgr::DecrementWaiters( - const PessimisticTxn* txn, const autovector& wait_ids) { + const PessimisticTransaction* txn, const autovector& wait_ids) { std::lock_guard lock(wait_txn_map_mutex_); DecrementWaitersImpl(txn, wait_ids); } void TransactionLockMgr::DecrementWaitersImpl( - const PessimisticTxn* txn, const autovector& wait_ids) { + const PessimisticTransaction* txn, const autovector& wait_ids) { auto id = txn->GetID(); assert(wait_txn_map_.Contains(id)); wait_txn_map_.Delete(id); @@ -377,7 +377,7 @@ void TransactionLockMgr::DecrementWaitersImpl( } bool TransactionLockMgr::IncrementWaiters( - const PessimisticTxn* txn, const autovector& wait_ids) { + const PessimisticTransaction* txn, const autovector& wait_ids) { auto id = txn->GetID(); std::vector queue(txn->GetDeadlockDetectDepth()); std::lock_guard lock(wait_txn_map_mutex_); @@ -501,7 +501,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, return result; } -void TransactionLockMgr::UnLockKey(const PessimisticTxn* txn, +void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn, const std::string& key, LockMapStripe* stripe, LockMap* lock_map, Env* env) { @@ -537,7 +537,7 @@ void TransactionLockMgr::UnLockKey(const PessimisticTxn* txn, } } -void TransactionLockMgr::UnLock(PessimisticTxn* txn, uint32_t column_family_id, +void TransactionLockMgr::UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env) { std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); @@ -559,7 +559,7 @@ void 
TransactionLockMgr::UnLock(PessimisticTxn* txn, uint32_t column_family_id, stripe->stripe_cv->NotifyAll(); } -void TransactionLockMgr::UnLock(const PessimisticTxn* txn, +void TransactionLockMgr::UnLock(const PessimisticTransaction* txn, const TransactionKeyMap* key_map, Env* env) { for (auto& key_map_iter : *key_map) { uint32_t column_family_id = key_map_iter.first; diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 86a65783f..6e542071c 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -17,7 +17,7 @@ #include "util/autovector.h" #include "util/hash_map.h" #include "util/thread_local.h" -#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/pessimistic_transaction.h" namespace rocksdb { @@ -47,14 +47,14 @@ class TransactionLockMgr { // Attempt to lock key. If OK status is returned, the caller is responsible // for calling UnLock() on this key. - Status TryLock(PessimisticTxn* txn, uint32_t column_family_id, + Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env, bool exclusive); // Unlock a key locked by TryLock(). txn must be the same Transaction that // locked this key. 
- void UnLock(const PessimisticTxn* txn, const TransactionKeyMap* keys, + void UnLock(const PessimisticTransaction* txn, const TransactionKeyMap* keys, Env* env); - void UnLock(PessimisticTxn* txn, uint32_t column_family_id, + void UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env); using LockStatusData = std::unordered_multimap; @@ -102,7 +102,7 @@ class TransactionLockMgr { std::shared_ptr GetLockMap(uint32_t column_family_id); - Status AcquireWithTimeout(PessimisticTxn* txn, LockMap* lock_map, + Status AcquireWithTimeout(PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe, uint32_t column_family_id, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info); @@ -112,14 +112,14 @@ class TransactionLockMgr { const LockInfo& lock_info, uint64_t* wait_time, autovector* txn_ids); - void UnLockKey(const PessimisticTxn* txn, const std::string& key, + void UnLockKey(const PessimisticTransaction* txn, const std::string& key, LockMapStripe* stripe, LockMap* lock_map, Env* env); - bool IncrementWaiters(const PessimisticTxn* txn, + bool IncrementWaiters(const PessimisticTransaction* txn, const autovector& wait_ids); - void DecrementWaiters(const PessimisticTxn* txn, + void DecrementWaiters(const PessimisticTransaction* txn, const autovector& wait_ids); - void DecrementWaitersImpl(const PessimisticTxn* txn, + void DecrementWaitersImpl(const PessimisticTransaction* txn, const autovector& wait_ids); // No copying allowed diff --git a/utilities/transactions/write_prepared_transaction_impl.cc b/utilities/transactions/write_prepared_txn.cc similarity index 63% rename from utilities/transactions/write_prepared_transaction_impl.cc rename to utilities/transactions/write_prepared_txn.cc index c018e9460..f3942855b 100644 --- a/utilities/transactions/write_prepared_transaction_impl.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -5,56 +5,54 @@ #ifndef ROCKSDB_LITE -#include 
"utilities/transactions/write_prepared_transaction_impl.h" +#include "utilities/transactions/write_prepared_txn.h" #include -#include -#include -#include #include "db/column_family.h" #include "db/db_impl.h" -#include "rocksdb/comparator.h" #include "rocksdb/db.h" -#include "rocksdb/snapshot.h" #include "rocksdb/status.h" #include "rocksdb/utilities/transaction_db.h" -#include "util/string_util.h" -#include "util/sync_point.h" #include "utilities/transactions/pessimistic_transaction_db.h" -#include "utilities/transactions/transaction_impl.h" -#include "utilities/transactions/transaction_util.h" +#include "utilities/transactions/pessimistic_transaction.h" namespace rocksdb { struct WriteOptions; -WritePreparedTxnImpl::WritePreparedTxnImpl( +WritePreparedTxn::WritePreparedTxn( TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : PessimisticTxn(txn_db, write_options, txn_options) { - PessimisticTxn::Initialize(txn_options); + : PessimisticTransaction(txn_db, write_options, txn_options) { + PessimisticTransaction::Initialize(txn_options); } -Status WritePreparedTxnImpl::CommitBatch(WriteBatch* batch) { +Status WritePreparedTxn::CommitBatch(WriteBatch* /* unused */) { // TODO(myabandeh) Implement this throw std::runtime_error("CommitBatch not Implemented"); return Status::OK(); } -Status WritePreparedTxnImpl::Prepare() { +Status WritePreparedTxn::PrepareInternal() { // TODO(myabandeh) Implement this throw std::runtime_error("Prepare not Implemented"); return Status::OK(); } -Status WritePreparedTxnImpl::Commit() { +Status WritePreparedTxn::CommitWithoutPrepareInternal() { // TODO(myabandeh) Implement this throw std::runtime_error("Commit not Implemented"); return Status::OK(); } -Status WritePreparedTxnImpl::Rollback() { +Status WritePreparedTxn::CommitInternal() { + // TODO(myabandeh) Implement this + throw std::runtime_error("Commit not Implemented"); + return Status::OK(); +} + +Status WritePreparedTxn::Rollback() { // 
TODO(myabandeh) Implement this throw std::runtime_error("Rollback not Implemented"); return Status::OK(); diff --git a/utilities/transactions/write_prepared_transaction_impl.h b/utilities/transactions/write_prepared_txn.h similarity index 77% rename from utilities/transactions/write_prepared_transaction_impl.h rename to utilities/transactions/write_prepared_txn.h index eab2b8669..c0feb2207 100644 --- a/utilities/transactions/write_prepared_transaction_impl.h +++ b/utilities/transactions/write_prepared_txn.h @@ -26,7 +26,7 @@ #include "rocksdb/utilities/write_batch_with_index.h" #include "util/autovector.h" #include "utilities/transactions/transaction_base.h" -#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_util.h" namespace rocksdb { @@ -35,24 +35,26 @@ class TransactionDBImpl; // This impl could write to DB also uncomitted data and then later tell apart // committed data from uncomitted data. Uncommitted data could be after the -// Prepare phase in 2PC (WritePreparedTxnImpl) or before that +// Prepare phase in 2PC (WritePreparedTxn) or before that // (WriteUnpreparedTxnImpl). -class WritePreparedTxnImpl : public PessimisticTxn { +class WritePreparedTxn : public PessimisticTransaction { public: - WritePreparedTxnImpl(TransactionDB* db, const WriteOptions& write_options, + WritePreparedTxn(TransactionDB* db, const WriteOptions& write_options, const TransactionOptions& txn_options); - virtual ~WritePreparedTxnImpl() {} - - Status Prepare() override; - - Status Commit() override; + virtual ~WritePreparedTxn() {} Status CommitBatch(WriteBatch* batch) override; Status Rollback() override; private: + Status PrepareInternal() override; + + Status CommitWithoutPrepareInternal() override; + + Status CommitInternal() override; + // TODO(myabandeh): verify that the current impl work with values being // written with prepare sequence number too. 
// Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& @@ -61,8 +63,8 @@ class WritePreparedTxnImpl : public PessimisticTxn { // new_seqno); // No copying allowed - WritePreparedTxnImpl(const WritePreparedTxnImpl&); - void operator=(const WritePreparedTxnImpl&); + WritePreparedTxn(const WritePreparedTxn&); + void operator=(const WritePreparedTxn&); }; } // namespace rocksdb