diff --git a/.travis.yml b/.travis.yml index 78e519159..41e0ef2fc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ env: - TEST_GROUP=platform_dependent # 16-18 minutes - TEST_GROUP=1 # 33-35 minutes - TEST_GROUP=2 # 30-32 minutes + - TEST_GROUP=3 # ? minutes - under development # Run java tests - JOB_NAME=java_test # 4-11 minutes # Build ROCKSDB_LITE @@ -36,6 +37,8 @@ matrix: env: TEST_GROUP=1 - os: osx env: TEST_GROUP=2 + - os: osx + env: TEST_GROUP=3 - os : osx env: JOB_NAME=cmake-mingw - os : linux @@ -59,7 +62,8 @@ script: - ${CXX} --version - if [ "${TEST_GROUP}" == 'platform_dependent' ]; then ccache -C && OPT=-DTRAVIS V=1 ROCKSDBTESTS_END=db_block_cache_test make -j4 all_but_some_tests check_some; fi - if [ "${TEST_GROUP}" == '1' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=db_block_cache_test ROCKSDBTESTS_END=comparator_db_test make -j4 check_some; fi - - if [ "${TEST_GROUP}" == '2' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=comparator_db_test make -j4 check_some; fi + - if [ "${TEST_GROUP}" == '2' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=comparator_db_test ROCKSDBTESTS_END=write_prepared_transaction_test make -j4 check_some; fi + - if [ "${TEST_GROUP}" == '3' ]; then OPT=-DTRAVIS V=1 ROCKSDBTESTS_START=write_prepared_transaction_test make -j4 check_some; fi - if [ "${JOB_NAME}" == 'java_test' ]; then OPT=-DTRAVIS V=1 make clean jclean && make rocksdbjava jtest; fi - if [ "${JOB_NAME}" == 'lite_build' ]; then OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 static_lib; fi - if [ "${JOB_NAME}" == 'examples' ]; then OPT=-DTRAVIS V=1 make -j4 static_lib; cd examples; make -j4; fi diff --git a/CMakeLists.txt b/CMakeLists.txt index 31ab5abb8..b0ccc1e4f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -830,6 +830,7 @@ if(WITH_TESTS) utilities/table_properties_collectors/compact_on_deletion_collector_test.cc utilities/transactions/optimistic_transaction_test.cc utilities/transactions/transaction_test.cc + utilities/transactions/write_prepared_transaction_test.cc utilities/ttl/ttl_test.cc utilities/write_batch_with_index/write_batch_with_index_test.cc ) diff --git a/Makefile b/Makefile index a657fad72..7c3b055ea 100644 --- a/Makefile +++ b/Makefile @@ -476,6 +476,7 @@ TESTS = \ object_registry_test \ repair_test \ env_timed_test \ + write_prepared_transaction_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -491,7 +492,8 @@ PARALLEL_TEST = \ manual_compaction_test \ persistent_cache_test \ table_test \ - transaction_test + transaction_test \ + write_prepared_transaction_test SUBSET := $(TESTS) ifdef ROCKSDBTESTS_START @@ -1392,6 +1394,9 @@ heap_test: util/heap_test.o $(GTEST) transaction_test: utilities/transactions/transaction_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +write_prepared_transaction_test: utilities/transactions/write_prepared_transaction_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + sst_dump: tools/sst_dump.o $(LIBOBJECTS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 3fac4a737..5dcdabad9 100644 --- a/TARGETS +++ b/TARGETS @@ -15,7 +15,6 @@ rocksdb_compiler_flags = [ "-DROCKSDB_SCHED_GETCPU_PRESENT", "-DROCKSDB_SUPPORT_THREAD_LOCAL", "-DOS_LINUX", - "-DROCKSDB_UBSAN_RUN", # Flags to enable libs we include "-DSNAPPY", "-DZLIB", @@ -476,7 +475,9 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'], ['thread_list_test', 'util/thread_list_test.cc', 'serial'], ['thread_local_test', 'util/thread_local_test.cc', 'serial'], ['timer_queue_test', 'util/timer_queue_test.cc', 'serial'], - ['transaction_test', 
'utilities/transactions/transaction_test.cc', 'serial'], + ['transaction_test', + 'utilities/transactions/transaction_test.cc', + 'parallel'], ['ttl_test', 'utilities/ttl/ttl_test.cc', 'serial'], ['util_merge_operators_test', 'utilities/util_merge_operators_test.cc', @@ -493,7 +494,10 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'], 'memtable/write_buffer_manager_test.cc', 'serial'], ['write_callback_test', 'db/write_callback_test.cc', 'serial'], - ['write_controller_test', 'db/write_controller_test.cc', 'serial']] + ['write_controller_test', 'db/write_controller_test.cc', 'serial'], + ['write_prepared_transaction_test', + 'utilities/transactions/write_prepared_transaction_test.cc', + 'serial']] # Generate a test rule for each entry in ROCKS_TESTS diff --git a/src.mk b/src.mk index 30012d11f..90fecee5b 100644 --- a/src.mk +++ b/src.mk @@ -354,6 +354,7 @@ MAIN_SOURCES = \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ utilities/transactions/optimistic_transaction_test.cc \ utilities/transactions/transaction_test.cc \ + utilities/transactions/write_prepared_transaction_test.cc \ utilities/ttl/ttl_test.cc \ utilities/write_batch_with_index/write_batch_with_index_test.cc \ diff --git a/util/sync_point.h b/util/sync_point.h index ada61becc..d836ed468 100644 --- a/util/sync_point.h +++ b/util/sync_point.h @@ -46,6 +46,7 @@ extern void TestKillRandom(std::string kill_point, int odds, #ifdef NDEBUG #define TEST_SYNC_POINT(x) +#define TEST_IDX_SYNC_POINT(x, index) #define TEST_SYNC_POINT_CALLBACK(x, y) #else @@ -135,6 +136,8 @@ class SyncPoint { // See TransactionLogIteratorRace in db_test.cc for an example use case. // TEST_SYNC_POINT is no op in release build. #define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x) +#define TEST_IDX_SYNC_POINT(x, index) \ + rocksdb::SyncPoint::GetInstance()->Process(x + std::to_string(index)) #define TEST_SYNC_POINT_CALLBACK(x, y) \ rocksdb::SyncPoint::GetInstance()->Process(x, y) #endif // NDEBUG diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 8fa9575e4..f98d11d08 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -22,6 +22,7 @@ #include "rocksdb/utilities/transaction_db.h" #include "util/cast_util.h" #include "util/mutexlock.h" +#include "util/sync_point.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_db_mutex_impl.h" @@ -605,121 +606,11 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, if (prev_max < evicted.commit_seq) { // TODO(myabandeh) inc max in larger steps to avoid frequent updates auto max_evicted_seq = evicted.commit_seq; - // When max_evicted_seq_ advances, move older entries from prepared_txns_ - // to delayed_prepared_. This guarantees that if a seq is lower than max, - // then it is not in prepared_txns_ ans save an expensive, synchronized - // lookup from a shared set. delayed_prepared_ is expected to be empty in - // normal cases. 
- { - WriteLock wl(&prepared_mutex_); - while (!prepared_txns_.empty() && - prepared_txns_.top() <= max_evicted_seq) { - auto to_be_popped = prepared_txns_.top(); - delayed_prepared_.insert(to_be_popped); - prepared_txns_.pop(); - delayed_prepared_empty_.store(false, std::memory_order_release); - } - } - - // With each change to max_evicted_seq_ fetch the live snapshots behind it - SequenceNumber curr_seq; - std::vector all_snapshots; - bool update_snapshots = false; - { - InstrumentedMutex(db_impl_->mutex()); - // We use this to identify how fresh are the snapshot list. Since this - // is done atomically with obtaining the snapshot list, the one with - // the larger seq is more fresh. If the seq is equal the full snapshot - // list could be different since taking snapshots does not increase - // the db seq. However since we only care about snapshots before the - // new max, such recent snapshots would not be included the in the - // list anyway. - curr_seq = db_impl_->GetLatestSequenceNumber(); - if (curr_seq > snapshots_version_) { - // This is to avoid updating the snapshots_ if it already updated - // with a more recent vesion by a concrrent thread - update_snapshots = true; - // We only care about snapshots lower then max - all_snapshots = - db_impl_->snapshots().GetAll(nullptr, max_evicted_seq); - } - } - if (update_snapshots) { - WriteLock wl(&snapshots_mutex_); - snapshots_version_ = curr_seq; - // We update the list concurrently with the readers. - // Both new and old lists are sorted and the new list is subset of the - // previous list plus some new items. Thus if a snapshot repeats in - // both new and old lists, it will appear upper in the new list. So if - // we simply insert the new snapshots in order, if an overwritten item - // is still valid in the new list is either written to the same place in - // the array or it is written in a higher palce before it gets - // overwritten by another item. This guarantess a reader that reads the - // list bottom-up will eventaully see a snapshot that repeats in the - // update, either before it gets overwritten by the writer or - // afterwards. - size_t i = 0; - auto it = all_snapshots.begin(); - for (; it != all_snapshots.end() && i < SNAPSHOT_CACHE_SIZE; - it++, i++) { - snapshot_cache_[i].store(*it, std::memory_order_release); - } - snapshots_.clear(); - for (; it != all_snapshots.end(); it++) { - // Insert them to a vector that is less efficient to access - // concurrently - snapshots_.push_back(*it); - } - // Update the size at the end. Otherwise a parallel reader might read - // items that are not set yet. - snapshots_total_.store(all_snapshots.size(), std::memory_order_release); - } - while (prev_max < max_evicted_seq && - !max_evicted_seq_.compare_exchange_weak( - prev_max, max_evicted_seq, std::memory_order_release, - std::memory_order_acquire)) { - }; + AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); } // After each eviction from commit cache, check if the commit entry should // be kept around because it overlaps with a live snapshot. - // First check the snapshot cache that is efficient for concurrent access - auto cnt = snapshots_total_.load(std::memory_order_acquire); - // The list might get updated concurrently as we are reading from it. The - // reader should be able to read all the snapshots that are still valid - // after the update. Since the survived snapshots are written in a higher - // place before gets overwritten the reader that reads bottom-up will - // eventully see it. 
-    const bool next_is_larger = true;
-    SequenceNumber snapshot_seq = kMaxSequenceNumber;
-    size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
-    for (; 0 < ip1; ip1--) {
-      snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
-      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
-                                   snapshot_seq, !next_is_larger)) {
-        break;
-      }
-    }
-    if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
-                 snapshot_seq < evicted.prep_seq)) {
-      // Then access the less efficient list of snapshots_
-      ReadLock rl(&snapshots_mutex_);
-      // Items could have moved from the snapshots_ to snapshot_cache_ before
-      // accquiring the lock. To make sure that we do not miss a valid snapshot,
-      // read snapshot_cache_ again while holding the lock.
-      for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
-        snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
-        if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
-                                     snapshot_seq, next_is_larger)) {
-          break;
-        }
-      }
-      for (auto snapshot_seq_2 : snapshots_) {
-        if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
-                                     snapshot_seq_2, next_is_larger)) {
-          break;
-        }
-      }
-    }
+    CheckAgainstSnapshots(evicted);
   }
   bool succ =
       ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq});
@@ -744,7 +635,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
   }
 }

-bool WritePreparedTxnDB::GetCommitEntry(uint64_t indexed_seq,
+bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                         CommitEntry* entry) {
   // TODO(myabandeh): implement lock-free commit_cache_
   ReadLock rl(&commit_cache_mutex_);
@@ -752,8 +643,8 @@ bool WritePreparedTxnDB::GetCommitEntry(uint64_t indexed_seq,
   return (entry->commit_seq != 0);  // initialized
 }

-bool WritePreparedTxnDB::AddCommitEntry(uint64_t indexed_seq,
-                                        CommitEntry& new_entry,
+bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
+                                        const CommitEntry& new_entry,
                                         CommitEntry* evicted_entry) {
   // TODO(myabandeh): implement lock-free commit_cache_
   WriteLock wl(&commit_cache_mutex_);
@@ -762,9 +653,9 @@ bool WritePreparedTxnDB::AddCommitEntry(uint64_t indexed_seq,
   return (evicted_entry->commit_seq != 0);  // initialized
 }

-bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
-                                             CommitEntry& expected_entry,
-                                             CommitEntry new_entry) {
+bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
+                                             const CommitEntry& expected_entry,
+                                             const CommitEntry& new_entry) {
   // TODO(myabandeh): implement lock-free commit_cache_
   WriteLock wl(&commit_cache_mutex_);
   auto& evicted_entry = commit_cache_[indexed_seq];
@@ -775,11 +666,167 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
   return true;
 }

+void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
+                                              SequenceNumber& new_max) {
+  // When max_evicted_seq_ advances, move older entries from prepared_txns_
+  // to delayed_prepared_. This guarantees that if a seq is lower than max,
+  // then it is not in prepared_txns_ and saves an expensive, synchronized
+  // lookup from a shared set. delayed_prepared_ is expected to be empty in
+  // normal cases.
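+  //
+  // Editor's illustration (hypothetical numbers): if prepared_txns_ holds
+  // {5, 8, 12} and new_max is 9, the loop below moves 5 and 8 into
+  // delayed_prepared_ and leaves 12 on the heap, so a reader that later sees
+  // a seq below max can still discover via delayed_prepared_ that the seq
+  // is prepared rather than committed.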
+  {
+    WriteLock wl(&prepared_mutex_);
+    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
+      auto to_be_popped = prepared_txns_.top();
+      delayed_prepared_.insert(to_be_popped);
+      prepared_txns_.pop();
+      delayed_prepared_empty_.store(false, std::memory_order_release);
+    }
+  }
+
+  // With each change to max_evicted_seq_ fetch the live snapshots behind it
+  SequenceNumber curr_seq;
+  std::vector<SequenceNumber> snapshots;
+  bool update_snapshots = false;
+  {
+    InstrumentedMutex(db_impl_->mutex());
+    // We use this to identify how fresh the snapshot list is. Since this
+    // is done atomically with obtaining the snapshot list, the one with
+    // the larger seq is fresher. If the seqs are equal, the full snapshot
+    // lists could still differ, since taking snapshots does not increase
+    // the db seq. However, since we only care about snapshots before the
+    // new max, such recent snapshots would not be included in the
+    // list anyway.
+    curr_seq = db_impl_->GetLatestSequenceNumber();
+    if (curr_seq > snapshots_version_) {
+      // This is to avoid updating snapshots_ if it was already updated
+      // with a more recent version by a concurrent thread
+      update_snapshots = true;
+      // We only care about snapshots lower than max
+      snapshots = db_impl_->snapshots().GetAll(nullptr, new_max);
+    }
+  }
+  if (update_snapshots) {
+    UpdateSnapshots(snapshots, curr_seq);
+  }
+  // TODO(myabandeh): check if it worked with relaxed ordering
+  while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(
+                                   prev_max, new_max, std::memory_order_release,
+                                   std::memory_order_acquire)) {
+  };
+}
+
 // 10m entry, 80MB size
 size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
 size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = static_cast<size_t>(1 << 7);

+void WritePreparedTxnDB::UpdateSnapshots(
+    const std::vector<SequenceNumber>& snapshots,
+    const SequenceNumber& version) {
+  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
+  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
+#ifndef NDEBUG
+  size_t sync_i = 0;
+#endif
+  WriteLock wl(&snapshots_mutex_);
+  snapshots_version_ = version;
+  // We update the list concurrently with the readers.
+  // Both new and old lists are sorted and the new list is a subset of the
+  // previous list plus some new items. Thus if a snapshot repeats in
+  // both new and old lists, it will appear higher in the new list. So if
+  // we simply insert the new snapshots in order, an overwritten item that
+  // is still valid in the new list is either written to the same place in
+  // the array or to a higher place before it gets overwritten by another
+  // item. This guarantees that a reader that reads the list bottom-up will
+  // eventually see a snapshot that repeats in the update, either before it
+  // gets overwritten by the writer or afterwards.
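+  //
+  // Editor's sketch of the argument (hypothetical values): if the cache
+  // holds [s1, s2, s3] and the new list is [s2, s3, s4], the writer below
+  // stores s2 to slot 0, s3 to slot 1, and s4 to slot 2. A reader scanning
+  // slots 2..0 finds the still-valid s3 either at its old slot 2 (before
+  // the overwrite by s4) or at its new slot 1, which is written before
+  // slot 2 is overwritten.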
+  size_t i = 0;
+  auto it = snapshots.begin();
+  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
+    snapshot_cache_[i].store(*it, std::memory_order_release);
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
+  }
+#ifndef NDEBUG
+  // Release the remaining sync points since they are useless given that the
+  // reader would also use a lock to access snapshots_
+  for (++sync_i; sync_i <= 10; ++sync_i) {
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
+  }
+#endif
+  snapshots_.clear();
+  for (; it != snapshots.end(); it++) {
+    // Insert them to a vector that is less efficient to access
+    // concurrently
+    snapshots_.push_back(*it);
+  }
+  // Update the size at the end. Otherwise a parallel reader might read
+  // items that are not set yet.
+  snapshots_total_.store(snapshots.size(), std::memory_order_release);
+  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
+  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
+}
+
+void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
+  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
+  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
+#ifndef NDEBUG
+  size_t sync_i = 0;
+#endif
+  // First check the snapshot cache that is efficient for concurrent access
+  auto cnt = snapshots_total_.load(std::memory_order_acquire);
+  // The list might get updated concurrently as we are reading from it. The
+  // reader should be able to read all the snapshots that are still valid
+  // after the update. Since the surviving snapshots are written to a higher
+  // place before being overwritten, a reader that reads bottom-up will
+  // eventually see them.
+  const bool next_is_larger = true;
+  SequenceNumber snapshot_seq = kMaxSequenceNumber;
+  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
+  for (; 0 < ip1; ip1--) {
+    snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
+                        ++sync_i);
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
+    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                 snapshot_seq, !next_is_larger)) {
+      break;
+    }
+  }
+#ifndef NDEBUG
+  // Release the remaining sync points before acquiring the lock
+  for (++sync_i; sync_i <= 10; ++sync_i) {
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
+    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
+  }
+#endif
+  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
+  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
+  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
+               snapshot_seq < evicted.prep_seq)) {
+    // Then access the less efficient list of snapshots_
+    ReadLock rl(&snapshots_mutex_);
+    // Items could have moved from the snapshots_ to snapshot_cache_ before
+    // acquiring the lock. To make sure that we do not miss a valid snapshot,
+    // read snapshot_cache_ again while holding the lock.
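+    // Editor's note with an example of why the cache is re-read: if a new,
+    // shorter snapshot list is installed concurrently, a snapshot that used
+    // to live in the overflow list snapshots_ can move into snapshot_cache_;
+    // scanning only snapshots_ under the lock would then miss it.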
+    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
+      snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
+      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                   snapshot_seq, next_is_larger)) {
+        break;
+      }
+    }
+    for (auto snapshot_seq_2 : snapshots_) {
+      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
+                                   snapshot_seq_2, next_is_larger)) {
+        break;
+      }
+    }
+  }
+}
+
 bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
     const uint64_t& prep_seq, const uint64_t& commit_seq,
     const uint64_t& snapshot_seq, const bool next_is_larger = true) {
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index e3eec6b60..e9ea8b1ff 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -109,6 +109,9 @@ class PessimisticTransactionDB : public TransactionDB {
     uint64_t commit_seq;
     CommitEntry() : prep_seq(0), commit_seq(0) {}
     CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
+    bool operator==(const CommitEntry& rhs) const {
+      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
+    }
   };

  protected:
@@ -196,6 +199,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {

  private:
   friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
+  friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
+  friend class WritePreparedTransactionTest_CommitMapTest_Test;
+  friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test;
+  friend class WritePreparedTransactionTest;
+  friend class PreparedHeap_BasicsTest_Test;

   void init(const TransactionDBOptions& /* unused */) {
     snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
@@ -223,13 +231,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
         heap_.pop();
         erased_heap_.pop();
       }
+      while (heap_.empty() && !erased_heap_.empty()) {
+        erased_heap_.pop();
+      }
     }
     void erase(uint64_t seq) {
       if (!heap_.empty()) {
         if (seq < heap_.top()) {
           // Already popped, ignore it.
         } else if (heap_.top() == seq) {
-          heap_.pop();
+          pop();
         } else {  // (heap_.top() > seq)
           // Down the heap, remember to pop it later
           erased_heap_.push(seq);
         }
       }
     }
@@ -240,17 +251,45 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {

   // Get the commit entry with index indexed_seq from the commit table. It
   // returns true if such entry exists.
-  bool GetCommitEntry(uint64_t indexed_seq, CommitEntry* entry);
+  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry* entry);
+
   // Rewrite the entry with the index indexed_seq in the commit table with the
   // commit entry <prepare_seq, commit_seq>. If the rewrite results into
   // eviction, sets the evicted_entry and returns true.
-  bool AddCommitEntry(uint64_t indexed_seq, CommitEntry& new_entry,
+  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                       CommitEntry* evicted_entry);
+
   // Rewrite the entry with the index indexed_seq in the commit table with the
   // commit entry new_entry only if the existing entry matches the
   // expected_entry. Returns false otherwise.
-  bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry,
-                           CommitEntry new_entry);
+  bool ExchangeCommitEntry(const uint64_t indexed_seq,
+                           const CommitEntry& expected_entry,
+                           const CommitEntry& new_entry);
+
+  // Increase max_evicted_seq_ from the previous value prev_max to the new
+  // value. This also involves taking care of prepared txns that are not
+  // committed before new_max, as well as updating the list of live snapshots
+  // at the time of updating the max. Thread-safety: this function can be
+  // called concurrently. Concurrent invocations of this function are
+  // equivalent to a serial invocation in which the last invocation is the one
+  // with the largest new_max value.
+  void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max);
+
+  // Update the list of snapshots corresponding to the soon-to-be-updated
+  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
+  // Concurrent invocations of this function are equivalent to a serial
+  // invocation in which the last invocation is the one with the largest
+  // version value.
+  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
+                       const SequenceNumber& version);
+
+  // Check an evicted entry against live snapshots to see if it should be kept
+  // around or whether it can be safely discarded (and hence assumed committed
+  // for all snapshots). Thread-safety: this function can be called
+  // concurrently. If it is called concurrently with multiple UpdateSnapshots,
+  // the result is the same as checking the intersection of the snapshot list
+  // before the updates with the snapshot list of all the concurrent updates.
+  void CheckAgainstSnapshots(const CommitEntry& evicted);

   // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
   // commit_seq. Return false if checking the next snapshot(s) is not needed.
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index eac8e563d..ab88da40b 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -9,7 +9,8 @@
 #define __STDC_FORMAT_MACROS
 #endif

-#include
+#include "utilities/transactions/transaction_test.h"
+
 #include
 #include
 #include
@@ -38,114 +39,20 @@ using std::string;

 namespace rocksdb {

-class TransactionTest : public ::testing::TestWithParam<
-                            std::tuple> {
- public:
-  TransactionDB* db;
-  FaultInjectionTestEnv* env;
-  string dbname;
-  Options options;
-
-  TransactionDBOptions txn_db_options;
-
-  TransactionTest() {
-    options.create_if_missing = true;
-    options.max_write_buffer_number = 2;
-    options.write_buffer_size = 4 * 1024;
-    options.level0_file_num_compaction_trigger = 2;
-    options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
-    env = new FaultInjectionTestEnv(Env::Default());
-    options.env = env;
-    options.concurrent_prepare = std::get<1>(GetParam());
-    dbname = test::TmpDir() + "/transaction_testdb";
-
-    DestroyDB(dbname, options);
-    txn_db_options.transaction_lock_timeout = 0;
-    txn_db_options.default_lock_timeout = 0;
-    txn_db_options.write_policy = std::get<2>(GetParam());
-    Status s;
-    if (std::get<0>(GetParam()) == false) {
-      s = TransactionDB::Open(options, txn_db_options, dbname, &db);
-    } else {
-      s = OpenWithStackableDB();
-    }
-    assert(s.ok());
-  }
-
-  ~TransactionTest() {
-    delete db;
-    DestroyDB(dbname, options);
-    delete env;
-  }
-
-  Status ReOpenNoDelete() {
-    delete db;
-    db = nullptr;
-    env->AssertNoOpenFile();
-    env->DropUnsyncedFileData();
-    env->ResetState();
-    Status s;
-    if (std::get<0>(GetParam()) == false) {
-      s = TransactionDB::Open(options, txn_db_options, dbname, &db);
-    } else {
-      s = OpenWithStackableDB();
-    }
-    return s;
-  }
-
-  Status ReOpen() {
-    delete db;
-    DestroyDB(dbname, options);
-    Status s;
-    if (std::get<0>(GetParam()) == false) {
-      s =
TransactionDB::Open(options, txn_db_options, dbname, &db); - } else { - s = OpenWithStackableDB(); - } - return s; - } - - Status OpenWithStackableDB() { - std::vector compaction_enabled_cf_indices; - std::vector column_families{ColumnFamilyDescriptor( - kDefaultColumnFamilyName, ColumnFamilyOptions(options))}; - - TransactionDB::PrepareWrap(&options, &column_families, - &compaction_enabled_cf_indices); - std::vector handles; - DB* root_db; - Options options_copy(options); - Status s = - DB::Open(options_copy, dbname, column_families, &handles, &root_db); - if (s.ok()) { - assert(handles.size() == 1); - s = TransactionDB::WrapStackableDB( - new StackableDB(root_db), txn_db_options, - compaction_enabled_cf_indices, handles, &db); - delete handles[0]; - } - return s; - } -}; - -class MySQLStyleTransactionTest : public TransactionTest {}; -class WritePreparedTransactionTest : public TransactionTest {}; - -static const TxnDBWritePolicy wc = WRITE_COMMITTED; -static const TxnDBWritePolicy wp = WRITE_PREPARED; // TODO(myabandeh): Instantiate the tests with other write policies INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(false, false, wc))); + ::testing::Values(std::make_tuple(false, false, + WRITE_COMMITTED))); INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(true, false, wc))); -INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest, - ::testing::Values(std::make_tuple(false, false, wc), - std::make_tuple(false, true, wc), - std::make_tuple(true, false, wc), - std::make_tuple(true, true, wc))); -INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest, - WritePreparedTransactionTest, - ::testing::Values(std::make_tuple(false, true, wp))); + ::testing::Values(std::make_tuple(true, false, + WRITE_COMMITTED))); +INSTANTIATE_TEST_CASE_P( + MySQLStyleTransactionTest, MySQLStyleTransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), + std::make_tuple(false, true, WRITE_COMMITTED), + std::make_tuple(true, false, WRITE_COMMITTED), + std::make_tuple(true, true, WRITE_COMMITTED))); + TEST_P(TransactionTest, DoubleEmptyWrite) { WriteOptions write_options; @@ -4733,138 +4640,6 @@ TEST_P(TransactionTest, MemoryLimitTest) { delete txn; } -// Test WritePreparedTxnDB's IsInSnapshot against different ordering of -// snapshot, max_committed_seq_, prepared, and commit entries. -TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { - WriteOptions wo; - // Use small commit cache to trigger lots of eviction and fast advance of - // max_evicted_seq_ - // will take effect after ReOpen - WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8; - // Same for snapshot cache size - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5; - - // Take some preliminary snapshots first. This is to stress the data structure - // that holds the old snapshots as it will be designed to be efficient when - // only a few snapshots are below the max_evicted_seq_. - for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) { - // Leave some gap between the preliminary snapshots and the final snapshot - // that we check. This should test for also different overlapping scnearios - // between the last snapshot and the commits. - for (int max_gap = 1; max_gap < 10; max_gap++) { - // Since we do not actually write to db, we mock the seq as it would be - // increaased by the db. The only exception is that we need db seq to - // advance for our snapshots. 
for which we apply a dummy put each time we - // increase our mock of seq. - uint64_t seq = 0; - // At each step we prepare a txn and then we commit it in the next txn. - // This emulates the consecuitive transactions that write to the same key - uint64_t cur_txn = 0; - // Number of snapshots taken so far - int num_snapshots = 0; - std::vector to_be_released; - // Number of gaps applied so far - int gap_cnt = 0; - // The final snapshot that we will inspect - uint64_t snapshot = 0; - bool found_committed = false; - // To stress the data structure that maintain prepared txns, at each cycle - // we add a new prepare txn. These do not mean to be committed for - // snapshot inspection. - std::set prepared; - // We keep the list of txns comitted before we take the last snaphot. - // These should be the only seq numbers that will be found in the snapshot - std::set committed_before; - ReOpen(); // to restart the db - WritePreparedTxnDB* wp_db = dynamic_cast(db); - assert(wp_db); - assert(wp_db->db_impl_); - // We continue until max advances a bit beyond the snapshot. - while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { - // do prepare for a transaction - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq - seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); - wp_db->AddPrepared(seq); - prepared.insert(seq); - - // If cur_txn is not started, do prepare for it. - if (!cur_txn) { - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq - seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); - cur_txn = seq; - wp_db->AddPrepared(cur_txn); - } else { // else commit it - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq - seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); - wp_db->AddCommitted(cur_txn, seq); - if (!snapshot) { - committed_before.insert(cur_txn); - } - cur_txn = 0; - } - - if (num_snapshots < max_snapshots - 1) { - // Take preliminary snapshots - auto tmp_snapshot = db->GetSnapshot(); - to_be_released.push_back(tmp_snapshot); - num_snapshots++; - } else if (gap_cnt < max_gap) { - // Wait for some gap before taking the final snapshot - gap_cnt++; - } else if (!snapshot) { - // Take the final snapshot if it is not already taken - auto tmp_snapshot = db->GetSnapshot(); - to_be_released.push_back(tmp_snapshot); - snapshot = tmp_snapshot->GetSequenceNumber(); - // We increase the db seq artificailly by a dummy Put. Check that this - // technique is effective and db seq is that same as ours. - ASSERT_EQ(snapshot, seq); - num_snapshots++; - } - - // If the snapshot is taken, verify seq numbers visible to it. We redo - // it at each cycle to test that the system is still sound when - // max_evicted_seq_ advances. - if (snapshot) { - for (uint64_t s = 0; s <= seq; s++) { - bool was_committed = - (committed_before.find(s) != committed_before.end()); - bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot); - if (was_committed != is_in_snapshot) { - printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64 - " snapshot %" PRIu64 - " gap_cnt %d num_snapshots %d s %" PRIu64 "\n", - max_snapshots, max_gap, seq, - wp_db->max_evicted_seq_.load(), snapshot, gap_cnt, - num_snapshots, s); - } - ASSERT_EQ(was_committed, is_in_snapshot); - found_committed = found_committed || is_in_snapshot; - } - } - } - // Safety check to make sure the test actually ran - ASSERT_TRUE(found_committed); - // As an extra check, check if prepared set will be properly empty after - // they are committed. 
-      if (cur_txn) {
-        wp_db->AddCommitted(cur_txn, seq);
-      }
-      for (auto p : prepared) {
-        wp_db->AddCommitted(p, seq);
-      }
-      ASSERT_TRUE(wp_db->delayed_prepared_.empty());
-      ASSERT_TRUE(wp_db->prepared_txns_.empty());
-      for (auto s : to_be_released) {
-        db->ReleaseSnapshot(s);
-      }
-    }
-  }
-}
-
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h
new file mode 100644
index 000000000..7fc028a4e
--- /dev/null
+++ b/utilities/transactions/transaction_test.h
@@ -0,0 +1,131 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include
+#include
+#include
+#include
+#include
+
+#include "db/db_impl.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/transaction.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "table/mock_table.h"
+#include "util/fault_injection_test_env.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+#include "util/transaction_test_util.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+
+#include "port/port.h"
+
+namespace rocksdb {
+
+class TransactionTest : public ::testing::TestWithParam<
+                            std::tuple<bool, bool, TxnDBWritePolicy>> {
+ public:
+  TransactionDB* db;
+  FaultInjectionTestEnv* env;
+  std::string dbname;
+  Options options;
+
+  TransactionDBOptions txn_db_options;
+
+  TransactionTest() {
+    options.create_if_missing = true;
+    options.max_write_buffer_number = 2;
+    options.write_buffer_size = 4 * 1024;
+    options.level0_file_num_compaction_trigger = 2;
+    options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
+    env = new FaultInjectionTestEnv(Env::Default());
+    options.env = env;
+    options.concurrent_prepare = std::get<1>(GetParam());
+    dbname = test::TmpDir() + "/transaction_testdb";
+
+    DestroyDB(dbname, options);
+    txn_db_options.transaction_lock_timeout = 0;
+    txn_db_options.default_lock_timeout = 0;
+    txn_db_options.write_policy = std::get<2>(GetParam());
+    Status s;
+    if (std::get<0>(GetParam()) == false) {
+      s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+    } else {
+      s = OpenWithStackableDB();
+    }
+    assert(s.ok());
+  }
+
+  ~TransactionTest() {
+    delete db;
+    DestroyDB(dbname, options);
+    delete env;
+  }
+
+  Status ReOpenNoDelete() {
+    delete db;
+    db = nullptr;
+    env->AssertNoOpenFile();
+    env->DropUnsyncedFileData();
+    env->ResetState();
+    Status s;
+    if (std::get<0>(GetParam()) == false) {
+      s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+    } else {
+      s = OpenWithStackableDB();
+    }
+    return s;
+  }
+
+  Status ReOpen() {
+    delete db;
+    DestroyDB(dbname, options);
+    Status s;
+    if (std::get<0>(GetParam()) == false) {
+      s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+    } else {
+      s = OpenWithStackableDB();
+    }
+    return s;
+  }
+
+  Status OpenWithStackableDB() {
+    std::vector<size_t> compaction_enabled_cf_indices;
+    std::vector<ColumnFamilyDescriptor> column_families{ColumnFamilyDescriptor(
+        kDefaultColumnFamilyName, ColumnFamilyOptions(options))};
+
+    TransactionDB::PrepareWrap(&options, &column_families,
+                               &compaction_enabled_cf_indices);
+    std::vector<ColumnFamilyHandle*> handles;
+    DB* root_db;
+    Options options_copy(options);
+    Status s =
+        DB::Open(options_copy, dbname, column_families, &handles, &root_db);
+    if (s.ok()) {
+      assert(handles.size() == 1);
+      s = TransactionDB::WrapStackableDB(
+          new StackableDB(root_db), txn_db_options,
+          compaction_enabled_cf_indices, handles, &db);
+      delete handles[0];
+    }
+    return s;
+  }
+};
+
+class MySQLStyleTransactionTest : public TransactionTest {};
+
+}  // namespace rocksdb
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
new file mode 100644
index 000000000..0015ac14f
--- /dev/null
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -0,0 +1,569 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "utilities/transactions/transaction_test.h"
+
+#include
+#include
+#include
+#include
+#include
+
+#include "db/db_impl.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/transaction.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "table/mock_table.h"
+#include "util/fault_injection_test_env.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+#include "util/transaction_test_util.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+
+#include "port/port.h"
+
+using std::string;
+
+namespace rocksdb {
+
+using CommitEntry = PessimisticTransactionDB::CommitEntry;
+
+TEST(PreparedHeap, BasicsTest) {
+  WritePreparedTxnDB::PreparedHeap heap;
+  heap.push(14l);
+  // Test with one element
+  ASSERT_EQ(14l, heap.top());
+  heap.push(24l);
+  heap.push(34l);
+  // Test that old min is still on top
+  ASSERT_EQ(14l, heap.top());
+  heap.push(13l);
+  // Test that the new min will be on top
+  ASSERT_EQ(13l, heap.top());
+  // Test that it is persistent
+  ASSERT_EQ(13l, heap.top());
+  heap.push(44l);
+  heap.push(54l);
+  heap.push(64l);
+  heap.push(74l);
+  heap.push(84l);
+  // Test that old min is still on top
+  ASSERT_EQ(13l, heap.top());
+  heap.erase(24l);
+  // Test that old min is still on top
+  ASSERT_EQ(13l, heap.top());
+  heap.erase(14l);
+  // Test that old min is still on top
+  ASSERT_EQ(13l, heap.top());
+  heap.erase(13l);
+  // Test that the new min comes to the top after multiple erases
+  ASSERT_EQ(34l, heap.top());
+  heap.erase(34l);
+  // Test that the new min comes to the top after a single erase
+  ASSERT_EQ(44l, heap.top());
+  heap.erase(54l);
+  ASSERT_EQ(44l, heap.top());
+  heap.pop();  // pop 44l
+  // Test that the erased items are ignored after pop
+  ASSERT_EQ(64l, heap.top());
+  heap.erase(44l);
+  // Test that erasing an already popped item would work
+  ASSERT_EQ(64l, heap.top());
+  heap.erase(84l);
+  ASSERT_EQ(64l, heap.top());
+  heap.push(85l);
+  heap.push(86l);
+  heap.push(87l);
+  heap.push(88l);
+  heap.push(89l);
+  heap.erase(87l);
+  heap.erase(85l);
+  heap.erase(89l);
+  heap.erase(86l);
+  heap.erase(88l);
+  // Test that the top remains the same after a random order of many erases
+  ASSERT_EQ(64l, heap.top());
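+  // State at this point (editor's trace of the ops above): heap_ still
+  // physically holds {64, 74, 84, 85, 86, 87, 88, 89} while erased_heap_
+  // holds {84, 85, 86, 87, 88, 89}; the erased entries are only lazily
+  // removed by the pops below.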
+  heap.pop();
+  // Test that pop works with a series of random pending erases
+  ASSERT_EQ(74l, heap.top());
+  ASSERT_FALSE(heap.empty());
+  heap.pop();
+  // Test that empty works
+  ASSERT_TRUE(heap.empty());
+}
+
+class WritePreparedTransactionTest : public TransactionTest {
+ protected:
+  // If expect_update is set, check if it actually updated old_commit_map_. If
+  // it did not and yet suggested not to check the next snapshot, do the
+  // opposite to check that it was not a bad suggestion.
+  void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
+                                           uint64_t snapshot,
+                                           uint64_t next_snapshot,
+                                           bool expect_update) {
+    WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+    // reset old_commit_map_empty_ so that its value indicates whether
+    // old_commit_map_ was updated
+    wp_db->old_commit_map_empty_ = true;
+    bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
+                                                     snapshot < next_snapshot);
+    if (expect_update == wp_db->old_commit_map_empty_) {
+      printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
+             " next: %" PRIu64 "\n",
+             prepare, commit, snapshot, next_snapshot);
+    }
+    EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
+    if (!check_next && wp_db->old_commit_map_empty_) {
+      // do the opposite to make sure it was not a bad suggestion
+      const bool dont_care_bool = true;
+      wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
+                                     dont_care_bool);
+      if (!wp_db->old_commit_map_empty_) {
+        printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
+               " next: %" PRIu64 "\n",
+               prepare, commit, snapshot, next_snapshot);
+      }
+      EXPECT_TRUE(wp_db->old_commit_map_empty_);
+    }
+  }
+
+  // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
+  // miss a snapshot because of a concurrent update by UpdateSnapshots that is
+  // writing new_snapshots. Both threads are broken at two points. The sync
+  // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
+  // entry is expected to be vital for one of the snapshots that is common
+  // between the old and new lists of snapshots.
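+  //
+  // Editor's sketch of the interleaving for, e.g., a1=1, a2=2, b1=1, b2=2:
+  // the first dependency set below forces
+  //   reader checks its 1st snapshot -> writer starts and writes its 1st
+  //   slot -> reader checks its 2nd snapshot -> writer writes its 2nd
+  //   slot -> reader finishes -> writer finishes,
+  // using the "...:p:<i>"/"...:s:<i>" names that TEST_IDX_SYNC_POINT
+  // composes via x + std::to_string(index).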
+  void SnapshotConcurrentAccessTestInternal(
+      WritePreparedTxnDB* wp_db,
+      const std::vector<SequenceNumber>& old_snapshots,
+      const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
+      SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
+    // First reset the snapshot list
+    const std::vector<SequenceNumber> empty_snapshots;
+    wp_db->old_commit_map_empty_ = true;
+    wp_db->UpdateSnapshots(empty_snapshots, ++version);
+    // Then initialize it with the old_snapshots
+    wp_db->UpdateSnapshots(old_snapshots, ++version);
+
+    // Starting from the first thread, cut each thread at two points
+    rocksdb::SyncPoint::GetInstance()->LoadDependency({
+        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
+         "WritePreparedTxnDB::UpdateSnapshots:s:start"},
+        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
+         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
+        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
+         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
+        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
+         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
+        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
+         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
+    });
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+    {
+      ASSERT_TRUE(wp_db->old_commit_map_empty_);
+      rocksdb::port::Thread t1(
+          [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
+      rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
+      t1.join();
+      t2.join();
+      ASSERT_FALSE(wp_db->old_commit_map_empty_);
+    }
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+    wp_db->old_commit_map_empty_ = true;
+    wp_db->UpdateSnapshots(empty_snapshots, ++version);
+    wp_db->UpdateSnapshots(old_snapshots, ++version);
+    // Starting from the second thread, cut each thread at two points
+    rocksdb::SyncPoint::GetInstance()->LoadDependency({
+        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
+         "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
+        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
+         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
+        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
+         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
+        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
+         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
+        {"WritePreparedTxnDB::UpdateSnapshots:p:end",
+         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
+    });
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+    {
+      ASSERT_TRUE(wp_db->old_commit_map_empty_);
+      rocksdb::port::Thread t1(
+          [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
+      rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
+      t1.join();
+      t2.join();
+      ASSERT_FALSE(wp_db->old_commit_map_empty_);
+    }
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest,
+                        WritePreparedTransactionTest,
+                        ::testing::Values(std::make_tuple(false, true,
+                                                          WRITE_PREPARED)));
+
+TEST_P(WritePreparedTransactionTest, CommitMapTest) {
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  assert(wp_db);
+  assert(wp_db->db_impl_);
+  size_t size = wp_db->COMMIT_CACHE_SIZE;
+  CommitEntry c = {5, 12}, e;
+  bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
+  ASSERT_FALSE(evicted);
+
+  // Should be able to read the same value
+  bool found = wp_db->GetCommitEntry(c.prep_seq % size, &e);
+  ASSERT_TRUE(found);
+  ASSERT_EQ(c, e);
+  // Should be able to distinguish between overlapping entries
+  found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &e);
+  ASSERT_TRUE(found);
+  ASSERT_NE(c.prep_seq + size, e.prep_seq);
+  // Should be able to detect non-existent entry
+  found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &e);
+  ASSERT_EQ(e.commit_seq, 0);
+  ASSERT_FALSE(found);
+
+  // Reject an invalid exchange
+  CommitEntry e2 = {c.prep_seq + size, c.commit_seq};
+  bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2, e);
+  ASSERT_FALSE(exchanged);
+  // check whether it actually rejected it
+  found = wp_db->GetCommitEntry(e2.prep_seq % size, &e);
+  ASSERT_TRUE(found);
+  ASSERT_EQ(c, e);
+
+  // Accept a valid exchange
+  CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
+  exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c, e3);
+  ASSERT_TRUE(exchanged);
+  // check whether it actually accepted it
+  found = wp_db->GetCommitEntry(c.prep_seq % size, &e);
+  ASSERT_TRUE(found);
+  ASSERT_EQ(e3, e);
+
+  // Rewrite an entry
+  CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
+  evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
+  ASSERT_TRUE(evicted);
+  ASSERT_EQ(e3, e);
+  found = wp_db->GetCommitEntry(e4.prep_seq % size, &e);
+  ASSERT_TRUE(found);
+  ASSERT_EQ(e4, e);
+}
+
+TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
+  // If prepare <= snapshot < commit we should keep the entry around since its
+  // nonexistence could be interpreted as committed in the snapshot while it is
+  // not true. We keep such entries around by adding them to the
+  // old_commit_map_.
+  uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
+  p = 10l, c = 15l, s = 20l, ns = 21l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+  // If we do not expect the old commit map to be updated, also try with a next
+  // snapshot that is expected to update the old commit map. This tests
+  // that MaybeUpdateOldCommitMap would not prevent us from checking the next
+  // snapshot that must be checked.
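+  //
+  // Worked instance (editor's note): in the case above, commit 15 <=
+  // snapshot 20, so the snapshot sees the committed value anyway and no
+  // old_commit_map_ entry is needed; only prepare <= snapshot < commit
+  // (e.g. p=10, c=25, s=20 further below) requires keeping the evicted
+  // entry around.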
+  p = 10l, c = 15l, s = 20l, ns = 11l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+
+  p = 10l, c = 20l, s = 20l, ns = 19l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+  p = 10l, c = 20l, s = 20l, ns = 21l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+
+  p = 20l, c = 20l, s = 20l, ns = 21l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+  p = 20l, c = 20l, s = 20l, ns = 19l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+
+  p = 10l, c = 25l, s = 20l, ns = 21l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
+
+  p = 20l, c = 25l, s = 20l, ns = 21l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
+
+  p = 21l, c = 25l, s = 20l, ns = 22l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+  p = 21l, c = 25l, s = 20l, ns = 19l;
+  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
+}
+
+TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
+  std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l,
+                                           500l, 600l, 700l};
+  // will take effect after ReOpen
+  WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = snapshots.size() / 2;
+  ReOpen();  // to restart the db
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  assert(wp_db);
+  assert(wp_db->db_impl_);
+  SequenceNumber version = 1000l;
+  ASSERT_EQ(0, wp_db->snapshots_total_);
+  wp_db->UpdateSnapshots(snapshots, version);
+  ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
+  // seq numbers are chosen so that we have two of them between each two
+  // snapshots. If the diff of two consecutive seqs is more than 5, there is a
+  // snapshot between them.
+  std::vector<SequenceNumber> seqs = {50l,  55l,  150l, 155l, 250l, 255l,
+                                      350l, 355l, 450l, 455l, 550l, 555l,
+                                      650l, 655l, 750l, 755l};
+  assert(seqs.size() > 1);
+  for (size_t i = 0; i < seqs.size() - 1; i++) {
+    wp_db->old_commit_map_empty_ = true;  // reset
+    CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
+    wp_db->CheckAgainstSnapshots(commit_entry);
+    // Expect update if there is a snapshot in between the prepare and commit
+    bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
+                         commit_entry.commit_seq >= snapshots.front() &&
+                         commit_entry.prep_seq <= snapshots.back();
+    ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
+  }
+}
+
+// Return true if the ith bit is set in the combination represented by comb
+bool IsInCombination(size_t i, size_t comb) { return comb & (1 << i); }
+
+// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
+// parallel with UpdateSnapshots.
+TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
+  // We have a sync point in the method under test after checking each
+  // snapshot. If you increase the max number of snapshots in this test, more
+  // sync points in the methods must also be added.
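+  //
+  // Editor's note on the combination loop below: IsInCombination treats
+  // new_comb as a bitmask over the old snapshots, e.g. with old_size = 3,
+  // comb 0b101 keeps the first and third old snapshots in the new list
+  // while dropping the second.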
+  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
+                                                 60l, 70l, 80l, 90l, 100l};
+  SequenceNumber version = 1000l;
+  // Choose the cache size so that the new snapshot list could replace all the
+  // existing items in the cache and also have some overflow. Will take effect
+  // after ReOpen
+  WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = (snapshots.size() - 2) / 2;
+  ReOpen();  // to restart the db
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  assert(wp_db);
+  assert(wp_db->db_impl_);
+  // Add up to 2 items that do not fit into the cache
+  for (size_t old_size = 1;
+       old_size <= WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE + 2;
+       old_size++) {
+    const std::vector<SequenceNumber> old_snapshots(
+        snapshots.begin(), snapshots.begin() + old_size);
+
+    // Each member of the old snapshot list might or might not appear in the
+    // new list. We create a common_snapshots for each combination.
+    size_t new_comb_cnt = static_cast<size_t>(1 << old_size);
+    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++) {
+      std::vector<SequenceNumber> common_snapshots;
+      for (size_t i = 0; i < old_snapshots.size(); i++) {
+        if (IsInCombination(i, new_comb)) {
+          common_snapshots.push_back(old_snapshots[i]);
+        }
+      }
+      // And add some new snapshots to the common list
+      for (size_t added_snapshots = 0;
+           added_snapshots <= snapshots.size() - old_snapshots.size();
+           added_snapshots++) {
+        std::vector<SequenceNumber> new_snapshots = common_snapshots;
+        for (size_t i = 0; i < added_snapshots; i++) {
+          new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
+        }
+        for (auto it = common_snapshots.begin(); it != common_snapshots.end();
+             it++) {
+          auto snapshot = *it;
+          // Create a commit entry that is around the snapshot and thus should
+          // not be discarded
+          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
+                               snapshot + 1};
+          // The critical part is when iterating the snapshot cache.
+          // Afterwards, we are operating under the lock.
+          size_t a_range =
+              std::min(old_snapshots.size(),
+                       WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) +
+              1;
+          size_t b_range =
+              std::min(new_snapshots.size(),
+                       WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) +
+              1;
+          // Break each thread at two points
+          for (size_t a1 = 1; a1 <= a_range; a1++) {
+            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
+              for (size_t b1 = 1; b1 <= b_range; b1++) {
+                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
+                  SnapshotConcurrentAccessTestInternal(wp_db, old_snapshots,
                                                       new_snapshots, entry,
+                                                       version, a1, a2, b1, b2);
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+
+// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
+// snapshot, max_committed_seq_, prepared, and commit entries.
+TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
+  WriteOptions wo;
+  // Use a small commit cache to trigger lots of eviction and fast advance of
+  // max_evicted_seq_
+  // will take effect after ReOpen
+  WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
+  // Same for snapshot cache size
+  WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5;
+
+  // Take some preliminary snapshots first. This is to stress the data
+  // structure that holds the old snapshots, as it will be designed to be
+  // efficient when only a few snapshots are below the max_evicted_seq_.
+  for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
+    // Leave some gap between the preliminary snapshots and the final snapshot
+    // that we check. This should also test different overlapping scenarios
+    // between the last snapshot and the commits.
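+    //
+    // Editor's note (a sketch, assuming the modulo indexing that
+    // CommitMapTest above exercises with prep_seq % size): with a commit
+    // cache of size 8, roughly every 8th AddCommitted lands on an occupied
+    // slot, evicts its entry, and pushes max_evicted_seq_ up to the evicted
+    // commit_seq, which is exactly the fast advance this test wants.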
+    for (int max_gap = 1; max_gap < 10; max_gap++) {
+      // Since we do not actually write to the db, we mock the seq as it would
+      // be increased by the db. The only exception is that we need the db seq
+      // to advance for our snapshots, for which we apply a dummy put each time
+      // we increase our mock of seq.
+      uint64_t seq = 0;
+      // At each step we prepare a txn and then we commit it in the next txn.
+      // This emulates consecutive transactions that write to the same key
+      uint64_t cur_txn = 0;
+      // Number of snapshots taken so far
+      int num_snapshots = 0;
+      std::vector<const Snapshot*> to_be_released;
+      // Number of gaps applied so far
+      int gap_cnt = 0;
+      // The final snapshot that we will inspect
+      uint64_t snapshot = 0;
+      bool found_committed = false;
+      // To stress the data structure that maintains prepared txns, at each
+      // cycle we add a new prepare txn. These are not meant to be committed
+      // for snapshot inspection.
+      std::set<uint64_t> prepared;
+      // We keep the list of txns committed before we take the last snapshot.
+      // These should be the only seq numbers that will be found in the
+      // snapshot
+      std::set<uint64_t> committed_before;
+      ReOpen();  // to restart the db
+      WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+      assert(wp_db);
+      assert(wp_db->db_impl_);
+      // We continue until max advances a bit beyond the snapshot.
+      while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
+        // do prepare for a transaction
+        wp_db->db_impl_->Put(wo, "key", "value");  // dummy put to inc db seq
+        seq++;
+        ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
+        wp_db->AddPrepared(seq);
+        prepared.insert(seq);
+
+        // If cur_txn is not started, do prepare for it.
+        if (!cur_txn) {
+          wp_db->db_impl_->Put(wo, "key", "value");  // dummy put to inc db seq
+          seq++;
+          ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
+          cur_txn = seq;
+          wp_db->AddPrepared(cur_txn);
+        } else {  // else commit it
+          wp_db->db_impl_->Put(wo, "key", "value");  // dummy put to inc db seq
+          seq++;
+          ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
+          wp_db->AddCommitted(cur_txn, seq);
+          if (!snapshot) {
+            committed_before.insert(cur_txn);
+          }
+          cur_txn = 0;
+        }
+
+        if (num_snapshots < max_snapshots - 1) {
+          // Take preliminary snapshots
+          auto tmp_snapshot = db->GetSnapshot();
+          to_be_released.push_back(tmp_snapshot);
+          num_snapshots++;
+        } else if (gap_cnt < max_gap) {
+          // Wait for some gap before taking the final snapshot
+          gap_cnt++;
+        } else if (!snapshot) {
+          // Take the final snapshot if it is not already taken
+          auto tmp_snapshot = db->GetSnapshot();
+          to_be_released.push_back(tmp_snapshot);
+          snapshot = tmp_snapshot->GetSequenceNumber();
+          // We increase the db seq artificially by a dummy Put. Check that
+          // this technique is effective and the db seq is the same as ours.
+          ASSERT_EQ(snapshot, seq);
+          num_snapshots++;
+        }
+
+        // If the snapshot is taken, verify seq numbers visible to it. We redo
+        // it at each cycle to test that the system is still sound when
+        // max_evicted_seq_ advances.
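+        //
+        // Editor's note: in particular, a seq that was prepared but not yet
+        // committed before the snapshot must stay invisible even after
+        // max_evicted_seq_ passes it; the delayed_prepared_ set filled by
+        // AdvanceMaxEvictedSeq preserves exactly this information.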
+        if (snapshot) {
+          for (uint64_t s = 0; s <= seq; s++) {
+            bool was_committed =
+                (committed_before.find(s) != committed_before.end());
+            bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
+            if (was_committed != is_in_snapshot) {
+              printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
+                     " snapshot %" PRIu64
+                     " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
+                     max_snapshots, max_gap, seq,
+                     wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
+                     num_snapshots, s);
+            }
+            ASSERT_EQ(was_committed, is_in_snapshot);
+            found_committed = found_committed || is_in_snapshot;
+          }
+        }
+      }
+      // Safety check to make sure the test actually ran
+      ASSERT_TRUE(found_committed);
+      // As an extra check, verify that the prepared sets are properly emptied
+      // after the txns are committed.
+      if (cur_txn) {
+        wp_db->AddCommitted(cur_txn, seq);
+      }
+      for (auto p : prepared) {
+        wp_db->AddCommitted(p, seq);
+      }
+      ASSERT_TRUE(wp_db->delayed_prepared_.empty());
+      ASSERT_TRUE(wp_db->prepared_txns_.empty());
+      for (auto s : to_be_released) {
+        db->ReleaseSnapshot(s);
+      }
+    }
+  }
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int argc, char** argv) {
+  fprintf(stderr,
+          "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
+  return 0;
+}
+
+#endif  // ROCKSDB_LITE