diff --git a/util/transaction_test_util.cc b/util/transaction_test_util.cc index d439cf202..bb6412488 100644 --- a/util/transaction_test_util.cc +++ b/util/transaction_test_util.cc @@ -11,6 +11,8 @@ #include "util/transaction_test_util.h" #include +#include +#include #include #include @@ -45,8 +47,16 @@ RandomTransactionInserter::~RandomTransactionInserter() { bool RandomTransactionInserter::TransactionDBInsert( TransactionDB* db, const TransactionOptions& txn_options) { txn_ = db->BeginTransaction(write_options_, txn_options, txn_); - - return DoInsert(nullptr, txn_, false); + bool take_snapshot = rand_->OneIn(2); + if (take_snapshot) { + txn_->SetSnapshot(); + read_options_.snapshot = txn_->GetSnapshot(); + } + auto res = DoInsert(nullptr, txn_, false); + if (take_snapshot) { + read_options_.snapshot = nullptr; + } + return res; } bool RandomTransactionInserter::OptimisticTransactionDBInsert( @@ -62,52 +72,71 @@ bool RandomTransactionInserter::DBInsert(DB* db) { return DoInsert(db, nullptr, false); } +Status RandomTransactionInserter::DBGet( + DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i, + uint64_t ikey, bool get_for_update, uint64_t* int_value, + std::string* full_key, bool* unexpected_error) { + Status s; + // four digits and zero end char + char prefix_buf[5]; + // Pad prefix appropriately so we can iterate over each set + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); + // key format: [SET#][random#] + std::string skey = ToString(ikey); + Slice base_key(skey); + *full_key = std::string(prefix_buf) + base_key.ToString(); + Slice key(*full_key); + + std::string value; + if (txn != nullptr) { + if (get_for_update) { + s = txn->GetForUpdate(read_options, key, &value); + } else { + s = txn->Get(read_options, key, &value); + } + } else { + s = db->Get(read_options, key, &value); + } + + if (s.ok()) { + // Found key, parse its value + *int_value = std::stoull(value); + if (*int_value == 0 || *int_value == ULONG_MAX) { + *unexpected_error = true; + fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); + s = Status::Corruption(); + } + } else if (s.IsNotFound()) { + // Have not yet written to this key, so assume its value is 0 + *int_value = 0; + s = Status::OK(); + } + return s; +} + bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, bool is_optimistic) { Status s; WriteBatch batch; - std::string value; // pick a random number to use to increment a key in each set uint64_t incr = (rand_->Next() % 100) + 1; - bool unexpected_error = false; + std::vector set_vec(num_sets_); + std::iota(set_vec.begin(), set_vec.end(), 0); + std::random_shuffle(set_vec.begin(), set_vec.end(), + [&](uint64_t r) { return rand_->Uniform(r); }); // For each set, pick a key at random and increment it - for (uint8_t i = 0; i < num_sets_; i++) { + for (uint16_t set_i : set_vec) { uint64_t int_value = 0; - char prefix_buf[5]; - // prefix_buf needs to be large enough to hold a uint16 in string form - - // key format: [SET#][random#] - std::string rand_key = ToString(rand_->Next() % num_keys_); - Slice base_key(rand_key); - - // Pad prefix appropriately so we can iterate over each set - snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); - std::string full_key = std::string(prefix_buf) + base_key.ToString(); + std::string full_key; + uint64_t rand_key = rand_->Next() % num_keys_; + const bool get_for_update = txn ? rand_->OneIn(2) : false; + s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update, + &int_value, &full_key, &unexpected_error); Slice key(full_key); - - if (txn != nullptr) { - s = txn->GetForUpdate(read_options_, key, &value); - } else { - s = db->Get(read_options_, key, &value); - } - - if (s.ok()) { - // Found key, parse its value - int_value = std::stoull(value); - - if (int_value == 0 || int_value == ULONG_MAX) { - unexpected_error = true; - fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); - s = Status::Corruption(); - } - } else if (s.IsNotFound()) { - // Have not yet written to this key, so assume its value is 0 - int_value = 0; - s = Status::OK(); - } else { + if (!s.ok()) { // Optimistic transactions should never return non-ok status here. // Non-optimistic transactions may return write-coflict/timeout errors. if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { @@ -123,7 +152,11 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, std::string sum = ToString(int_value + incr); if (txn != nullptr) { s = txn->Put(key, sum); - if (!s.ok()) { + if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) { + // If the initial get was not for update, then the key is not locked + // before put and put could fail due to concurrent writes. + break; + } else if (!s.ok()) { // Since we did a GetForUpdate, Put should not fail. fprintf(stderr, "Put returned an unexpected error: %s\n", s.ToString().c_str()); @@ -143,9 +176,22 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, snprintf(name, 64, "txn%zu-%d", hasher(std::this_thread::get_id()), txn_id_++); assert(strlen(name) < 64 - 1); - txn->SetName(name); - s = txn->Prepare(); - s = txn->Commit(); + if (!is_optimistic && !rand_->OneIn(10)) { + // also try commit without prpare + txn->SetName(name); + s = txn->Prepare(); + assert(s.ok()); + } + // TODO(myabandeh): enable this when WritePreparedTxnDB::RollbackPrepared + // is updated to handle in-the-middle rollbacks. + if (!rand_->OneIn(0)) { + s = txn->Commit(); + } else { + // Also try 5% rollback + s = txn->Rollback(); + assert(s.ok()); + } + assert(is_optimistic || s.ok()); if (!s.ok()) { if (is_optimistic) { @@ -168,7 +214,6 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, s.ToString().c_str()); } } - } else { s = db->Write(write_options_, &batch); if (!s.ok()) { @@ -179,7 +224,7 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, } } else { if (txn != nullptr) { - txn->Rollback(); + assert(txn->Rollback().ok()); } } @@ -195,48 +240,80 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, return !unexpected_error; } -Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets) { +// Verify that the sum of the keys in each set are equal +Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, + uint64_t num_keys_per_set, + bool take_snapshot, Random64* rand) { uint64_t prev_total = 0; + uint32_t prev_i = 0; + bool prev_assigned = false; + ReadOptions roptions; + if (take_snapshot) { + roptions.snapshot = db->GetSnapshot(); + } + + std::vector set_vec(num_sets); + std::iota(set_vec.begin(), set_vec.end(), 0); + if (rand) { + std::random_shuffle(set_vec.begin(), set_vec.end(), + [&](uint64_t r) { return rand->Uniform(r); }); + } // For each set of keys with the same prefix, sum all the values - for (uint32_t i = 0; i < num_sets; i++) { - char prefix_buf[6]; - snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); + for (uint16_t set_i : set_vec) { + // four digits and zero end char + char prefix_buf[5]; + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); uint64_t total = 0; - Iterator* iter = db->NewIterator(ReadOptions()); - - for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { - Slice key = iter->key(); - - // stop when we reach a different prefix - if (key.ToString().compare(0, 4, prefix_buf) != 0) { - break; + // Use either point lookup or iterator. Point lookups are slower so we use + // it less often. + if (num_keys_per_set != 0 && rand && rand->OneIn(10)) { // use point lookup + ReadOptions read_options; + for (uint64_t k = 0; k < num_keys_per_set; k++) { + std::string dont_care; + uint64_t int_value = 0; + bool unexpected_error = false; + const bool FOR_UPDATE = false; + Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE, + &int_value, &dont_care, &unexpected_error); + assert(s.ok()); + assert(!unexpected_error); + total += int_value; } - - Slice value = iter->value(); - uint64_t int_value = std::stoull(value.ToString()); - if (int_value == 0 || int_value == ULONG_MAX) { - fprintf(stderr, "Iter returned unexpected value: %s\n", - value.ToString().c_str()); - return Status::Corruption(); + } else { // user iterators + Iterator* iter = db->NewIterator(roptions); + for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + // stop when we reach a different prefix + if (key.ToString().compare(0, 4, prefix_buf) != 0) { + break; + } + Slice value = iter->value(); + uint64_t int_value = std::stoull(value.ToString()); + if (int_value == 0 || int_value == ULONG_MAX) { + fprintf(stderr, "Iter returned unexpected value: %s\n", + value.ToString().c_str()); + return Status::Corruption(); + } + total += int_value; } - - total += int_value; + delete iter; } - delete iter; - if (i > 0) { - if (total != prev_total) { - fprintf(stderr, - "RandomTransactionVerify found inconsistent totals. " - "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 - " \n", - i - 1, prev_total, i, total); - return Status::Corruption(); - } + if (prev_assigned && total != prev_total) { + fprintf(stdout, + "RandomTransactionVerify found inconsistent totals. " + "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 " \n", + prev_i, prev_total, set_i, total); + return Status::Corruption(); } prev_total = total; + prev_i = set_i; + prev_assigned = true; + } + if (take_snapshot) { + db->ReleaseSnapshot(roptions.snapshot); } return Status::OK(); diff --git a/util/transaction_test_util.h b/util/transaction_test_util.h index 768fc5283..414a4267e 100644 --- a/util/transaction_test_util.h +++ b/util/transaction_test_util.h @@ -68,8 +68,15 @@ class RandomTransactionInserter { // Error status may be obtained by calling GetLastStatus(). bool DBInsert(DB* db); + // Get the ikey'th key from set set_i + static Status DBGet(DB* db, Transaction* txn, ReadOptions& read_options, + uint16_t set_i, uint64_t ikey, bool get_for_update, + uint64_t* int_value, std::string* full_key, + bool* unexpected_error); + // Returns OK if Invariant is true. - static Status Verify(DB* db, uint16_t num_sets); + static Status Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set = 0, + bool take_snapshot = false, Random64* rand = nullptr); // Returns the status of the previous Insert operation Status GetLastStatus() { return last_status_; } @@ -90,7 +97,7 @@ class RandomTransactionInserter { // Input options Random64* rand_; const WriteOptions write_options_; - const ReadOptions read_options_; + ReadOptions read_options_; const uint64_t num_keys_; const uint16_t num_sets_; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 85662362b..bea0a9224 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -44,6 +44,7 @@ INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionTest, ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), std::make_tuple(false, true, WRITE_COMMITTED), + std::make_tuple(false, false, WRITE_PREPARED), std::make_tuple(false, true, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( StackableDBAsBaseDB, TransactionTest, @@ -832,7 +833,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { s = txn->Commit(); ASSERT_EQ(s, Status::InvalidArgument()); - // no longer is prpared results + // no longer is prepared results db->GetAllPreparedTransactions(&prepared_trans); ASSERT_EQ(prepared_trans.size(), 0); ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); @@ -1189,7 +1190,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { s = txn->Commit(); ASSERT_EQ(s, Status::InvalidArgument()); - // no longer is prpared results + // no longer is prepared results prepared_trans.clear(); db->GetAllPreparedTransactions(&prepared_trans); ASSERT_EQ(prepared_trans.size(), 0); @@ -4772,34 +4773,55 @@ Status TransactionStressTestInserter(TransactionDB* db, } } // namespace +// Worker threads add a number to a key from each set of keys. The checker +// threads verify that the sum of all keys in each set are equal. TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { - const size_t num_threads = 4; + // Small write buffer to trigger more compactions + options.write_buffer_size = 1024; + ReOpenNoDelete(); + const size_t num_workers = 4; // worker threads count + const size_t num_checkers = 2; // checker threads count const size_t num_transactions_per_thread = 10000; - const size_t num_sets = 3; + const uint16_t num_sets = 3; const size_t num_keys_per_set = 100; // Setting the key-space to be 100 keys should cause enough write-conflicts // to make this test interesting. std::vector threads; + std::atomic finished = {0}; + bool TAKE_SNAPSHOT = true; std::function call_inserter = [&] { ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread, num_sets, num_keys_per_set)); + finished++; + }; + std::function call_checker = [&] { + size_t seed = std::hash()(std::this_thread::get_id()); + Random64 rand(seed); + // Verify that data is consistent + while (finished < num_workers) { + Status s = RandomTransactionInserter::Verify( + db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand); + ASSERT_OK(s); + } }; - // Create N threads that use RandomTransactionInserter to write - // many transactions. - for (uint32_t i = 0; i < num_threads; i++) { + for (uint32_t i = 0; i < num_workers; i++) { threads.emplace_back(call_inserter); } + for (uint32_t i = 0; i < num_checkers; i++) { + threads.emplace_back(call_checker); + } - // Wait for all threads to run + // Wait for all threads to finish for (auto& t : threads) { t.join(); } // Verify that data is consistent - Status s = RandomTransactionInserter::Verify(db, num_sets); + Status s = RandomTransactionInserter::Verify(db, num_sets, num_keys_per_set, + !TAKE_SNAPSHOT); ASSERT_OK(s); } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index ad34e92b1..d83aae812 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -168,6 +168,10 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { new std::atomic[COMMIT_CACHE_SIZE] {}); } +#define ROCKSDB_LOG_DETAILS(LGR, FMT, ...) \ + ; // due to overhead by default skip such lines +// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__) + // Returns true if commit_seq <= snapshot_seq bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq) const { @@ -179,10 +183,16 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, if (prep_seq == 0) { // Compaction will output keys to bottom-level with sequence number 0 if // it is visible to the earliest snapshot. + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 1); return true; } if (snapshot_seq < prep_seq) { // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); return false; } if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { @@ -190,6 +200,9 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, ReadLock rl(&prepared_mutex_); if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { // Then it is not committed yet + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); return false; } } @@ -199,6 +212,9 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); if (exist && prep_seq == cached.prep_seq) { // It is committed and also not evicted from commit cache + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); return cached.commit_seq <= snapshot_seq; } // else it could be committed but not inserted in the map which could happen @@ -209,6 +225,9 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); if (max_evicted_seq < prep_seq) { // Not evicted from cache and also not present, so must be still prepared + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); return false; } // When advancing max_evicted_seq_, we move older entires from prepared to @@ -221,12 +240,18 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, // only (iii) is the case: committed // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < // snapshot_seq + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 1); return true; } // else (ii) might be the case: check the commit data saved for this snapshot. // If there was no overlapping commit entry, then it is committed with a // commit_seq lower than any live snapshot, including snapshot_seq. if (old_commit_map_empty_.load(std::memory_order_acquire)) { + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 1); return true; } { @@ -236,15 +261,21 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, auto old_commit_entry = old_commit_map_.find(prep_seq); if (old_commit_entry == old_commit_map_.end() || old_commit_entry->second <= snapshot_seq) { + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 1); return true; } } // (ii) it the case: it is committed but after the snapshot_seq + ROCKSDB_LOG_DETAILS( + info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, 0); return false; } void WritePreparedTxnDB::AddPrepared(uint64_t seq) { - ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq); + ROCKSDB_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq); // TODO(myabandeh): Add a runtime check to ensure the following assert. assert(seq > max_evicted_seq_); WriteLock wl(&prepared_mutex_); @@ -253,7 +284,7 @@ void WritePreparedTxnDB::AddPrepared(uint64_t seq) { void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq) { - ROCKS_LOG_DEBUG( + ROCKSDB_LOG_DETAILS( info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "", prep_seq, rollback_seq); std::vector snapshots = @@ -281,8 +312,8 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq, void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq) { - ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, - prepare_seq, commit_seq); + ROCKSDB_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, + prepare_seq, commit_seq); TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start"); TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause"); auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;