From 20d1e547d109177c99c20f8baa70ae68e7acbe42 Mon Sep 17 00:00:00 2001 From: agiardullo Date: Fri, 21 Aug 2015 15:47:21 -0700 Subject: [PATCH] Common base class for transactions Summary: As I keep adding new features to transactions, I keep creating more duplicate code. This diff cleans this up by creating a base implementation class for Transaction and OptimisticTransaction to inherit from. The code in TransactionBase.h/.cc is all just copied from elsewhere. The only entertaining part of this class worth looking at is the virtual TryLock method which allows OptimisticTransactions and Transactions to share the same common code for Put/Get/etc. The rest of this diff is mostly red and easy on the eyes. Test Plan: No functionality change. existing tests pass. Reviewers: sdong, jkedgar, rven, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D45135 --- CMakeLists.txt | 1 + src.mk | 1 + .../optimistic_transaction_impl.cc | 223 +------------ .../optimistic_transaction_impl.h | 138 +-------- utilities/transactions/transaction_base.cc | 278 +++++++++++++++++ utilities/transactions/transaction_base.h | 183 +++++++++++ utilities/transactions/transaction_impl.cc | 293 ++---------------- utilities/transactions/transaction_impl.h | 149 +-------- 8 files changed, 503 insertions(+), 763 deletions(-) create mode 100644 utilities/transactions/transaction_base.cc create mode 100644 utilities/transactions/transaction_base.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d1408349..b3b7a9c5b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -219,6 +219,7 @@ set(SOURCES utilities/redis/redis_lists.cc utilities/spatialdb/spatial_db.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc + utilities/transactions/optimistic_transaction_base.cc utilities/transactions/optimistic_transaction_impl.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/transaction_impl.cc diff --git a/src.mk b/src.mk index 8a6c4dc7f..6bb99f287 100644 --- a/src.mk +++ b/src.mk @@ -118,6 +118,7 @@ LIB_SOURCES = \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/transactions/optimistic_transaction_impl.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ + utilities/transactions/transaction_base.cc \ utilities/transactions/transaction_db_impl.cc \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_impl.cc \ diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index f3f184d3e..a75732290 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -27,11 +27,7 @@ struct WriteOptions; OptimisticTransactionImpl::OptimisticTransactionImpl( OptimisticTransactionDB* txn_db, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options) - : txn_db_(txn_db), - db_(txn_db->GetBaseDB()), - write_options_(write_options), - cmp_(txn_options.cmp), - write_batch_(new WriteBatchWithIndex(txn_options.cmp, 0, true)) { + : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) { if (txn_options.set_snapshot) { SetSnapshot(); } @@ -46,10 +42,6 @@ void OptimisticTransactionImpl::Cleanup() { write_batch_->Clear(); } -void OptimisticTransactionImpl::SetSnapshot() { - snapshot_.reset(new ManagedSnapshot(db_)); -} - Status OptimisticTransactionImpl::Commit() { // Set up callback which will call CheckTransactionForConflicts() to // check whether this transaction is safe to be committed. @@ -77,34 +69,12 @@ void OptimisticTransactionImpl::Rollback() { Cleanup(); } -void OptimisticTransactionImpl::SetSavePoint() { - if (save_points_ == nullptr) { - save_points_.reset(new std::stack>()); - } - save_points_->push(snapshot_); - write_batch_->SetSavePoint(); -} - -Status OptimisticTransactionImpl::RollbackToSavePoint() { - if (save_points_ != nullptr && save_points_->size() > 0) { - // Restore saved snapshot - snapshot_ = save_points_->top(); - save_points_->pop(); - - // Rollback batch - Status s = write_batch_->RollbackToSavePoint(); - assert(s.ok()); - - return s; - } else { - assert(write_batch_->RollbackToSavePoint().IsNotFound()); - return Status::NotFound(); - } -} - // Record this key so that we can check it for conflicts at commit time. -void OptimisticTransactionImpl::RecordOperation( - ColumnFamilyHandle* column_family, const Slice& key) { +Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, + const Slice& key, bool untracked) { + if (untracked) { + return Status::OK(); + } uint32_t cfh_id = GetColumnFamilyID(column_family); SequenceNumber seq; @@ -128,190 +98,11 @@ void OptimisticTransactionImpl::RecordOperation( tracked_keys_[cfh_id][key_str] = seq; } } -} - -void OptimisticTransactionImpl::RecordOperation( - ColumnFamilyHandle* column_family, const SliceParts& key) { - size_t key_size = 0; - for (int i = 0; i < key.num_parts; ++i) { - key_size += key.parts[i].size(); - } - - std::string str; - str.reserve(key_size); - - for (int i = 0; i < key.num_parts; ++i) { - str.append(key.parts[i].data(), key.parts[i].size()); - } - - RecordOperation(column_family, str); -} - -Status OptimisticTransactionImpl::Get(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key, - value); -} - -Status OptimisticTransactionImpl::GetForUpdate( - const ReadOptions& read_options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - // Regardless of whether the Get succeeded, track this key. - RecordOperation(column_family, key); - - if (value == nullptr) { - return Status::OK(); - } else { - return Get(read_options, column_family, key, value); - } -} - -std::vector OptimisticTransactionImpl::MultiGet( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - // Regardless of whether the MultiGet succeeded, track these keys. - size_t num_keys = keys.size(); - values->resize(num_keys); - - // TODO(agiardullo): optimize multiget? - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - std::string* value = values ? &(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -std::vector OptimisticTransactionImpl::MultiGetForUpdate( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - // Regardless of whether the MultiGet succeeded, track these keys. - size_t num_keys = keys.size(); - values->resize(num_keys); - - // TODO(agiardullo): optimize multiget? - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - // Regardless of whether the Get succeeded, track this key. - RecordOperation(column_family[i], keys[i]); - - std::string* value = values ? &(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -Iterator* OptimisticTransactionImpl::GetIterator( - const ReadOptions& read_options) { - Iterator* db_iter = db_->NewIterator(read_options); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(db_iter); -} - -Iterator* OptimisticTransactionImpl::GetIterator( - const ReadOptions& read_options, ColumnFamilyHandle* column_family) { - Iterator* db_iter = db_->NewIterator(read_options, column_family); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(column_family, db_iter); -} - -Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - RecordOperation(column_family, key); - - write_batch_->Put(column_family, key, value); + // Always return OK. Confilct checking will happen at commit time. return Status::OK(); } -Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family, - const SliceParts& key, - const SliceParts& value) { - RecordOperation(column_family, key); - - write_batch_->Put(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::Merge(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - RecordOperation(column_family, key); - - write_batch_->Merge(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { - RecordOperation(column_family, key); - - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - RecordOperation(column_family, key); - - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::PutUntracked( - ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - write_batch_->Put(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::PutUntracked( - ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) { - write_batch_->Put(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::MergeUntracked( - ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - write_batch_->Merge(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::DeleteUntracked( - ColumnFamilyHandle* column_family, const Slice& key) { - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::DeleteUntracked( - ColumnFamilyHandle* column_family, const SliceParts& key) { - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -void OptimisticTransactionImpl::PutLogData(const Slice& blob) { - write_batch_->PutLogData(blob); -} - -WriteBatchWithIndex* OptimisticTransactionImpl::GetWriteBatch() { - return write_batch_.get(); -} - // Returns OK if it is safe to commit this transaction. Returns Status::Busy // if there are read or write conflicts that would prevent us from committing OR // if we can not determine whether there would be any such conflicts. diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction_impl.h index c8f84c387..abbeb1ab9 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction_impl.h @@ -21,11 +21,12 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_util.h" namespace rocksdb { -class OptimisticTransactionImpl : public Transaction { +class OptimisticTransactionImpl : public TransactionBaseImpl { public: OptimisticTransactionImpl(OptimisticTransactionDB* db, const WriteOptions& write_options, @@ -37,144 +38,21 @@ class OptimisticTransactionImpl : public Transaction { void Rollback() override; - void SetSavePoint() override; - - Status RollbackToSavePoint() override; - - Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) override; - - Status Get(const ReadOptions& options, const Slice& key, - std::string* value) override { - return Get(options, db_->DefaultColumnFamily(), key, value); - } - - Status GetForUpdate(const ReadOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; - - Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) override { - return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); - } - - std::vector MultiGet( - const ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) override { - return MultiGet(options, std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - std::vector MultiGetForUpdate( - const ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector MultiGetForUpdate( - const ReadOptions& options, const std::vector& keys, - std::vector* values) override { - return MultiGetForUpdate(options, - std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - Iterator* GetIterator(const ReadOptions& read_options) override; - Iterator* GetIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) override; - - Status Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Put(const Slice& key, const Slice& value) override { - return Put(nullptr, key, value); - } - - Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status Put(const SliceParts& key, const SliceParts& value) override { - return Put(nullptr, key, value); - } - - Status Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Merge(const Slice& key, const Slice& value) override { - return Merge(nullptr, key, value); - } - - Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; - Status Delete(const Slice& key) override { return Delete(nullptr, key); } - Status Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } - - Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status PutUntracked(const Slice& key, const Slice& value) override { - return PutUntracked(nullptr, key, value); - } - - Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status PutUntracked(const SliceParts& key, const SliceParts& value) override { - return PutUntracked(nullptr, key, value); - } - - Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status MergeUntracked(const Slice& key, const Slice& value) override { - return MergeUntracked(nullptr, key, value); - } - - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const Slice& key) override; - Status DeleteUntracked(const Slice& key) override { - return DeleteUntracked(nullptr, key); - } - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status DeleteUntracked(const SliceParts& key) override { - return DeleteUntracked(nullptr, key); - } - - void PutLogData(const Slice& blob) override; - const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; } - const Snapshot* GetSnapshot() const override { - return snapshot_ ? snapshot_->snapshot() : nullptr; - } - - void SetSnapshot() override; - - WriteBatchWithIndex* GetWriteBatch() override; - protected: - OptimisticTransactionDB* const txn_db_; - DB* db_; - const WriteOptions write_options_; - std::shared_ptr snapshot_; - const Comparator* cmp_; - std::unique_ptr write_batch_; + Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, + bool untracked = false) override; private: + OptimisticTransactionDB* const txn_db_; + // Map of Column Family IDs to keys and corresponding sequence numbers. // The sequence number stored for a key will be used during commit to make // sure this key has // not changed since this sequence number. TransactionKeyMap tracked_keys_; - // Stack of the Snapshot saved at each save point. Saved snapshots may be - // nullptr if there was no snapshot at the time SetSavePoint() was called. - std::unique_ptr>> save_points_; - friend class OptimisticTransactionCallback; // Returns OK if it is safe to commit this transaction. Returns Status::Busy @@ -184,10 +62,6 @@ class OptimisticTransactionImpl : public Transaction { // Should only be called on writer thread. Status CheckTransactionForConflicts(DB* db); - void RecordOperation(ColumnFamilyHandle* column_family, const Slice& key); - void RecordOperation(ColumnFamilyHandle* column_family, - const SliceParts& key); - void Cleanup(); // No copying allowed diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc new file mode 100644 index 000000000..88b8c3fd4 --- /dev/null +++ b/utilities/transactions/transaction_base.cc @@ -0,0 +1,278 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/transaction_base.h" + +#include "db/column_family.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/status.h" +#include "util/string_util.h" + +namespace rocksdb { + +TransactionBaseImpl::TransactionBaseImpl(DB* db, + const WriteOptions& write_options) + : db_(db), + write_options_(write_options), + cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), + write_batch_(new WriteBatchWithIndex(cmp_, 0, true)), + start_time_(db_->GetEnv()->NowMicros()) {} + +TransactionBaseImpl::~TransactionBaseImpl() {} + +void TransactionBaseImpl::SetSnapshot() { + snapshot_.reset(new ManagedSnapshot(db_)); +} + +Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, + const SliceParts& key, bool untracked) { + size_t key_size = 0; + for (int i = 0; i < key.num_parts; ++i) { + key_size += key.parts[i].size(); + } + + std::string str; + str.reserve(key_size); + + for (int i = 0; i < key.num_parts; ++i) { + str.append(key.parts[i].data(), key.parts[i].size()); + } + + return TryLock(column_family, str, untracked); +} + +void TransactionBaseImpl::SetSavePoint() { + if (save_points_ == nullptr) { + save_points_.reset(new std::stack>()); + } + save_points_->push(snapshot_); + write_batch_->SetSavePoint(); +} + +Status TransactionBaseImpl::RollbackToSavePoint() { + if (save_points_ != nullptr && save_points_->size() > 0) { + // Restore saved snapshot + snapshot_ = save_points_->top(); + save_points_->pop(); + + // Rollback batch + Status s = write_batch_->RollbackToSavePoint(); + assert(s.ok()); + + return s; + } else { + assert(write_batch_->RollbackToSavePoint().IsNotFound()); + return Status::NotFound(); + } +} + +Status TransactionBaseImpl::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, std::string* value) { + return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key, + value); +} + +Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, std::string* value) { + Status s = TryLock(column_family, key); + + if (s.ok() && value != nullptr) { + s = Get(read_options, column_family, key, value); + } + return s; +} + +std::vector TransactionBaseImpl::MultiGet( + const ReadOptions& read_options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) { + size_t num_keys = keys.size(); + values->resize(num_keys); + + std::vector stat_list(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + std::string* value = values ? &(*values)[i] : nullptr; + stat_list[i] = Get(read_options, column_family[i], keys[i], value); + } + + return stat_list; +} + +std::vector TransactionBaseImpl::MultiGetForUpdate( + const ReadOptions& read_options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) { + // Regardless of whether the MultiGet succeeded, track these keys. + size_t num_keys = keys.size(); + values->resize(num_keys); + + // Lock all keys + for (size_t i = 0; i < num_keys; ++i) { + Status s = TryLock(column_family[i], keys[i]); + if (!s.ok()) { + // Fail entire multiget if we cannot lock all keys + return std::vector(num_keys, s); + } + } + + // TODO(agiardullo): optimize multiget? + std::vector stat_list(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + std::string* value = values ? &(*values)[i] : nullptr; + stat_list[i] = Get(read_options, column_family[i], keys[i], value); + } + + return stat_list; +} + +Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) { + Iterator* db_iter = db_->NewIterator(read_options); + assert(db_iter); + + return write_batch_->NewIteratorWithBase(db_iter); +} + +Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) { + Iterator* db_iter = db_->NewIterator(read_options, column_family); + assert(db_iter); + + return write_batch_->NewIteratorWithBase(column_family, db_iter); +} + +Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, + const SliceParts& key, + const SliceParts& value) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Merge(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, + const Slice& key) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + return s; +} + +Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + return s; +} + +Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key, + const SliceParts& value) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, + const Slice& key, + const Slice& value) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Merge(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + return s; +} + +Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + return s; +} + +void TransactionBaseImpl::PutLogData(const Slice& blob) { + write_batch_->PutLogData(blob); +} + +WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() { + return write_batch_.get(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h new file mode 100644 index 000000000..9cd324e57 --- /dev/null +++ b/utilities/transactions/transaction_base.h @@ -0,0 +1,183 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/snapshot.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/write_batch_with_index.h" + +namespace rocksdb { + +class TransactionBaseImpl : public Transaction { + public: + TransactionBaseImpl(DB* db, const WriteOptions& write_options); + + virtual ~TransactionBaseImpl(); + + // Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock + // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed. + // untracked will be true if called from PutUntracked, DeleteUntracked, or + // MergeUntracked. + virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, + bool untracked = false) = 0; + + void SetSavePoint() override; + + Status RollbackToSavePoint() override; + + Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, std::string* value) override; + + Status Get(const ReadOptions& options, const Slice& key, + std::string* value) override { + return Get(options, db_->DefaultColumnFamily(), key, value); + } + + Status GetForUpdate(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) override; + + Status GetForUpdate(const ReadOptions& options, const Slice& key, + std::string* value) override { + return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); + } + + std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, + std::vector* values) override; + + std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) override { + return MultiGet(options, std::vector( + keys.size(), db_->DefaultColumnFamily()), + keys, values); + } + + std::vector MultiGetForUpdate( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, + std::vector* values) override; + + std::vector MultiGetForUpdate( + const ReadOptions& options, const std::vector& keys, + std::vector* values) override { + return MultiGetForUpdate(options, + std::vector( + keys.size(), db_->DefaultColumnFamily()), + keys, values); + } + + Iterator* GetIterator(const ReadOptions& read_options) override; + Iterator* GetIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) override; + + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status Put(const Slice& key, const Slice& value) override { + return Put(nullptr, key, value); + } + + Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + Status Put(const SliceParts& key, const SliceParts& value) override { + return Put(nullptr, key, value); + } + + Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status Merge(const Slice& key, const Slice& value) override { + return Merge(nullptr, key, value); + } + + Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + Status Delete(const Slice& key) override { return Delete(nullptr, key); } + Status Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } + + Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status PutUntracked(const Slice& key, const Slice& value) override { + return PutUntracked(nullptr, key, value); + } + + Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + Status PutUntracked(const SliceParts& key, const SliceParts& value) override { + return PutUntracked(nullptr, key, value); + } + + Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status MergeUntracked(const Slice& key, const Slice& value) override { + return MergeUntracked(nullptr, key, value); + } + + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status DeleteUntracked(const Slice& key) override { + return DeleteUntracked(nullptr, key); + } + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + Status DeleteUntracked(const SliceParts& key) override { + return DeleteUntracked(nullptr, key); + } + + void PutLogData(const Slice& blob) override; + + WriteBatchWithIndex* GetWriteBatch() override; + + const Snapshot* GetSnapshot() const override { + return snapshot_ ? snapshot_->snapshot() : nullptr; + } + + void SetSnapshot() override; + + protected: + DB* const db_; + + const WriteOptions write_options_; + + const Comparator* cmp_; + + // Records writes pending in this transaction + std::unique_ptr write_batch_; + + // Stores that time the txn was constructed, in microseconds. + const uint64_t start_time_; + + // Stores the current snapshot that was was set by SetSnapshot or null if + // no snapshot is currently set. + std::shared_ptr snapshot_; + + // Stack of the Snapshot saved at each save point. Saved snapshots may be + // nullptr if there was no snapshot at the time SetSavePoint() was called. + std::unique_ptr>> save_points_; + + private: + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, + bool untracked = false); +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index d4a197e2a..cfaf0b2ac 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -36,18 +36,13 @@ TransactionID TransactionImpl::GenTxnID() { TransactionImpl::TransactionImpl(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : db_(txn_db), + : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_impl_(nullptr), txn_id_(GenTxnID()), - write_options_(write_options), - cmp_(GetColumnFamilyUserComparator(txn_db->DefaultColumnFamily())), - write_batch_(new WriteBatchWithIndex(cmp_, 0, true)), - start_time_( - txn_options.expiration >= 0 ? db_->GetEnv()->NowMicros() / 1000 : 0), expiration_time_(txn_options.expiration >= 0 - ? start_time_ + txn_options.expiration - : 0), - lock_timeout_(txn_options.lock_timeout) { + ? start_time_ / 1000 + txn_options.expiration + : 0), + lock_timeout_(txn_options.lock_timeout) { txn_db_impl_ = dynamic_cast(txn_db); assert(txn_db_impl_); @@ -65,10 +60,6 @@ TransactionImpl::~TransactionImpl() { txn_db_impl_->UnLock(this, &tracked_keys_); } -void TransactionImpl::SetSnapshot() { - snapshot_.reset(new ManagedSnapshot(db_)); -} - void TransactionImpl::Cleanup() { write_batch_->Clear(); txn_db_impl_->UnLock(this, &tracked_keys_); @@ -112,10 +103,6 @@ Status TransactionImpl::Commit() { Status TransactionImpl::DoCommit(WriteBatch* batch) { Status s; - // Do write directly on base db as TransctionDB::Write() would attempt to - // do conflict checking that we've already done. - DB* db = db_->GetBaseDB(); - if (expiration_time_ > 0) { // We cannot commit a transaction that is expired as its locks might have // been released. @@ -123,11 +110,14 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) { // expiration time once we're on the writer thread. TransactionCallback callback(this); - assert(dynamic_cast(db) != nullptr); - auto db_impl = reinterpret_cast(db); + // Do write directly on base db as TransctionDB::Write() would attempt to + // do conflict checking that we've already done. + assert(dynamic_cast(db_) != nullptr); + auto db_impl = reinterpret_cast(db_); + s = db_impl->WriteWithCallback(write_options_, batch, &callback); } else { - s = db->Write(write_options_, batch); + s = db_->Write(write_options_, batch); } return s; @@ -135,31 +125,6 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) { void TransactionImpl::Rollback() { Cleanup(); } -void TransactionImpl::SetSavePoint() { - if (save_points_ == nullptr) { - save_points_.reset(new std::stack>()); - } - save_points_->push(snapshot_); - write_batch_->SetSavePoint(); -} - -Status TransactionImpl::RollbackToSavePoint() { - if (save_points_ != nullptr && save_points_->size() > 0) { - // Restore saved snapshot - snapshot_ = save_points_->top(); - save_points_->pop(); - - // Rollback batch - Status s = write_batch_->RollbackToSavePoint(); - assert(s.ok()); - - return s; - } else { - assert(write_batch_->RollbackToSavePoint().IsNotFound()); - return Status::NotFound(); - } -} - // Lock all keys in this batch. // On success, caller should unlock keys_to_unlock Status TransactionImpl::LockBatch(WriteBatch* batch, @@ -234,35 +199,26 @@ Status TransactionImpl::LockBatch(WriteBatch* batch, return s; } -Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, - const SliceParts& key, bool check_snapshot) { - size_t key_size = 0; - for (int i = 0; i < key.num_parts; ++i) { - key_size += key.parts[i].size(); - } - - std::string str; - str.reserve(key_size); - - for (int i = 0; i < key.num_parts; ++i) { - str.append(key.parts[i].data(), key.parts[i].size()); - } - - return TryLock(column_family, str, check_snapshot); -} - // Attempt to lock this key. // Returns OK if the key has been successfully locked. Non-ok, otherwise. // If check_shapshot is true and this transaction has a snapshot set, // this key will only be locked if there have been no writes to this key since // the snapshot time. Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, - const Slice& key, bool check_snapshot) { + const Slice& key, bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; Status s; + // Even though we do not care about doing conflict checking for this write, + // we still need to take a lock to make sure we do not cause a conflict with + // some other write. However, we do not need to check if there have been + // any writes since this transaction's snapshot. + // TODO(agiardullo): could optimize by supporting shared txn locks in the + // future + bool check_snapshot = !untracked; + // lock this key if this transactions hasn't already locked it auto iter = tracked_keys_[cfh_id].find(key_str); if (iter == tracked_keys_[cfh_id].end()) { @@ -327,8 +283,8 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key) { Status result; if (snapshot_ != nullptr) { - assert(dynamic_cast(db_->GetBaseDB()) != nullptr); - auto db_impl = reinterpret_cast(db_->GetBaseDB()); + assert(dynamic_cast(db_) != nullptr); + auto db_impl = reinterpret_cast(db_); ColumnFamilyHandle* cfh = column_family ? column_family : db_impl->DefaultColumnFamily(); @@ -341,213 +297,6 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family, return result; } -Status TransactionImpl::Get(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { - return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key, - value); -} - -Status TransactionImpl::GetForUpdate(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - Status s = TryLock(column_family, key); - - if (s.ok() && value != nullptr) { - s = Get(read_options, column_family, key, value); - } - return s; -} - -std::vector TransactionImpl::MultiGet( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - size_t num_keys = keys.size(); - values->resize(num_keys); - - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - std::string* value = values ? &(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -std::vector TransactionImpl::MultiGetForUpdate( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - // Regardless of whether the MultiGet succeeded, track these keys. - size_t num_keys = keys.size(); - values->resize(num_keys); - - // Lock all keys - for (size_t i = 0; i < num_keys; ++i) { - Status s = TryLock(column_family[i], keys[i]); - if (!s.ok()) { - // Fail entire multiget if we cannot lock all keys - return std::vector(num_keys, s); - } - } - - // TODO(agiardullo): optimize multiget? - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - std::string* value = values ? &(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -Iterator* TransactionImpl::GetIterator(const ReadOptions& read_options) { - Iterator* db_iter = db_->NewIterator(read_options); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(db_iter); -} - -Iterator* TransactionImpl::GetIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) { - Iterator* db_iter = db_->NewIterator(read_options, column_family); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(column_family, db_iter); -} - -Status TransactionImpl::Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::Put(ColumnFamilyHandle* column_family, - const SliceParts& key, const SliceParts& value) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::Merge(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Merge(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -Status TransactionImpl::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - // Even though we do not care about doing conflict checking for this write, - // we still need to take a lock to make sure we do not cause a conflict with - // some other write. However, we do not need to check if there have been - // any writes since this transaction's snapshot. - bool check_snapshot = false; - - // TODO(agiardullo): could optimize by supporting shared txn locks in the - // future - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key, - const SliceParts& value) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::MergeUntracked(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Merge(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family, - const Slice& key) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -void TransactionImpl::PutLogData(const Slice& blob) { - write_batch_->PutLogData(blob); -} - -WriteBatchWithIndex* TransactionImpl::GetWriteBatch() { - return write_batch_.get(); -} - } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 06d5903e2..2d11ac0c0 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -22,6 +22,7 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_util.h" namespace rocksdb { @@ -30,7 +31,7 @@ using TransactionID = uint64_t; class TransactionDBImpl; -class TransactionImpl : public Transaction { +class TransactionImpl : public TransactionBaseImpl { public: TransactionImpl(TransactionDB* db, const WriteOptions& write_options, const TransactionOptions& txn_options); @@ -43,123 +44,6 @@ class TransactionImpl : public Transaction { void Rollback() override; - void SetSavePoint() override; - - Status RollbackToSavePoint() override; - - Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) override; - - Status Get(const ReadOptions& options, const Slice& key, - std::string* value) override { - return Get(options, db_->DefaultColumnFamily(), key, value); - } - - Status GetForUpdate(const ReadOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; - - Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) override { - return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); - } - - std::vector MultiGet( - const ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) override { - return MultiGet(options, std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - std::vector MultiGetForUpdate( - const ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector MultiGetForUpdate( - const ReadOptions& options, const std::vector& keys, - std::vector* values) override { - return MultiGetForUpdate(options, - std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - Iterator* GetIterator(const ReadOptions& read_options) override; - Iterator* GetIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) override; - - Status Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Put(const Slice& key, const Slice& value) override { - return Put(nullptr, key, value); - } - - Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status Put(const SliceParts& key, const SliceParts& value) override { - return Put(nullptr, key, value); - } - - Status Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Merge(const Slice& key, const Slice& value) override { - return Merge(nullptr, key, value); - } - - Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; - Status Delete(const Slice& key) override { return Delete(nullptr, key); } - Status Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } - - Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status PutUntracked(const Slice& key, const Slice& value) override { - return PutUntracked(nullptr, key, value); - } - - Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status PutUntracked(const SliceParts& key, const SliceParts& value) override { - return PutUntracked(nullptr, key, value); - } - - Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status MergeUntracked(const Slice& key, const Slice& value) override { - return MergeUntracked(nullptr, key, value); - } - - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const Slice& key) override; - Status DeleteUntracked(const Slice& key) override { - return DeleteUntracked(nullptr, key); - } - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status DeleteUntracked(const SliceParts& key) override { - return DeleteUntracked(nullptr, key); - } - - void PutLogData(const Slice& blob) override; - - const Snapshot* GetSnapshot() const override { - return snapshot_ ? snapshot_->snapshot() : nullptr; - } - - void SetSnapshot() override; - - WriteBatchWithIndex* GetWriteBatch() override; - // Generate a new unique transaction identifier static TransactionID GenTxnID(); @@ -178,9 +62,11 @@ class TransactionImpl : public Transaction { int64_t GetLockTimeout() const { return lock_timeout_; } void SetLockTimeout(int64_t timeout) { lock_timeout_ = timeout; } - private: - TransactionDB* const db_; + protected: + Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, + bool untracked = false) override; + private: TransactionDBImpl* txn_db_impl_; // Used to create unique ids for transactions. @@ -189,21 +75,6 @@ class TransactionImpl : public Transaction { // Unique ID for this transaction const TransactionID txn_id_; - const WriteOptions write_options_; - - // If snapshot_ is set, all keys that locked must also have not been written - // since this snapshot - std::shared_ptr snapshot_; - - const Comparator* cmp_; - - std::unique_ptr write_batch_; - - // If expiration_ is non-zero, start_time_ stores that time the txn was - // constructed, - // in milliseconds. - const uint64_t start_time_; - // If non-zero, this transaction should not be committed after this time (in // milliseconds) const uint64_t expiration_time_; @@ -217,14 +88,6 @@ class TransactionImpl : public Transaction { // stored. TransactionKeyMap tracked_keys_; - // Stack of the Snapshot saved at each save point. Saved snapshots may be - // nullptr if there was no snapshot at the time SetSavePoint() was called. - std::unique_ptr>> save_points_; - - Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool check_snapshot = true); - Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, - bool check_snapshot = true); void Cleanup(); Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key);