From c3d5c4d38ab65e9aef713a029cb166b194b960f2 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 3 Aug 2017 08:46:47 -0700 Subject: [PATCH] Refactor TransactionImpl Summary: This patch refactors TransactionImpl by separating the logic for pessimistic concurrency control from the implementation of how to write the data to rocksdb. The existing implementation is named WriteCommittedTxnImpl as it writes committed data to the db. A template named WritePreparedTxnImpl is also added which will be later completed to provide a an alternative implementation. Closes https://github.com/facebook/rocksdb/pull/2676 Differential Revision: D5549998 Pulled By: maysamyabandeh fbshipit-source-id: 16298e86b43ca4849324c1f35c731913c6d17bec --- CMakeLists.txt | 1 + db/db_impl.h | 4 +- src.mk | 1 + utilities/transactions/transaction_db_impl.cc | 16 ++-- utilities/transactions/transaction_db_impl.h | 11 ++- utilities/transactions/transaction_impl.cc | 75 +++++++++--------- utilities/transactions/transaction_impl.h | 78 ++++++++++++------- .../transactions/transaction_lock_mgr.cc | 16 ++-- utilities/transactions/transaction_lock_mgr.h | 16 ++-- .../write_prepared_transaction_impl.cc | 65 ++++++++++++++++ .../write_prepared_transaction_impl.h | 70 +++++++++++++++++ 11 files changed, 259 insertions(+), 94 deletions(-) create mode 100644 utilities/transactions/write_prepared_transaction_impl.cc create mode 100644 utilities/transactions/write_prepared_transaction_impl.h diff --git a/CMakeLists.txt b/CMakeLists.txt index bc8eb5d61..bd7a8fbe4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -525,6 +525,7 @@ set(SOURCES utilities/transactions/transaction_impl.cc utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc + utilities/transactions/write_prepared_transaction_impl.cc utilities/ttl/db_ttl_impl.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc diff --git a/db/db_impl.h b/db/db_impl.h index 7fec69cd7..3284048a6 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -631,7 +631,9 @@ class DBImpl : public DB { private: friend class DB; friend class InternalStats; - friend class TransactionImpl; + friend class PessimisticTxn; + friend class WriteCommittedTxnImpl; + friend class WritePreparedTxnImpl; #ifndef ROCKSDB_LITE friend class ForwardIterator; #endif diff --git a/src.mk b/src.mk index 81d78eb36..0b0d4e6ab 100644 --- a/src.mk +++ b/src.mk @@ -200,6 +200,7 @@ LIB_SOURCES = \ utilities/transactions/transaction_impl.cc \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_util.cc \ + utilities/transactions/write_prepared_transaction_impl.cc \ utilities/ttl/db_ttl_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \ diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index 69b5bc1ea..bd43b585a 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -128,7 +128,7 @@ Transaction* TransactionDBImpl::BeginTransaction( ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; } else { - return new TransactionImpl(this, write_options, txn_options); + return new WriteCommittedTxnImpl(this, write_options, txn_options); } } @@ -266,17 +266,17 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { return s; } -Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id, +Status TransactionDBImpl::TryLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key, bool exclusive) { return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); } -void TransactionDBImpl::UnLock(TransactionImpl* txn, +void TransactionDBImpl::UnLock(PessimisticTxn* txn, const TransactionKeyMap* keys) { lock_mgr_.UnLock(txn, keys, GetEnv()); } -void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id, +void TransactionDBImpl::UnLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key) { lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); } @@ -372,7 +372,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { Transaction* txn = BeginInternalTransaction(opts); txn->DisableIndexing(); - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = static_cast_with_check(txn); // Since commitBatch sorts the keys before locking, concurrent Write() // operations will not cause a deadlock. @@ -386,7 +386,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { } void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id, - TransactionImpl* tx) { + PessimisticTxn* tx) { assert(tx->GetExpirationTime() > 0); std::lock_guard lock(map_mutex_); expirable_transactions_map_.insert({tx_id, tx}); @@ -405,14 +405,14 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks( if (tx_it == expirable_transactions_map_.end()) { return true; } - TransactionImpl& tx = *(tx_it->second); + PessimisticTxn& tx = *(tx_it->second); return tx.TryStealingLocks(); } void TransactionDBImpl::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = static_cast_with_check(txn); txn_impl->Reinitialize(this, write_options, txn_options); } diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/transaction_db_impl.h index 428512e82..dfc13fbd7 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/transaction_db_impl.h @@ -63,11 +63,11 @@ class TransactionDBImpl : public TransactionDB { using StackableDB::DropColumnFamily; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; - Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key, + Status TryLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key, bool exclusive); - void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys); - void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key); + void UnLock(PessimisticTxn* txn, const TransactionKeyMap* keys); + void UnLock(PessimisticTxn* txn, uint32_t cfh_id, const std::string& key); void AddColumnFamily(const ColumnFamilyHandle* handle); @@ -78,7 +78,7 @@ class TransactionDBImpl : public TransactionDB { return txn_db_options_; } - void InsertExpirableTransaction(TransactionID tx_id, TransactionImpl* tx); + void InsertExpirableTransaction(TransactionID tx_id, PessimisticTxn* tx); void RemoveExpirableTransaction(TransactionID tx_id); // If transaction is no longer available, locks can be stolen @@ -109,13 +109,12 @@ class TransactionDBImpl : public TransactionDB { // Must be held when adding/dropping column families. InstrumentedMutex column_family_mutex_; Transaction* BeginInternalTransaction(const WriteOptions& options); - Status WriteHelper(WriteBatch* updates, TransactionImpl* txn_impl); // Used to ensure that no locks are stolen from an expirable transaction // that has started a commit. Only transactions with an expiration time // should be in this map. std::mutex map_mutex_; - std::unordered_map + std::unordered_map expirable_transactions_map_; // map from name to two phase transaction instance diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index dd0c69be4..ececec6d5 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -29,31 +29,31 @@ namespace rocksdb { struct WriteOptions; -std::atomic TransactionImpl::txn_id_counter_(1); +std::atomic PessimisticTxn::txn_id_counter_(1); -TransactionID TransactionImpl::GenTxnID() { +TransactionID PessimisticTxn::GenTxnID() { return txn_id_counter_.fetch_add(1); } -TransactionImpl::TransactionImpl(TransactionDB* txn_db, - const WriteOptions& write_options, - const TransactionOptions& txn_options) +PessimisticTxn::PessimisticTxn(TransactionDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) : TransactionBaseImpl(txn_db->GetRootDB(), write_options), txn_db_impl_(nullptr), + expiration_time_(0), txn_id_(0), waiting_cf_id_(0), waiting_key_(nullptr), - expiration_time_(0), lock_timeout_(0), deadlock_detect_(false), deadlock_detect_depth_(0) { txn_db_impl_ = static_cast_with_check(txn_db); - db_impl_ = static_cast_with_check(txn_db->GetRootDB()); + db_impl_ = static_cast_with_check(db_); Initialize(txn_options); } -void TransactionImpl::Initialize(const TransactionOptions& txn_options) { +void PessimisticTxn::Initialize(const TransactionOptions& txn_options) { txn_id_ = GenTxnID(); txn_state_ = STARTED; @@ -84,7 +84,7 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) { } } -TransactionImpl::~TransactionImpl() { +PessimisticTxn::~PessimisticTxn() { txn_db_impl_->UnLock(this, &GetTrackedKeys()); if (expiration_time_ > 0) { txn_db_impl_->RemoveExpirableTransaction(txn_id_); @@ -94,14 +94,14 @@ TransactionImpl::~TransactionImpl() { } } -void TransactionImpl::Clear() { +void PessimisticTxn::Clear() { txn_db_impl_->UnLock(this, &GetTrackedKeys()); TransactionBaseImpl::Clear(); } -void TransactionImpl::Reinitialize(TransactionDB* txn_db, - const WriteOptions& write_options, - const TransactionOptions& txn_options) { +void PessimisticTxn::Reinitialize(TransactionDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) { if (!name_.empty() && txn_state_ != COMMITED) { txn_db_impl_->UnregisterTransaction(this); } @@ -109,7 +109,7 @@ void TransactionImpl::Reinitialize(TransactionDB* txn_db, Initialize(txn_options); } -bool TransactionImpl::IsExpired() const { +bool PessimisticTxn::IsExpired() const { if (expiration_time_ > 0) { if (db_->GetEnv()->NowMicros() >= expiration_time_) { // Transaction is expired. @@ -120,7 +120,12 @@ bool TransactionImpl::IsExpired() const { return false; } -Status TransactionImpl::CommitBatch(WriteBatch* batch) { +WriteCommittedTxnImpl::WriteCommittedTxnImpl( + TransactionDB* txn_db, const WriteOptions& write_options, + const TransactionOptions& txn_options) + : PessimisticTxn(txn_db, write_options, txn_options){}; + +Status WriteCommittedTxnImpl::CommitBatch(WriteBatch* batch) { TransactionKeyMap keys_to_unlock; Status s = LockBatch(batch, &keys_to_unlock); @@ -158,7 +163,7 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) { return s; } -Status TransactionImpl::Prepare() { +Status WriteCommittedTxnImpl::Prepare() { Status s; if (name_.empty()) { @@ -213,7 +218,7 @@ Status TransactionImpl::Prepare() { return s; } -Status TransactionImpl::Commit() { +Status WriteCommittedTxnImpl::Commit() { Status s; bool commit_single = false; bool commit_prepared = false; @@ -299,7 +304,7 @@ Status TransactionImpl::Commit() { return s; } -Status TransactionImpl::Rollback() { +Status WriteCommittedTxnImpl::Rollback() { Status s; if (txn_state_ == PREPARED) { WriteBatch rollback_marker; @@ -326,7 +331,7 @@ Status TransactionImpl::Rollback() { return s; } -Status TransactionImpl::RollbackToSavePoint() { +Status PessimisticTxn::RollbackToSavePoint() { if (txn_state_ != STARTED) { return Status::InvalidArgument("Transaction is beyond state for rollback."); } @@ -344,8 +349,8 @@ Status TransactionImpl::RollbackToSavePoint() { // Lock all keys in this batch. // On success, caller should unlock keys_to_unlock -Status TransactionImpl::LockBatch(WriteBatch* batch, - TransactionKeyMap* keys_to_unlock) { +Status PessimisticTxn::LockBatch(WriteBatch* batch, + TransactionKeyMap* keys_to_unlock) { class Handler : public WriteBatch::Handler { public: // Sorted map of column_family_id to sorted set of keys. @@ -422,9 +427,9 @@ Status TransactionImpl::LockBatch(WriteBatch* batch, // If check_shapshot is true and this transaction has a snapshot set, // this key will only be locked if there have been no writes to this key since // the snapshot time. -Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, - const Slice& key, bool read_only, - bool exclusive, bool untracked) { +Status PessimisticTxn::TryLock(ColumnFamilyHandle* column_family, + const Slice& key, bool read_only, bool exclusive, + bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; @@ -510,10 +515,10 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, // Return OK() if this key has not been modified more recently than the // transaction snapshot_. -Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, - const Slice& key, - SequenceNumber prev_seqno, - SequenceNumber* new_seqno) { +Status PessimisticTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, + const Slice& key, + SequenceNumber prev_seqno, + SequenceNumber* new_seqno) { assert(snapshot_); SequenceNumber seq = snapshot_->GetSequenceNumber(); @@ -526,29 +531,27 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, *new_seqno = seq; - auto db_impl = static_cast_with_check(db_); - ColumnFamilyHandle* cfh = - column_family ? column_family : db_impl->DefaultColumnFamily(); + column_family ? column_family : db_impl_->DefaultColumnFamily(); - return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(), + return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), snapshot_->GetSequenceNumber(), false /* cache_only */); } -bool TransactionImpl::TryStealingLocks() { +bool PessimisticTxn::TryStealingLocks() { assert(IsExpired()); TransactionState expected = STARTED; return std::atomic_compare_exchange_strong(&txn_state_, &expected, LOCKS_STOLEN); } -void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family, - const Slice& key) { +void PessimisticTxn::UnlockGetForUpdate(ColumnFamilyHandle* column_family, + const Slice& key) { txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); } -Status TransactionImpl::SetName(const TransactionName& name) { +Status PessimisticTxn::SetName(const TransactionName& name) { Status s; if (txn_state_ == STARTED) { if (name_.length()) { diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 01f8f4b2a..8445b0a50 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -31,24 +31,28 @@ namespace rocksdb { class TransactionDBImpl; +class PessimisticTxn; -class TransactionImpl : public TransactionBaseImpl { +// A transaction under pessimistic concurrency control. This class implements +// the locking API and interfaces with the lock manager as well as the +// pessimistic transactional db. +class PessimisticTxn : public TransactionBaseImpl { public: - TransactionImpl(TransactionDB* db, const WriteOptions& write_options, - const TransactionOptions& txn_options); + PessimisticTxn(TransactionDB* db, const WriteOptions& write_options, + const TransactionOptions& txn_options); - virtual ~TransactionImpl(); + virtual ~PessimisticTxn(); void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options); - Status Prepare() override; + Status Prepare() override = 0; - Status Commit() override; + Status Commit() override = 0; - Status CommitBatch(WriteBatch* batch); + virtual Status CommitBatch(WriteBatch* batch) = 0; - Status Rollback() override; + Status Rollback() override = 0; Status RollbackToSavePoint() override; @@ -107,14 +111,24 @@ class TransactionImpl : public TransactionBaseImpl { int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } protected: + void Initialize(const TransactionOptions& txn_options); + + Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); + Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, bool untracked = false) override; - private: + void Clear() override; + TransactionDBImpl* txn_db_impl_; DBImpl* db_impl_; + // If non-zero, this transaction should not be committed after this time (in + // microseconds according to Env->NowMicros()) + uint64_t expiration_time_; + + private: // Used to create unique ids for transactions. static std::atomic txn_id_counter_; @@ -140,10 +154,6 @@ class TransactionImpl : public TransactionBaseImpl { // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_. mutable std::mutex wait_mutex_; - // If non-zero, this transaction should not be committed after this time (in - // microseconds according to Env->NowMicros()) - uint64_t expiration_time_; - // Timeout in microseconds when locking a key or -1 if there is no timeout. int64_t lock_timeout_; @@ -153,32 +163,46 @@ class TransactionImpl : public TransactionBaseImpl { // Whether to perform deadlock detection or not. int64_t deadlock_detect_depth_; - void Clear() override; - - void Initialize(const TransactionOptions& txn_options); - Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, SequenceNumber prev_seqno, SequenceNumber* new_seqno); - Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); - - Status DoCommit(WriteBatch* batch); - - void RollbackLastN(size_t num); - void UnlockGetForUpdate(ColumnFamilyHandle* column_family, const Slice& key) override; // No copying allowed - TransactionImpl(const TransactionImpl&); - void operator=(const TransactionImpl&); + PessimisticTxn(const PessimisticTxn&); + void operator=(const PessimisticTxn&); +}; + +class WriteCommittedTxnImpl : public PessimisticTxn { + public: + WriteCommittedTxnImpl(TransactionDB* db, const WriteOptions& write_options, + const TransactionOptions& txn_options); + + virtual ~WriteCommittedTxnImpl() {} + + Status Prepare() override; + + Status Commit() override; + + Status CommitBatch(WriteBatch* batch) override; + + Status Rollback() override; + + private: + Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key, + SequenceNumber prev_seqno, SequenceNumber* new_seqno); + + // No copying allowed + WriteCommittedTxnImpl(const WriteCommittedTxnImpl&); + void operator=(const WriteCommittedTxnImpl&); }; // Used at commit time to check whether transaction is committing before its // expiration time. class TransactionCallback : public WriteCallback { public: - explicit TransactionCallback(TransactionImpl* txn) : txn_(txn) {} + explicit TransactionCallback(PessimisticTxn* txn) : txn_(txn) {} Status Callback(DB* db) override { if (txn_->IsExpired()) { @@ -191,7 +215,7 @@ class TransactionCallback : public WriteCallback { bool AllowWriteBatching() override { return true; } private: - TransactionImpl* txn_; + PessimisticTxn* txn_; }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 1184f667d..99e71eeb0 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -227,7 +227,7 @@ bool TransactionLockMgr::IsLockExpired(TransactionID txn_id, return expired; } -Status TransactionLockMgr::TryLock(TransactionImpl* txn, +Status TransactionLockMgr::TryLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env, bool exclusive) { @@ -256,7 +256,7 @@ Status TransactionLockMgr::TryLock(TransactionImpl* txn, // Helper function for TryLock(). Status TransactionLockMgr::AcquireWithTimeout( - TransactionImpl* txn, LockMap* lock_map, LockMapStripe* stripe, + PessimisticTxn* txn, LockMap* lock_map, LockMapStripe* stripe, uint32_t column_family_id, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info) { Status result; @@ -357,13 +357,13 @@ Status TransactionLockMgr::AcquireWithTimeout( } void TransactionLockMgr::DecrementWaiters( - const TransactionImpl* txn, const autovector& wait_ids) { + const PessimisticTxn* txn, const autovector& wait_ids) { std::lock_guard lock(wait_txn_map_mutex_); DecrementWaitersImpl(txn, wait_ids); } void TransactionLockMgr::DecrementWaitersImpl( - const TransactionImpl* txn, const autovector& wait_ids) { + const PessimisticTxn* txn, const autovector& wait_ids) { auto id = txn->GetID(); assert(wait_txn_map_.Contains(id)); wait_txn_map_.Delete(id); @@ -377,7 +377,7 @@ void TransactionLockMgr::DecrementWaitersImpl( } bool TransactionLockMgr::IncrementWaiters( - const TransactionImpl* txn, const autovector& wait_ids) { + const PessimisticTxn* txn, const autovector& wait_ids) { auto id = txn->GetID(); std::vector queue(txn->GetDeadlockDetectDepth()); std::lock_guard lock(wait_txn_map_mutex_); @@ -501,7 +501,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, return result; } -void TransactionLockMgr::UnLockKey(const TransactionImpl* txn, +void TransactionLockMgr::UnLockKey(const PessimisticTxn* txn, const std::string& key, LockMapStripe* stripe, LockMap* lock_map, Env* env) { @@ -537,7 +537,7 @@ void TransactionLockMgr::UnLockKey(const TransactionImpl* txn, } } -void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, +void TransactionLockMgr::UnLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env) { std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); @@ -559,7 +559,7 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id, stripe->stripe_cv->NotifyAll(); } -void TransactionLockMgr::UnLock(const TransactionImpl* txn, +void TransactionLockMgr::UnLock(const PessimisticTxn* txn, const TransactionKeyMap* key_map, Env* env) { for (auto& key_map_iter : *key_map) { uint32_t column_family_id = key_map_iter.first; diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 6389f8d7d..6c0d1e99d 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -47,14 +47,14 @@ class TransactionLockMgr { // Attempt to lock key. If OK status is returned, the caller is responsible // for calling UnLock() on this key. - Status TryLock(TransactionImpl* txn, uint32_t column_family_id, + Status TryLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env, bool exclusive); // Unlock a key locked by TryLock(). txn must be the same Transaction that // locked this key. - void UnLock(const TransactionImpl* txn, const TransactionKeyMap* keys, + void UnLock(const PessimisticTxn* txn, const TransactionKeyMap* keys, Env* env); - void UnLock(TransactionImpl* txn, uint32_t column_family_id, + void UnLock(PessimisticTxn* txn, uint32_t column_family_id, const std::string& key, Env* env); using LockStatusData = std::unordered_multimap; @@ -102,7 +102,7 @@ class TransactionLockMgr { std::shared_ptr GetLockMap(uint32_t column_family_id); - Status AcquireWithTimeout(TransactionImpl* txn, LockMap* lock_map, + Status AcquireWithTimeout(PessimisticTxn* txn, LockMap* lock_map, LockMapStripe* stripe, uint32_t column_family_id, const std::string& key, Env* env, int64_t timeout, const LockInfo& lock_info); @@ -112,14 +112,14 @@ class TransactionLockMgr { const LockInfo& lock_info, uint64_t* wait_time, autovector* txn_ids); - void UnLockKey(const TransactionImpl* txn, const std::string& key, + void UnLockKey(const PessimisticTxn* txn, const std::string& key, LockMapStripe* stripe, LockMap* lock_map, Env* env); - bool IncrementWaiters(const TransactionImpl* txn, + bool IncrementWaiters(const PessimisticTxn* txn, const autovector& wait_ids); - void DecrementWaiters(const TransactionImpl* txn, + void DecrementWaiters(const PessimisticTxn* txn, const autovector& wait_ids); - void DecrementWaitersImpl(const TransactionImpl* txn, + void DecrementWaitersImpl(const PessimisticTxn* txn, const autovector& wait_ids); // No copying allowed diff --git a/utilities/transactions/write_prepared_transaction_impl.cc b/utilities/transactions/write_prepared_transaction_impl.cc new file mode 100644 index 000000000..ded6bcb2b --- /dev/null +++ b/utilities/transactions/write_prepared_transaction_impl.cc @@ -0,0 +1,65 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/write_prepared_transaction_impl.h" + +#include +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_impl.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/snapshot.h" +#include "rocksdb/status.h" +#include "rocksdb/utilities/transaction_db.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "utilities/transactions/transaction_db_impl.h" +#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/transaction_util.h" + +namespace rocksdb { + +struct WriteOptions; + +WritePreparedTxnImpl::WritePreparedTxnImpl( + TransactionDB* txn_db, const WriteOptions& write_options, + const TransactionOptions& txn_options) + : PessimisticTxn(txn_db, write_options, txn_options) { + PessimisticTxn::Initialize(txn_options); +} + +Status WritePreparedTxnImpl::CommitBatch(WriteBatch* batch) { + // TODO(myabandeh) Implement this + throw std::runtime_error("CommitBatch not Implemented"); + return Status::OK(); +} + +Status WritePreparedTxnImpl::Prepare() { + // TODO(myabandeh) Implement this + throw std::runtime_error("Prepare not Implemented"); + return Status::OK(); +} + +Status WritePreparedTxnImpl::Commit() { + // TODO(myabandeh) Implement this + throw std::runtime_error("Commit not Implemented"); + return Status::OK(); +} + +Status WritePreparedTxnImpl::Rollback() { + // TODO(myabandeh) Implement this + throw std::runtime_error("Rollback not Implemented"); + return Status::OK(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_prepared_transaction_impl.h b/utilities/transactions/write_prepared_transaction_impl.h new file mode 100644 index 000000000..eab2b8669 --- /dev/null +++ b/utilities/transactions/write_prepared_transaction_impl.h @@ -0,0 +1,70 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include +#include +#include + +#include "db/write_callback.h" +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/snapshot.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/autovector.h" +#include "utilities/transactions/transaction_base.h" +#include "utilities/transactions/transaction_impl.h" +#include "utilities/transactions/transaction_util.h" + +namespace rocksdb { + +class TransactionDBImpl; + +// This impl could write to DB also uncomitted data and then later tell apart +// committed data from uncomitted data. Uncommitted data could be after the +// Prepare phase in 2PC (WritePreparedTxnImpl) or before that +// (WriteUnpreparedTxnImpl). +class WritePreparedTxnImpl : public PessimisticTxn { + public: + WritePreparedTxnImpl(TransactionDB* db, const WriteOptions& write_options, + const TransactionOptions& txn_options); + + virtual ~WritePreparedTxnImpl() {} + + Status Prepare() override; + + Status Commit() override; + + Status CommitBatch(WriteBatch* batch) override; + + Status Rollback() override; + + private: + // TODO(myabandeh): verify that the current impl work with values being + // written with prepare sequence number too. + // Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& + // key, + // SequenceNumber prev_seqno, SequenceNumber* + // new_seqno); + + // No copying allowed + WritePreparedTxnImpl(const WritePreparedTxnImpl&); + void operator=(const WritePreparedTxnImpl&); +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE