diff --git a/include/rocksdb/utilities/optimistic_transaction_db.h b/include/rocksdb/utilities/optimistic_transaction_db.h index 42ebe191f..b2c2f99a8 100644 --- a/include/rocksdb/utilities/optimistic_transaction_db.h +++ b/include/rocksdb/utilities/optimistic_transaction_db.h @@ -43,15 +43,19 @@ class OptimisticTransactionDB { virtual ~OptimisticTransactionDB() {} - // Starts a new Transaction. Passing set_snapshot=true has the same effect - // as calling SetSnapshot(). + // Starts a new Transaction. // - // Caller should delete the returned transaction after calling - // Commit() or Rollback(). + // Caller is responsible for deleting the returned transaction when no + // longer needed. + // + // If old_txn is not null, BeginTransaction will reuse this Transaction + // handle instead of allocating a new one. This is an optimization to avoid + // extra allocations when repeatedly creating transactions. virtual Transaction* BeginTransaction( const WriteOptions& write_options, - const OptimisticTransactionOptions& - txn_options = OptimisticTransactionOptions()) = 0; + const OptimisticTransactionOptions& txn_options = + OptimisticTransactionOptions(), + Transaction* old_txn = nullptr) = 0; // Return the underlying Database that was opened virtual DB* GetBaseDB() = 0; diff --git a/utilities/transactions/optimistic_transaction_db_impl.cc b/utilities/transactions/optimistic_transaction_db_impl.cc index d54173d3d..190440242 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.cc +++ b/utilities/transactions/optimistic_transaction_db_impl.cc @@ -5,11 +5,11 @@ #ifndef ROCKSDB_LITE +#include "utilities/transactions/optimistic_transaction_db_impl.h" + #include #include -#include "utilities/transactions/optimistic_transaction_db_impl.h" - #include "db/db_impl.h" #include "rocksdb/db.h" #include "rocksdb/options.h" @@ -20,11 +20,13 @@ namespace rocksdb { Transaction* OptimisticTransactionDBImpl::BeginTransaction( const WriteOptions& write_options, - const OptimisticTransactionOptions& txn_options) { - Transaction* txn = - new OptimisticTransactionImpl(this, write_options, txn_options); - - return txn; + const OptimisticTransactionOptions& txn_options, Transaction* old_txn) { + if (old_txn != nullptr) { + ReinitializeTransaction(old_txn, write_options, txn_options); + return old_txn; + } else { + return new OptimisticTransactionImpl(this, write_options, txn_options); + } } Status OptimisticTransactionDB::Open(const Options& options, @@ -76,5 +78,14 @@ Status OptimisticTransactionDB::Open( return s; } +void OptimisticTransactionDBImpl::ReinitializeTransaction( + Transaction* txn, const WriteOptions& write_options, + const OptimisticTransactionOptions& txn_options) { + assert(dynamic_cast(txn) != nullptr); + auto txn_impl = reinterpret_cast(txn); + + txn_impl->Reinitialize(this, write_options, txn_options); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/optimistic_transaction_db_impl.h b/utilities/transactions/optimistic_transaction_db_impl.h index 72f186188..e426a21be 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.h +++ b/utilities/transactions/optimistic_transaction_db_impl.h @@ -19,14 +19,19 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB { ~OptimisticTransactionDBImpl() {} - Transaction* BeginTransaction( - const WriteOptions& write_options, - const OptimisticTransactionOptions& txn_options) override; + Transaction* BeginTransaction(const WriteOptions& write_options, + const OptimisticTransactionOptions& txn_options, + Transaction* old_txn) override; DB* GetBaseDB() override { return db_.get(); } private: std::unique_ptr db_; + + void ReinitializeTransaction(Transaction* txn, + const WriteOptions& write_options, + const OptimisticTransactionOptions& txn_options = + OptimisticTransactionOptions()); }; } // namespace rocksdb diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 5cb1a8f8d..2647b3dd7 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -28,11 +28,23 @@ OptimisticTransactionImpl::OptimisticTransactionImpl( OptimisticTransactionDB* txn_db, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options) : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) { + Initialize(txn_options); +} + +void OptimisticTransactionImpl::Initialize( + const OptimisticTransactionOptions& txn_options) { if (txn_options.set_snapshot) { SetSnapshot(); } } +void OptimisticTransactionImpl::Reinitialize( + OptimisticTransactionDB* txn_db, const WriteOptions& write_options, + const OptimisticTransactionOptions& txn_options) { + TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); + Initialize(txn_options); +} + OptimisticTransactionImpl::~OptimisticTransactionImpl() { } diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction_impl.h index cbd167505..4876a100d 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction_impl.h @@ -34,6 +34,10 @@ class OptimisticTransactionImpl : public TransactionBaseImpl { virtual ~OptimisticTransactionImpl(); + void Reinitialize(OptimisticTransactionDB* txn_db, + const WriteOptions& write_options, + const OptimisticTransactionOptions& txn_options); + Status Commit() override; void Rollback() override; @@ -47,6 +51,8 @@ class OptimisticTransactionImpl : public TransactionBaseImpl { friend class OptimisticTransactionCallback; + void Initialize(const OptimisticTransactionOptions& txn_options); + // Returns OK if it is safe to commit this transaction. Returns Status::Busy // if there are read or write conflicts that would prevent us from committing // OR if we can not determine whether there would be any such conflicts. diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 991771757..b672b8722 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -1267,6 +1267,90 @@ TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) { delete txn1; } +TEST_F(OptimisticTransactionTest, ReinitializeTest) { + WriteOptions write_options; + ReadOptions read_options; + OptimisticTransactionOptions txn_options; + string value; + Status s; + + Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options); + + txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->Put("Z", "z"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->Put("Z", "zz"); + ASSERT_OK(s); + + // Reinitilize txn1 and verify that zz is not written + txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); + + s = txn1->Commit(); + ASSERT_OK(s); + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "z"); + + // Verify snapshots get reinitialized correctly + txn1->SetSnapshot(); + s = txn1->Put("Z", "zzzz"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzzz"); + + const Snapshot* snapshot = txn1->GetSnapshot(); + ASSERT_TRUE(snapshot); + + txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); + snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot); + + txn_options.set_snapshot = true; + txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); + snapshot = txn1->GetSnapshot(); + ASSERT_TRUE(snapshot); + + s = txn1->Put("Z", "a"); + ASSERT_OK(s); + + txn1->Rollback(); + + s = txn1->Put("Y", "y"); + ASSERT_OK(s); + + txn_options.set_snapshot = false; + txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); + snapshot = txn1->GetSnapshot(); + ASSERT_FALSE(snapshot); + + s = txn1->Put("X", "x"); + ASSERT_OK(s); + + s = txn1->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "zzzz"); + + s = db->Get(read_options, "Y", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn1; +} + } // namespace rocksdb int main(int argc, char** argv) {