diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 1a692f2a7..d4bbe4eda 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -93,6 +93,15 @@ struct TransactionDBOptions { // logic in myrocks. This hack of simply not rolling back merge operands works // for the special way that myrocks uses this operands. bool rollback_merge_operands = false; + + private: + // 128 entries + size_t wp_snapshot_cache_bits = static_cast(7); + // 8m entry, 64MB size + size_t wp_commit_cache_bits = static_cast(23); + + friend class WritePreparedTxnDB; + friend class WritePreparedTransactionTestBase; }; struct TransactionOptions { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index a13086a4b..e7f50200b 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -324,13 +324,6 @@ class WritePreparedTxnDBMock : public WritePreparedTxnDB { public: WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt) : WritePreparedTxnDB(db_impl, opt) {} - WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt, - size_t snapshot_cache_size) - : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {} - WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt, - size_t snapshot_cache_size, size_t commit_cache_size) - : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size, - commit_cache_size) {} void SetDBSnapshots(const std::vector& snapshots) { snapshots_ = snapshots; } @@ -353,39 +346,14 @@ class WritePreparedTransactionTestBase : public TransactionTestBase { : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}; protected: - // TODO(myabandeh): Avoid duplicating PessimisticTransaction::Open logic here. - void DestroyAndReopenWithExtraOptions(size_t snapshot_cache_bits, - size_t commit_cache_bits) { - delete db; - db = nullptr; - DestroyDB(dbname, options); - - options.create_if_missing = true; - ColumnFamilyOptions cf_options(options); - std::vector column_families; - std::vector handles; - column_families.push_back( - ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); - std::vector compaction_enabled_cf_indices; - TransactionDB::PrepareWrap(&options, &column_families, - &compaction_enabled_cf_indices); - DB* base_db = nullptr; - ASSERT_OK(DBImpl::Open(options, dbname, column_families, &handles, &base_db, - true /*use_seq_per_batch*/, - false /*use_batch_for_txn*/)); - - // The following is equivalent of WrapDB(). - txn_db_options.write_policy = WRITE_PREPARED; - auto* wp_db = new WritePreparedTxnDB( - base_db, txn_db_options, snapshot_cache_bits, commit_cache_bits); - wp_db->UpdateCFComparatorMap(handles); - ASSERT_OK(wp_db->Initialize(compaction_enabled_cf_indices, handles)); - - ASSERT_EQ(1, handles.size()); - delete handles[0]; - db = wp_db; + void UpdateTransactionDBOptions(size_t snapshot_cache_bits, + size_t commit_cache_bits) { + txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits; + txn_db_options.wp_commit_cache_bits = commit_cache_bits; + } + void UpdateTransactionDBOptions(size_t snapshot_cache_bits) { + txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits; } - // If expect_update is set, check if it actually updated old_commit_map_. If // it did not and yet suggested not to check the next snapshot, do the // opposite to check if it was not a bad suggestion. @@ -843,8 +811,9 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { const size_t snapshot_cache_bits = 0; const size_t commit_cache_bits = 0; DBImpl* mock_db = new DBImpl(options, dbname); - std::unique_ptr wp_db(new WritePreparedTxnDBMock( - mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits)); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + std::unique_ptr wp_db( + new WritePreparedTxnDBMock(mock_db, txn_db_options)); SequenceNumber seq = 0; // Take the first snapshot that overlaps with two txn @@ -928,8 +897,9 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) { // the snapshots lists changed. assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size()); DBImpl* mock_db = new DBImpl(options, dbname); + UpdateTransactionDBOptions(snapshot_cache_bits); std::unique_ptr wp_db( - new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits)); + new WritePreparedTxnDBMock(mock_db, txn_db_options)); SequenceNumber version = 1000l; ASSERT_EQ(0, wp_db->snapshots_total_); wp_db->UpdateSnapshots(snapshots, version); @@ -1023,8 +993,9 @@ TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccessTest) { // Choose the cache size so that the new snapshot list could replace all the // existing items in the cache and also have some overflow. DBImpl* mock_db = new DBImpl(options, dbname); + UpdateTransactionDBOptions(snapshot_cache_bits); std::unique_ptr wp_db( - new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits)); + new WritePreparedTxnDBMock(mock_db, txn_db_options)); const size_t extra = 2; size_t loop_id = 0; // Add up to extra items that do not fit into the cache @@ -1197,7 +1168,8 @@ TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) { TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); WriteOptions woptions; WritePreparedTxnDB* wp_db = dynamic_cast(db); @@ -1244,7 +1216,8 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); WriteOptions woptions; WritePreparedTxnDB* wp_db = dynamic_cast(db); // Insert something to increase seq @@ -1807,8 +1780,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { // The set of commit seq numbers to be excluded from IsInSnapshot queries std::set commit_seqs; DBImpl* mock_db = new DBImpl(options, dbname); - std::unique_ptr wp_db(new WritePreparedTxnDBMock( - mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits)); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + std::unique_ptr wp_db( + new WritePreparedTxnDBMock(mock_db, txn_db_options)); // We continue until max advances a bit beyond the snapshot. while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { // do prepare for a transaction @@ -2232,7 +2206,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // disable commit cache for (bool has_recent_prepare : {true, false}) { - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); auto* transaction = @@ -2280,7 +2255,8 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // minimum commit cache - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1")); auto* transaction = @@ -2328,7 +2304,8 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // minimum commit cache - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value2")); @@ -2377,7 +2354,8 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) { TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 1; // commit cache size = 2 - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); // Add a dummy key to evict v2 commit cache, but keep v1 commit cache. // It also advance max_evicted_seq and can trigger old_commit_map cleanup. @@ -2427,7 +2405,8 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) { TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // minimum commit cache - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); auto* transaction = @@ -2706,7 +2685,8 @@ TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 3; // 8 entries for (auto split_read : {true, false}) { - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); // Fill up the commit cache std::string init_value("value1"); for (int i = 0; i < 10; i++) { @@ -2777,7 +2757,8 @@ TEST_P(WritePreparedTransactionTest, CommitOfOldPrepared) { const size_t snapshot_cache_bits = 7; // same as default for (const size_t commit_cache_bits : {0, 2, 3}) { for (const size_t sub_batch_cnt : {1, 2, 3}) { - DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); std::atomic snap = {nullptr}; std::atomic exp_prepare = {0}; // Value is synchronized via snap diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index b2ca89ae8..6533beec7 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -43,27 +43,23 @@ namespace rocksdb { // mechanisms to tell such data apart from committed data. class WritePreparedTxnDB : public PessimisticTransactionDB { public: - explicit WritePreparedTxnDB( - DB* db, const TransactionDBOptions& txn_db_options, - size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS, - size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS) + explicit WritePreparedTxnDB(DB* db, + const TransactionDBOptions& txn_db_options) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_BITS(snapshot_cache_bits), + SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits), SNAPSHOT_CACHE_SIZE(static_cast(1ull << SNAPSHOT_CACHE_BITS)), - COMMIT_CACHE_BITS(commit_cache_bits), + COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits), COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), FORMAT(COMMIT_CACHE_BITS) { Init(txn_db_options); } - explicit WritePreparedTxnDB( - StackableDB* db, const TransactionDBOptions& txn_db_options, - size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS, - size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS) + explicit WritePreparedTxnDB(StackableDB* db, + const TransactionDBOptions& txn_db_options) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_BITS(snapshot_cache_bits), + SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits), SNAPSHOT_CACHE_SIZE(static_cast(1ull << SNAPSHOT_CACHE_BITS)), - COMMIT_CACHE_BITS(commit_cache_bits), + COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits), COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), FORMAT(COMMIT_CACHE_BITS) { Init(txn_db_options); @@ -619,8 +615,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // The list sorted in ascending order. Thread-safety for writes is provided // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for // each entry. In x86_64 architecture such reads are compiled to simple read - // instructions. 128 entries - static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast(7); + // instructions. const size_t SNAPSHOT_CACHE_BITS; const size_t SNAPSHOT_CACHE_SIZE; std::unique_ptr[]> snapshot_cache_; @@ -638,8 +633,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // A heap of prepared transactions. Thread-safety is provided with // prepared_mutex_. PreparedHeap prepared_txns_; - // 8m entry, 64MB size - static const size_t DEF_COMMIT_CACHE_BITS = static_cast(23); const size_t COMMIT_CACHE_BITS; const size_t COMMIT_CACHE_SIZE; const CommitEntry64bFormat FORMAT;