diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index 6c4346ff3..db32ba0bc 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -94,6 +94,13 @@ struct TransactionDBOptions {
   // for the special way that myrocks uses this operands.
   bool rollback_merge_operands = false;
 
+  // If true, the TransactionDB implementation might skip concurrency control
+  // unless it is overridden by TransactionOptions or
+  // TransactionDBWriteOptimizations. This can be used in conjunction with
+  // DBOptions::unordered_write when the TransactionDB is used solely for write
+  // ordering rather than concurrency control.
+  bool skip_concurrency_control = false;
+
  private:
   // 128 entries
   size_t wp_snapshot_cache_bits = static_cast<size_t>(7);
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 18d873343..2ceca4fd9 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -3788,6 +3788,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       } else if (FLAGS_transaction_db) {
         TransactionDB* ptr;
         TransactionDBOptions txn_db_options;
+        if (options.unordered_write) {
+          options.two_write_queues = true;
+          txn_db_options.skip_concurrency_control = true;
+          txn_db_options.write_policy = WRITE_PREPARED;
+        }
         s = TransactionDB::Open(options, txn_db_options, db_name,
                                 column_families, &db->cfh, &ptr);
         if (s.ok()) {
@@ -3814,6 +3819,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     } else if (FLAGS_transaction_db) {
       TransactionDB* ptr = nullptr;
       TransactionDBOptions txn_db_options;
+      if (options.unordered_write) {
+        options.two_write_queues = true;
+        txn_db_options.skip_concurrency_control = true;
+        txn_db_options.write_policy = WRITE_PREPARED;
+      }
       s = CreateLoggerFromOptions(db_name, options, &options.info_log);
       if (s.ok()) {
         s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 7b1b0241c..c1b37c148 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -522,23 +522,16 @@ Status PessimisticTransactionDB::Merge(const WriteOptions& options,
 
 Status PessimisticTransactionDB::Write(const WriteOptions& opts,
                                        WriteBatch* updates) {
-  // Need to lock all keys in this batch to prevent write conflicts with
-  // concurrent transactions.
-  Transaction* txn = BeginInternalTransaction(opts);
-  txn->DisableIndexing();
+  return WriteWithConcurrencyControl(opts, updates);
+}
 
-  auto txn_impl =
-      static_cast_with_check<PessimisticTransaction, Transaction>(txn);
-
-  // Since commitBatch sorts the keys before locking, concurrent Write()
-  // operations will not cause a deadlock.
-  // In order to avoid a deadlock with a concurrent Transaction, Transactions
-  // should use a lock timeout.
-  Status s = txn_impl->CommitBatch(updates);
-
-  delete txn;
-
-  return s;
+Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
+                                  WriteBatch* updates) {
+  if (txn_db_options_.skip_concurrency_control) {
+    return db_impl_->Write(opts, updates);
+  } else {
+    return WriteWithConcurrencyControl(opts, updates);
+  }
 }
 
 Status WriteCommittedTxnDB::Write(
@@ -547,7 +540,7 @@ Status WriteCommittedTxnDB::Write(
   if (optimizations.skip_concurrency_control) {
     return db_impl_->Write(opts, updates);
   } else {
-    return Write(opts, updates);
+    return WriteWithConcurrencyControl(opts, updates);
   }
 }
 
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index e80b28852..5242c6260 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -19,6 +19,7 @@
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/utilities/transaction_db.h"
+#include "util/cast_util.h"
 #include "utilities/transactions/pessimistic_transaction.h"
 #include "utilities/transactions/transaction_lock_mgr.h"
 #include "utilities/transactions/write_prepared_txn.h"
@@ -67,6 +68,26 @@ class PessimisticTransactionDB : public TransactionDB {
 
   using TransactionDB::Write;
   virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+  inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
+                                            WriteBatch* updates) {
+    // Need to lock all keys in this batch to prevent write conflicts with
+    // concurrent transactions.
+    Transaction* txn = BeginInternalTransaction(opts);
+    txn->DisableIndexing();
+
+    auto txn_impl =
+        static_cast_with_check<PessimisticTransaction, Transaction>(txn);
+
+    // Since commitBatch sorts the keys before locking, concurrent Write()
+    // operations will not cause a deadlock.
+    // In order to avoid a deadlock with a concurrent Transaction, Transactions
+    // should use a lock timeout.
+    Status s = txn_impl->CommitBatch(updates);
+
+    delete txn;
+
+    return s;
+  }
 
   using StackableDB::CreateColumnFamily;
   virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
@@ -191,6 +212,7 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB {
   virtual Status Write(const WriteOptions& opts,
                        const TransactionDBWriteOptimizations& optimizations,
                        WriteBatch* updates) override;
+  virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
 };
 
 }  // namespace rocksdb
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index 6b6831fd8..5250f3f2d 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -108,6 +108,18 @@ Transaction* WritePreparedTxnDB::BeginTransaction(
   }
 }
 
+Status WritePreparedTxnDB::Write(const WriteOptions& opts,
+                                 WriteBatch* updates) {
+  if (txn_db_options_.skip_concurrency_control) {
+    // Skip locking the rows
+    const size_t UNKNOWN_BATCH_CNT = 0;
+    WritePreparedTxn* NO_TXN = nullptr;
+    return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
+  } else {
+    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
+  }
+}
+
 Status WritePreparedTxnDB::Write(
     const WriteOptions& opts,
     const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
@@ -123,7 +135,7 @@ Status WritePreparedTxnDB::Write(
   } else {
     // TODO(myabandeh): Make use of skip_duplicate_key_check hint
     // Fall back to unoptimized version
-    return PessimisticTransactionDB::Write(opts, updates);
+    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
   }
 }
 
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index 25b9b9a1b..ffdf2f29d 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -72,6 +72,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                                 const TransactionOptions& txn_options,
                                 Transaction* old_txn) override;
 
+  using TransactionDB::Write;
+  Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+
   // Optimized version of ::Write that receives more optimization request such
   // as skip_concurrency_control.
   using PessimisticTransactionDB::Write;