From 94e245a14d2a3c57214bb884b843d7f89f5a1bf6 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 27 Apr 2022 17:50:54 -0700 Subject: [PATCH] Improve stress test for MultiOpsTxnsStressTest (#9829) Summary: Adds more coverage to `MultiOpsTxnsStressTest` with a focus on write-prepared transactions. 1. Add a hack to manually evict commit cache entries. We currently cannot assign small values to `wp_commit_cache_bits` because it requires a prepared transaction to commit within a certain range of sequence numbers, otherwise it will throw. 2. Add coverage for commit-time-write-batch. If write policy is write-prepared, we need to set `use_only_the_last_commit_time_batch_for_recovery` to true. 3. After each flush/compaction, verify data consistency. This is possible since data size can be small: default numbers of primary/secondary keys are just 1000. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9829 Test Plan: ``` TEST_TMPDIR=/dev/shm/rocksdb_crashtest_blackbox/ make blackbox_crash_test_with_multiops_wp_txn ``` Reviewed By: pdillinger Differential Revision: D35806678 Pulled By: riversand963 fbshipit-source-id: d7fde7a29fda0fb481a61f553e0ca0c47da93616 --- db_stress_tool/db_stress_test_base.cc | 2 + db_stress_tool/db_stress_test_base.h | 7 + db_stress_tool/multi_ops_txns_stress.cc | 383 +++++++++++++----- db_stress_tool/multi_ops_txns_stress.h | 62 +++ tools/db_crashtest.py | 5 + .../transactions/write_prepared_txn_db.cc | 18 + .../transactions/write_prepared_txn_db.h | 1 + 7 files changed, 371 insertions(+), 107 deletions(-) diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 9fde2ecd7..443b825bc 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2613,6 +2613,7 @@ void StressTest::Open() { options_.listeners.emplace_back(new DbStressListener( FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env)); #endif // !ROCKSDB_LITE + RegisterAdditionalListeners(); options_.create_missing_column_families = true; if (!FLAGS_use_txn) { #ifndef NDEBUG @@ -2751,6 +2752,7 @@ void StressTest::Open() { static_cast(FLAGS_wp_snapshot_cache_bits); txn_db_options.wp_commit_cache_bits = static_cast(FLAGS_wp_commit_cache_bits); + PrepareTxnDbOptions(txn_db_options); s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, cf_descriptors, &column_families_, &txn_db_); if (!s.ok()) { diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 786ace4ee..ee37eb101 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -16,6 +16,7 @@ namespace ROCKSDB_NAMESPACE { class SystemClock; class Transaction; class TransactionDB; +struct TransactionDBOptions; class StressTest { public: @@ -224,6 +225,12 @@ class StressTest { void CheckAndSetOptionsForUserTimestamp(); + virtual void RegisterAdditionalListeners() {} + +#ifndef ROCKSDB_LITE + virtual void PrepareTxnDbOptions(TransactionDBOptions& /*txn_db_opts*/) {} +#endif + std::shared_ptr cache_; std::shared_ptr compressed_cache_; std::shared_ptr filter_policy_; diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 985af56ac..619a119cb 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -15,6 +15,7 @@ #ifndef NDEBUG #include "utilities/fault_injection_fs.h" #endif // NDEBUG +#include "utilities/transactions/write_prepared_txn_db.h" namespace ROCKSDB_NAMESPACE { @@ -31,6 +32,21 @@ DEFINE_int32(delay_snapshot_read_one_in, 0, "With a chance of 1/N, inject a random delay between taking " "snapshot and read."); +DEFINE_int32(rollback_one_in, 0, + "If non-zero, rollback non-read-only transactions with a " + "probability of 1/N."); + +DEFINE_int32(clear_wp_commit_cache_one_in, 0, + "If non-zero, evict all commit entries from commit cache with a " + "probability of 1/N. This options applies to write-prepared and " + "write-unprepared transactions."); + +extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void) { + static Random rand(static_cast(db_stress_env->NowMicros())); + return FLAGS_clear_wp_commit_cache_one_in > 0 && + rand.OneIn(FLAGS_clear_wp_commit_cache_one_in); +} + // MultiOpsTxnsStressTest can either operate on a database with pre-populated // data (possibly from previous ones), or create a new db and preload it with // data specified via `-lb_a`, `-ub_a`, `-lb_c`, `-ub_c`, etc. Among these, we @@ -75,8 +91,9 @@ void MultiOpsTxnsStressTest::KeyGenerator::FinishInit() { "Cannot allocate key in [%u, %u)\nStart with a new DB or try change " "the number of threads for testing via -threads=<#threads>\n", static_cast(low_), static_cast(high_)); + fflush(stdout); fflush(stderr); - std::terminate(); + assert(false); } initialized_ = true; } @@ -131,33 +148,43 @@ void MultiOpsTxnsStressTest::KeyGenerator::UndoAllocation(uint32_t new_val) { } std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) { - char buf[8]; - EncodeFixed32(buf, kPrimaryIndexId); - std::reverse(buf, buf + 4); - EncodeFixed32(buf + 4, a); - std::reverse(buf + 4, buf + 8); - return std::string(buf, sizeof(buf)); + std::string ret; + PutFixed32(&ret, kPrimaryIndexId); + PutFixed32(&ret, a); + + char* const buf = &ret[0]; + std::reverse(buf, buf + sizeof(kPrimaryIndexId)); + std::reverse(buf + sizeof(kPrimaryIndexId), + buf + sizeof(kPrimaryIndexId) + sizeof(a)); + return ret; } std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c) { - char buf[8]; - EncodeFixed32(buf, kSecondaryIndexId); - std::reverse(buf, buf + 4); - EncodeFixed32(buf + 4, c); - std::reverse(buf + 4, buf + 8); - return std::string(buf, sizeof(buf)); + std::string ret; + PutFixed32(&ret, kSecondaryIndexId); + PutFixed32(&ret, c); + + char* const buf = &ret[0]; + std::reverse(buf, buf + sizeof(kSecondaryIndexId)); + std::reverse(buf + sizeof(kSecondaryIndexId), + buf + sizeof(kSecondaryIndexId) + sizeof(c)); + return ret; } std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c, uint32_t a) { - char buf[12]; - EncodeFixed32(buf, kSecondaryIndexId); - std::reverse(buf, buf + 4); - EncodeFixed32(buf + 4, c); - EncodeFixed32(buf + 8, a); - std::reverse(buf + 4, buf + 8); - std::reverse(buf + 8, buf + 12); - return std::string(buf, sizeof(buf)); + std::string ret; + PutFixed32(&ret, kSecondaryIndexId); + PutFixed32(&ret, c); + PutFixed32(&ret, a); + + char* const buf = &ret[0]; + std::reverse(buf, buf + sizeof(kSecondaryIndexId)); + std::reverse(buf + sizeof(kSecondaryIndexId), + buf + sizeof(kSecondaryIndexId) + sizeof(c)); + std::reverse(buf + sizeof(kSecondaryIndexId) + sizeof(c), + buf + sizeof(kSecondaryIndexId) + sizeof(c) + sizeof(a)); + return ret; } std::tuple @@ -201,40 +228,26 @@ std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey() const { } std::string MultiOpsTxnsStressTest::Record::EncodePrimaryIndexValue() const { - char buf[8]; - EncodeFixed32(buf, b_); - EncodeFixed32(buf + 4, c_); - return std::string(buf, sizeof(buf)); + std::string ret; + PutFixed32(&ret, b_); + PutFixed32(&ret, c_); + return ret; } std::pair MultiOpsTxnsStressTest::Record::EncodeSecondaryIndexEntry() const { - std::string secondary_index_key; - char buf[12]; - EncodeFixed32(buf, kSecondaryIndexId); - std::reverse(buf, buf + 4); - EncodeFixed32(buf + 4, c_); - EncodeFixed32(buf + 8, a_); - std::reverse(buf + 4, buf + 8); - std::reverse(buf + 8, buf + 12); - secondary_index_key.assign(buf, sizeof(buf)); + std::string secondary_index_key = EncodeSecondaryKey(c_, a_); // Secondary index value is always 4-byte crc32 of the secondary key std::string secondary_index_value; - uint32_t crc = crc32c::Value(buf, sizeof(buf)); + uint32_t crc = + crc32c::Value(secondary_index_key.data(), secondary_index_key.size()); PutFixed32(&secondary_index_value, crc); - return std::make_pair(secondary_index_key, secondary_index_value); + return std::make_pair(std::move(secondary_index_key), secondary_index_value); } std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey() const { - char buf[12]; - EncodeFixed32(buf, kSecondaryIndexId); - std::reverse(buf, buf + 4); - EncodeFixed32(buf + 4, c_); - EncodeFixed32(buf + 8, a_); - std::reverse(buf + 4, buf + 8); - std::reverse(buf + 8, buf + 12); - return std::string(buf, sizeof(buf)); + return EncodeSecondaryKey(c_, a_); } Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry( @@ -244,27 +257,22 @@ Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry( return Status::Corruption("Primary index key length is not 8"); } - const char* const index_id_buf = primary_index_key.data(); - uint32_t index_id = - static_cast(static_cast(index_id_buf[0])) << 24; - index_id += static_cast(static_cast(index_id_buf[1])) - << 16; - index_id += static_cast(static_cast(index_id_buf[2])) - << 8; - index_id += - static_cast(static_cast(index_id_buf[3])); - primary_index_key.remove_prefix(sizeof(uint32_t)); + uint32_t index_id = 0; + + [[maybe_unused]] bool res = GetFixed32(&primary_index_key, &index_id); + assert(res); + index_id = EndianSwapValue(index_id); + if (index_id != kPrimaryIndexId) { std::ostringstream oss; oss << "Unexpected primary index id: " << index_id; return Status::Corruption(oss.str()); } - const char* const buf = primary_index_key.data(); - a_ = static_cast(static_cast(buf[0])) << 24; - a_ += static_cast(static_cast(buf[1])) << 16; - a_ += static_cast(static_cast(buf[2])) << 8; - a_ += static_cast(static_cast(buf[3])); + res = GetFixed32(&primary_index_key, &a_); + assert(res); + a_ = EndianSwapValue(a_); + assert(primary_index_key.empty()); if (primary_index_value.size() != 8) { return Status::Corruption("Primary index value length is not 8"); @@ -282,33 +290,28 @@ Status MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexEntry( uint32_t crc = crc32c::Value(secondary_index_key.data(), secondary_index_key.size()); - const char* const index_id_buf = secondary_index_key.data(); - uint32_t index_id = - static_cast(static_cast(index_id_buf[0])) << 24; - index_id += static_cast(static_cast(index_id_buf[1])) - << 16; - index_id += static_cast(static_cast(index_id_buf[2])) - << 8; - index_id += - static_cast(static_cast(index_id_buf[3])); - secondary_index_key.remove_prefix(sizeof(uint32_t)); + uint32_t index_id = 0; + + [[maybe_unused]] bool res = GetFixed32(&secondary_index_key, &index_id); + assert(res); + index_id = EndianSwapValue(index_id); + if (index_id != kSecondaryIndexId) { std::ostringstream oss; oss << "Unexpected secondary index id: " << index_id; return Status::Corruption(oss.str()); } - const char* const buf = secondary_index_key.data(); assert(secondary_index_key.size() == 8); - c_ = static_cast(static_cast(buf[0])) << 24; - c_ += static_cast(static_cast(buf[1])) << 16; - c_ += static_cast(static_cast(buf[2])) << 8; - c_ += static_cast(static_cast(buf[3])); + res = GetFixed32(&secondary_index_key, &c_); + assert(res); + c_ = EndianSwapValue(c_); - a_ = static_cast(static_cast(buf[4])) << 24; - a_ += static_cast(static_cast(buf[5])) << 16; - a_ += static_cast(static_cast(buf[6])) << 8; - a_ += static_cast(static_cast(buf[7])); + assert(secondary_index_key.size() == 4); + res = GetFixed32(&secondary_index_key, &a_); + assert(res); + a_ = EndianSwapValue(a_); + assert(secondary_index_key.empty()); if (secondary_index_value.size() != 4) { return Status::Corruption("Secondary index value length is not 4"); @@ -520,9 +523,35 @@ Status MultiOpsTxnsStressTest::TestCustomOperations( // Should never reach here. assert(false); } + return s; } +void MultiOpsTxnsStressTest::RegisterAdditionalListeners() { + options_.listeners.emplace_back(new MultiOpsTxnsStressListener(this)); +} + +#ifndef ROCKSDB_LITE +void MultiOpsTxnsStressTest::PrepareTxnDbOptions( + TransactionDBOptions& txn_db_opts) { + // MultiOpsTxnStressTest uses SingleDelete to delete secondary keys, thus we + // register this callback to let TxnDb know that when rolling back + // a transaction, use only SingleDelete to cancel prior Put from the same + // transaction if applicable. + txn_db_opts.rollback_deletion_type_callback = + [](TransactionDB* /*db*/, ColumnFamilyHandle* /*column_family*/, + const Slice& key) { + Slice ks = key; + uint32_t index_id = 0; + [[maybe_unused]] bool res = GetFixed32(&ks, &index_id); + assert(res); + index_id = EndianSwapValue(index_id); + assert(index_id <= Record::kSecondaryIndexId); + return index_id == Record::kSecondaryIndexId; + }; +} +#endif // !ROCKSDB_LITE + Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, uint32_t old_a_pos, @@ -561,8 +590,10 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, } if (s.IsNotFound()) { thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0); - } else if (s.IsBusy()) { + } else if (s.IsBusy() || s.IsIncomplete()) { // ignore. + // Incomplete also means rollback by application. See the transaction + // implementations. } else { thread->stats.AddErrors(1); } @@ -631,6 +662,16 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, return s; } + if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) { + s = Status::Incomplete(); + return s; + } + + s = WriteToCommitTimeWriteBatch(*txn); + if (!s.ok()) { + return s; + } + s = txn->Commit(); auto& key_gen = key_gen_for_a_.at(thread->tid); @@ -677,11 +718,12 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize); return; } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || - s.IsMergeInProgress()) { + s.IsMergeInProgress() || s.IsIncomplete()) { // ww-conflict detected, or // lock cannot be acquired, or // memtable history is not large enough for conflict checking, or - // Merge operation cannot be resolved. + // Merge operation cannot be resolved, or + // application rollback. // TODO (yanqin) add stats for other cases? } else if (s.IsNotFound()) { // ignore. @@ -727,8 +769,9 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, Record record; s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); if (!s.ok()) { - fprintf(stderr, "Cannot decode secondary key: %s\n", - s.ToString().c_str()); + fprintf(stderr, "Cannot decode secondary key (%s => %s): %s\n", + it->key().ToString(true).c_str(), + it->value().ToString(true).c_str(), s.ToString().c_str()); assert(false); break; } @@ -749,21 +792,31 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, } else if (s.IsNotFound()) { // We can also fail verification here. std::ostringstream oss; - oss << "pk should exist: " << Slice(pk).ToString(true); + auto* dbimpl = static_cast_with_check(db_->GetRootDB()); + assert(dbimpl); + oss << "snap " << read_opts.snapshot->GetSequenceNumber() + << " (published " << dbimpl->GetLastPublishedSequence() + << "), pk should exist: " << Slice(pk).ToString(true); fprintf(stderr, "%s\n", oss.str().c_str()); assert(false); break; } if (!s.ok()) { - fprintf(stderr, "%s\n", s.ToString().c_str()); + std::ostringstream oss; + auto* dbimpl = static_cast_with_check(db_->GetRootDB()); + assert(dbimpl); + oss << "snap " << read_opts.snapshot->GetSequenceNumber() + << " (published " << dbimpl->GetLastPublishedSequence() << "), " + << s.ToString(); + fprintf(stderr, "%s\n", oss.str().c_str()); assert(false); break; } auto result = Record::DecodePrimaryIndexValue(value); s = std::get<0>(result); if (!s.ok()) { - fprintf(stderr, "Cannot decode primary index value: %s\n", - s.ToString().c_str()); + fprintf(stderr, "Cannot decode primary index value %s: %s\n", + Slice(value).ToString(true).c_str(), s.ToString().c_str()); assert(false); break; } @@ -771,8 +824,12 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t c = std::get<2>(result); if (c != old_c) { std::ostringstream oss; - oss << "c in primary index does not match secondary index: " << c - << " != " << old_c; + auto* dbimpl = static_cast_with_check(db_->GetRootDB()); + assert(dbimpl); + oss << "snap " << read_opts.snapshot->GetSequenceNumber() + << " (published " << dbimpl->GetLastPublishedSequence() + << "), pk/sk mismatch. pk: (a=" << record.a_value() << ", " + << "c=" << c << "), sk: (c=" << old_c << ")"; s = Status::Corruption(); fprintf(stderr, "%s\n", oss.str().c_str()); assert(false); @@ -811,6 +868,16 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, return s; } + if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) { + s = Status::Incomplete(); + return s; + } + + s = WriteToCommitTimeWriteBatch(*txn); + if (!s.ok()) { + return s; + } + s = txn->Commit(); if (s.ok()) { @@ -856,7 +923,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, } else if (s.IsInvalidArgument()) { // ignored. } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || - s.IsMergeInProgress()) { + s.IsMergeInProgress() || s.IsIncomplete()) { // ignored. } else { thread->stats.AddErrors(1); @@ -874,8 +941,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, auto result = Record::DecodePrimaryIndexValue(value); if (!std::get<0>(result).ok()) { s = std::get<0>(result); - fprintf(stderr, "Cannot decode primary index value: %s\n", - s.ToString().c_str()); + fprintf(stderr, "Cannot decode primary index value %s: %s\n", + Slice(value).ToString(true).c_str(), s.ToString().c_str()); assert(false); return s; } @@ -892,6 +959,17 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, if (!s.ok()) { return s; } + + if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) { + s = Status::Incomplete(); + return s; + } + + s = WriteToCommitTimeWriteBatch(*txn); + if (!s.ok()) { + return s; + } + s = txn->Commit(); if (s.ok()) { delete txn; @@ -1050,12 +1128,15 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { // First, iterate primary index. size_t primary_index_entries_count = 0; { - char buf[4]; - EncodeFixed32(buf, Record::kPrimaryIndexId + 1); - std::reverse(buf, buf + sizeof(buf)); - std::string iter_ub_str(buf, sizeof(buf)); + std::string iter_ub_str; + PutFixed32(&iter_ub_str, Record::kPrimaryIndexId + 1); + std::reverse(iter_ub_str.begin(), iter_ub_str.end()); Slice iter_ub = iter_ub_str; + std::string start_key; + PutFixed32(&start_key, Record::kPrimaryIndexId); + std::reverse(start_key.begin(), start_key.end()); + // This `ReadOptions` is for validation purposes. Ignore // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ropts; @@ -1064,7 +1145,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { ropts.iterate_upper_bound = &iter_ub; std::unique_ptr it(db_->NewIterator(ropts)); - for (it->SeekToFirst(); it->Valid(); it->Next()) { + for (it->Seek(start_key); it->Valid(); it->Next()) { Record record; Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); if (!s.ok()) { @@ -1101,10 +1182,9 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { // Second, iterate secondary index. size_t secondary_index_entries_count = 0; { - char buf[4]; - EncodeFixed32(buf, Record::kSecondaryIndexId); - std::reverse(buf, buf + sizeof(buf)); - const std::string start_key(buf, sizeof(buf)); + std::string start_key; + PutFixed32(&start_key, Record::kSecondaryIndexId); + std::reverse(start_key.begin(), start_key.end()); // This `ReadOptions` is for validation purposes. Ignore // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. @@ -1118,7 +1198,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { Record record; Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); if (!s.ok()) { - oss << "Cannot decode secondary index entry"; + oss << "Cannot decode secondary index entry " + << it->key().ToString(true) << "=>" << it->value().ToString(true); VerificationAbort(thread->shared, oss.str(), s); assert(false); return; @@ -1132,7 +1213,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { s = db_->Get(ropts, pk, &value); if (!s.ok()) { oss << "Error searching pk " << Slice(pk).ToString(true) << ". " - << s.ToString(); + << s.ToString() << ". sk " << it->key().ToString(true); VerificationAbort(thread->shared, oss.str(), s); assert(false); return; @@ -1148,8 +1229,10 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { } uint32_t c_in_primary = std::get<2>(result); if (c_in_primary != record.c_value()) { - oss << "Pk/sk mismatch. pk: (c=" << c_in_primary - << "), sk: (c=" << record.c_value() << ")"; + oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>" + << Slice(value).ToString(true) << " (a=" << record.a_value() + << ", c=" << c_in_primary << "), sk: " << it->key().ToString(true) + << " (c=" << record.c_value() << ")"; VerificationAbort(thread->shared, oss.str(), s); assert(false); return; @@ -1167,6 +1250,75 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { } } +void MultiOpsTxnsStressTest::VerifyPkSkFast(int job_id) { + const Snapshot* const snapshot = db_->GetSnapshot(); + assert(snapshot); + ManagedSnapshot snapshot_guard(db_, snapshot); + + std::ostringstream oss; + auto* dbimpl = static_cast_with_check(db_->GetRootDB()); + assert(dbimpl); + + oss << "Job " << job_id << ": [" << snapshot->GetSequenceNumber() << "," + << dbimpl->GetLastPublishedSequence() << "] "; + + std::string start_key; + PutFixed32(&start_key, Record::kSecondaryIndexId); + std::reverse(start_key.begin(), start_key.end()); + + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. + ReadOptions ropts; + ropts.snapshot = snapshot; + ropts.total_order_seek = true; + + std::unique_ptr it(db_->NewIterator(ropts)); + for (it->Seek(start_key); it->Valid(); it->Next()) { + Record record; + Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); + if (!s.ok()) { + oss << "Cannot decode secondary index entry " << it->key().ToString(true) + << "=>" << it->value().ToString(true); + fprintf(stderr, "%s\n", oss.str().c_str()); + fflush(stderr); + assert(false); + } + // After decoding secondary index entry, we know a and c. Crc is verified + // in decoding phase. + // + // Form a primary key and search in the primary index. + std::string pk = Record::EncodePrimaryKey(record.a_value()); + std::string value; + s = db_->Get(ropts, pk, &value); + if (!s.ok()) { + oss << "Error searching pk " << Slice(pk).ToString(true) << ". " + << s.ToString() << ". sk " << it->key().ToString(true); + fprintf(stderr, "%s\n", oss.str().c_str()); + fflush(stderr); + assert(false); + } + auto result = Record::DecodePrimaryIndexValue(value); + s = std::get<0>(result); + if (!s.ok()) { + oss << "Error decoding primary index value " + << Slice(value).ToString(true) << ". " << s.ToString(); + fprintf(stderr, "%s\n", oss.str().c_str()); + fflush(stderr); + assert(false); + } + uint32_t c_in_primary = std::get<2>(result); + if (c_in_primary != record.c_value()) { + oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>" + << Slice(value).ToString(true) << " (a=" << record.a_value() + << ", c=" << c_in_primary << "), sk: " << it->key().ToString(true) + << " (c=" << record.c_value() << ")"; + fprintf(stderr, "%s\n", oss.str().c_str()); + fflush(stderr); + assert(false); + } + } +} + std::pair MultiOpsTxnsStressTest::ChooseExistingA( ThreadState* thread) { uint32_t tid = thread->tid; @@ -1193,6 +1345,22 @@ uint32_t MultiOpsTxnsStressTest::GenerateNextC(ThreadState* thread) { return key_gen->Allocate(); } +#ifndef ROCKSDB_LITE +Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) { + WriteBatch* ctwb = txn.GetCommitTimeWriteBatch(); + assert(ctwb); + // Do not change the content in key_buf. + static constexpr char key_buf[sizeof(Record::kMetadataPrefix) + 4] = { + '\0', '\0', '\0', '\0', '\0', '\0', '\0', '\xff'}; + + uint64_t counter_val = counter_.Next(); + char val_buf[sizeof(counter_val)]; + EncodeFixed64(val_buf, counter_val); + return ctwb->Put(Slice(key_buf, sizeof(key_buf)), + Slice(val_buf, sizeof(val_buf))); +} +#endif // !ROCKSDB_LITE + std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const { std::string result; PutFixed32(&result, lb_a); @@ -1428,8 +1596,9 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) { Record record; Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); if (!s.ok()) { - fprintf(stderr, "Cannot decode primary index entry: %s\n", - s.ToString().c_str()); + fprintf(stderr, "Cannot decode primary index entry (%s => %s): %s\n", + it->key().ToString(true).c_str(), + it->value().ToString(true).c_str(), s.ToString().c_str()); assert(false); } uint32_t a = record.a_value(); diff --git a/db_stress_tool/multi_ops_txns_stress.h b/db_stress_tool/multi_ops_txns_stress.h index 47b438875..a7db1c69e 100644 --- a/db_stress_tool/multi_ops_txns_stress.h +++ b/db_stress_tool/multi_ops_txns_stress.h @@ -111,6 +111,7 @@ class MultiOpsTxnsStressTest : public StressTest { public: class Record { public: + static constexpr uint32_t kMetadataPrefix = 0; static constexpr uint32_t kPrimaryIndexId = 1; static constexpr uint32_t kSecondaryIndexId = 2; @@ -261,6 +262,12 @@ class MultiOpsTxnsStressTest : public StressTest { ThreadState* thread, const std::vector& rand_column_families) override; + void RegisterAdditionalListeners() override; + +#ifndef ROCKSDB_LITE + void PrepareTxnDbOptions(TransactionDBOptions& txn_db_opts) override; +#endif // !ROCKSDB_LITE + Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, uint32_t old_a_pos, uint32_t new_a); @@ -280,7 +287,17 @@ class MultiOpsTxnsStressTest : public StressTest { VerifyDb(thread); } + void VerifyPkSkFast(int job_id); + protected: + class Counter { + public: + uint64_t Next() { return value_.fetch_add(1); } + + private: + std::atomic value_ = Env::Default()->NowNanos(); + }; + using KeySet = std::set; class KeyGenerator { public: @@ -330,9 +347,21 @@ class MultiOpsTxnsStressTest : public StressTest { uint32_t GenerateNextC(ThreadState* thread); +#ifndef ROCKSDB_LITE + // Some applications, e.g. MyRocks writes a KV pair to the database via + // commit-time-write-batch (ctwb) in additional to the transaction's regular + // write batch. The key is usually constant representing some system + // metadata, while the value is monoticailly increasing which represents the + // actual value of the metadata. Method WriteToCommitTimeWriteBatch() + // emulates this scenario. + Status WriteToCommitTimeWriteBatch(Transaction& txn); +#endif //! ROCKSDB_LITE + std::vector> key_gen_for_a_; std::vector> key_gen_for_c_; + Counter counter_{}; + private: struct KeySpaces { uint32_t lb_a = 0; @@ -370,5 +399,38 @@ class InvariantChecker { "MultiOpsTxnsStressTest::Record::c_ must be 4 bytes"); }; +class MultiOpsTxnsStressListener : public EventListener { + public: + explicit MultiOpsTxnsStressListener(MultiOpsTxnsStressTest* stress_test) + : stress_test_(stress_test) { + assert(stress_test_); + } + +#ifndef ROCKSDB_LITE + ~MultiOpsTxnsStressListener() override {} + + void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { + assert(db); +#ifdef NDEBUG + (void)db; +#endif + assert(info.cf_id == 0); + stress_test_->VerifyPkSkFast(info.job_id); + } + + void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override { + assert(db); +#ifdef NDEBUG + (void)db; +#endif + assert(info.cf_id == 0); + stress_test_->VerifyPkSkFast(info.job_id); + } +#endif //! ROCKSDB_LITE + + private: + MultiOpsTxnsStressTest* const stress_test_ = nullptr; +}; + } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 231f79372..696448989 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -383,6 +383,7 @@ multiops_txn_default_params = { # compactions. "flush_one_in": 1000, "key_spaces_path": setup_multiops_txn_key_spaces_file(), + "rollback_one_in": 4, } multiops_wc_txn_params = { @@ -401,6 +402,10 @@ multiops_wp_txn_params = { "enable_pipelined_write": 0, # OpenReadOnly after checkpoint is not currnetly compatible with WritePrepared txns "checkpoint_one_in": 0, + # Required to be 1 in order to use commit-time-batch + "use_only_the_last_commit_time_batch_for_recovery": 1, + "recycle_log_file_num": 0, + "clear_wp_commit_cache_one_in": 10, } def finalize_and_sanitize(src_params): diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 3b6985737..139afc37a 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -26,6 +26,18 @@ #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_db_mutex_impl.h" +// This function is for testing only. If it returns true, then all entries in +// the commit cache will be evicted. Unit and/or stress tests (db_stress) +// can implement this function and customize how frequently commit cache +// eviction occurs. +// TODO: remove this function once we can configure commit cache to be very +// small so that eviction occurs very frequently. This requires the commit +// cache entry to be able to encode prepare and commit sequence numbers so that +// the commit sequence number does not have to be within a certain range of +// prepare sequence number. +extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void) + __attribute__((__weak__)); + namespace ROCKSDB_NAMESPACE { Status WritePreparedTxnDB::Initialize( @@ -505,6 +517,12 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, // legit when a commit entry in a write batch overwrite the previous one max_evicted_seq = evicted.commit_seq; } +#ifdef OS_LINUX + if (rocksdb_write_prepared_TEST_ShouldClearCommitCache && + rocksdb_write_prepared_TEST_ShouldClearCommitCache()) { + max_evicted_seq = last; + } +#endif // OS_LINUX ROCKS_LOG_DETAILS(info_log_, "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64 " => %lu", diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 502fea56a..5ae29b3f3 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -513,6 +513,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WriteUnpreparedTxn; friend class WriteUnpreparedTxnDB; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; + friend class MultiOpsTxnsStressTest; void Init(const TransactionDBOptions& txn_db_opts);