diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 530434054..07c3eeeeb 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -607,6 +607,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, delayed_prepared_empty_.store(false, std::memory_order_release); } } + // With each change to max_evicted_seq_ fetch the live snapshots behind it { WriteLock wl(&snapshots_mutex_); InstrumentedMutex(db_impl_->mutex()); @@ -622,9 +623,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, // be kept around because it overlaps with a live snapshot. { ReadLock rl(&snapshots_mutex_); - for (auto snapshot : snapshots_) { - auto snapshot_seq = - reinterpret_cast(snapshot)->number_; + for (auto snapshot_seq : snapshots_) { if (evicted.commit_seq <= snapshot_seq) { break; } @@ -691,5 +690,8 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq, return true; } +// 10m entry, 80MB size +uint64_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = + static_cast(1 << 21); } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 4d1a5f4b5..489da30bf 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -161,11 +161,17 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { public: explicit WritePreparedTxnDB(DB* db, const TransactionDBOptions& txn_db_options) - : PessimisticTransactionDB(db, txn_db_options) {} + : PessimisticTransactionDB(db, txn_db_options), + COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { + init(txn_db_options); + } explicit WritePreparedTxnDB(StackableDB* db, const TransactionDBOptions& txn_db_options) - : PessimisticTransactionDB(db, txn_db_options) {} + : PessimisticTransactionDB(db, txn_db_options), + COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { + init(txn_db_options); + } virtual ~WritePreparedTxnDB() {} @@ -183,6 +189,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); private: + friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; + + void init(const TransactionDBOptions& /* unused */) { + commit_cache_ = + unique_ptr(new CommitEntry[COMMIT_CACHE_SIZE]{}); + } + // A heap with the amortized O(1) complexity for erase. It uses one extra heap // to keep track of erased entries that are not yet on top of the main heap. class PreparedHeap { @@ -236,11 +249,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // A heap of prepared transactions. Thread-safety is provided with // prepared_mutex_. PreparedHeap prepared_txns_; - // 10m entry, 80MB size - static const uint64_t COMMIT_CACHE_SIZE = static_cast(1 << 21); - // commit_cache_ is initialized to zero to tell apart an empty index from a - // filled one. Thread-safety is provided with commit_cache_mutex_. - CommitEntry commit_cache_[COMMIT_CACHE_SIZE] = {}; + static uint64_t DEF_COMMIT_CACHE_SIZE; + const uint64_t COMMIT_CACHE_SIZE; + // commit_cache_ must be initialized to zero to tell apart an empty index from + // a filled one. Thread-safety is provided with commit_cache_mutex_. + unique_ptr commit_cache_; // The largest evicted *commit* sequence number from the commit_cache_ std::atomic max_evicted_seq_ = {}; // A map of the evicted entries from commit_cache_ that has to be kept around diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 0eaaf20ac..2e8c87f49 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -17,7 +17,6 @@ #include "rocksdb/utilities/transaction_db.h" #include "table/mock_table.h" #include "util/fault_injection_test_env.h" -#include "util/logging.h" #include "util/random.h" #include "util/string_util.h" #include "util/sync_point.h" @@ -26,6 +25,7 @@ #include "util/transaction_test_util.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" +#include "utilities/transactions/pessimistic_transaction_db.h" #include "port/port.h" @@ -33,8 +33,8 @@ using std::string; namespace rocksdb { -class TransactionTest - : public ::testing::TestWithParam> { +class TransactionTest : public ::testing::TestWithParam< + std::tuple> { public: TransactionDB* db; FaultInjectionTestEnv* env; @@ -57,6 +57,7 @@ class TransactionTest DestroyDB(dbname, options); txn_db_options.transaction_lock_timeout = 0; txn_db_options.default_lock_timeout = 0; + txn_db_options.write_policy = std::get<2>(GetParam()); Status s; if (std::get<0>(GetParam()) == false) { s = TransactionDB::Open(options, txn_db_options, dbname, &db); @@ -123,16 +124,23 @@ class TransactionTest }; class MySQLStyleTransactionTest : public TransactionTest {}; +class WritePreparedTransactionTest : public TransactionTest {}; +static const TxnDBWritePolicy wc = WRITE_COMMITTED; +static const TxnDBWritePolicy wp = WRITE_PREPARED; +// TODO(myabandeh): Instantiate the tests with other write policies INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(false, false))); + ::testing::Values(std::make_tuple(false, false, wc))); INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(true, false))); + ::testing::Values(std::make_tuple(true, false, wc))); INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest, - ::testing::Values(std::make_tuple(false, false), - std::make_tuple(false, true), - std::make_tuple(true, false), - std::make_tuple(true, true))); + ::testing::Values(std::make_tuple(false, false, wc), + std::make_tuple(false, true, wc), + std::make_tuple(true, false, wc), + std::make_tuple(true, true, wc))); +INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest, + WritePreparedTransactionTest, + ::testing::Values(std::make_tuple(false, true, wp))); TEST_P(TransactionTest, DoubleEmptyWrite) { WriteOptions write_options; @@ -4720,6 +4728,128 @@ TEST_P(TransactionTest, MemoryLimitTest) { delete txn; } +// Test WritePreparedTxnDB's IsInSnapshot against different ordering of +// snapshot, max_committed_seq_, prepared, and commit entries. +TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { + WriteOptions wo; + // Use small commit cache to trigger lots of eviction and fast advance of + // max_evicted_seq_ + WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = + 8; // will take effect after ReOpen + + // Take some preliminary snapshots first. This is to stress the data structure + // that holds the old snapshots as it will be designed to be efficient when + // only a few snapshots are below the max_evicted_seq_. + for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) { + // Leave some gap between the preliminary snapshots and the final snapshot + // that we check. This should test for also different overlapping scnearios + // between the last snapshot and the commits. + for (int max_gap = 1; max_gap < 10; max_gap++) { + // Since we do not actually write to db, we mock the seq as it would be + // increaased by the db. The only exception is that we need db seq to + // advance for our snapshots. for which we apply a dummy put each time we + // increase our mock of seq. + uint64_t seq = 0; + // At each step we prepare a txn and then we commit it in the next txn. + // This emulates the consecuitive transactions that write to the same key + uint64_t cur_txn = 0; + // Number of snapshots taken so far + int num_snapshots = 0; + // Number of gaps applied so far + int gap_cnt = 0; + // The final snapshot that we will inspect + uint64_t snapshot = 0; + bool found_committed = false; + // To stress the data structure that maintain prepared txns, at each cycle + // we add a new prepare txn. These do not mean to be committed for + // snapshot inspection. + std::set prepared; + // We keep the list of txns comitted before we take the last snaphot. + // These should be the only seq numbers that will be found in the snapshot + std::set committed_before; + ReOpen(); // to restart the db + WritePreparedTxnDB* wp_db = dynamic_cast(db); + assert(wp_db); + assert(wp_db->db_impl_); + // We continue until max advances a bit beyond the snapshot. + while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { + // do prepare for a transaction + wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq + seq++; + ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); + wp_db->AddPrepared(seq); + prepared.insert(seq); + + // If cur_txn is not started, do prepare for it. + if (!cur_txn) { + wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq + seq++; + ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); + cur_txn = seq; + wp_db->AddPrepared(cur_txn); + } else { // else commit it + wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq + seq++; + ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); + wp_db->AddCommitted(cur_txn, seq); + if (!snapshot) { + committed_before.insert(cur_txn); + } + cur_txn = 0; + } + + if (num_snapshots < max_snapshots - 1) { + // Take preliminary snapshots + db->GetSnapshot(); + num_snapshots++; + } else if (gap_cnt < max_gap) { + // Wait for some gap before taking the final snapshot + gap_cnt++; + } else if (!snapshot) { + // Take the final snapshot if it is not already taken + snapshot = db->GetSnapshot()->GetSequenceNumber(); + // We increase the db seq artificailly by a dummy Put. Check that this + // technique is effective and db seq is that same as ours. + ASSERT_EQ(snapshot, seq); + num_snapshots++; + } + + // If the snapshot is taken, verify seq numbers visible to it. We redo + // it at each cycle to test that the system is still sound when + // max_evicted_seq_ advances. + if (snapshot) { + for (uint64_t s = 0; s <= seq; s++) { + bool was_committed = + (committed_before.find(s) != committed_before.end()); + bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot); + if (was_committed != is_in_snapshot) { + printf( + "max_snapshots %d max_gap %d seq %lu max %lu snapshot %lu " + "gap_cnt %d num_snapshots %d\n", + max_snapshots, max_gap, seq, wp_db->max_evicted_seq_.load(), + snapshot, gap_cnt, num_snapshots); + } + ASSERT_EQ(was_committed, is_in_snapshot); + found_committed = found_committed || is_in_snapshot; + } + } + } + // Safety check to make sure the test actually ran + ASSERT_TRUE(found_committed); + // As an extra check, check if prepared set will be properly empty after + // they are committed. + if (cur_txn) { + wp_db->AddCommitted(cur_txn, seq); + } + for (auto p : prepared) { + wp_db->AddCommitted(p, seq); + } + ASSERT_TRUE(wp_db->delayed_prepared_.empty()); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + } + } +} + } // namespace rocksdb int main(int argc, char** argv) {