Improve stress test for MultiOpsTxnsStressTest (#9829)

Summary:
Adds more coverage to `MultiOpsTxnsStressTest` with a focus on write-prepared transactions.

1. Add a hack to manually evict commit cache entries. We currently cannot assign small values to `wp_commit_cache_bits` because it requires a prepared transaction to commit within a certain range of sequence numbers, otherwise it will throw.
2. Add coverage for commit-time-write-batch. If write policy is write-prepared, we need to set `use_only_the_last_commit_time_batch_for_recovery` to true.
3. After each flush/compaction, verify data consistency. This is possible since data size can be small: default numbers of primary/secondary keys are just 1000.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9829

Test Plan:
```
TEST_TMPDIR=/dev/shm/rocksdb_crashtest_blackbox/ make blackbox_crash_test_with_multiops_wp_txn
```

Reviewed By: pdillinger

Differential Revision: D35806678

Pulled By: riversand963

fbshipit-source-id: d7fde7a29fda0fb481a61f553e0ca0c47da93616
This commit is contained in:
Yanqin Jin 2022-04-27 17:50:54 -07:00 committed by Facebook GitHub Bot
parent d9d456de49
commit 94e245a14d
7 changed files with 371 additions and 107 deletions

View File

@ -2613,6 +2613,7 @@ void StressTest::Open() {
options_.listeners.emplace_back(new DbStressListener(
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
#endif // !ROCKSDB_LITE
RegisterAdditionalListeners();
options_.create_missing_column_families = true;
if (!FLAGS_use_txn) {
#ifndef NDEBUG
@ -2751,6 +2752,7 @@ void StressTest::Open() {
static_cast<size_t>(FLAGS_wp_snapshot_cache_bits);
txn_db_options.wp_commit_cache_bits =
static_cast<size_t>(FLAGS_wp_commit_cache_bits);
PrepareTxnDbOptions(txn_db_options);
s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
cf_descriptors, &column_families_, &txn_db_);
if (!s.ok()) {

View File

@ -16,6 +16,7 @@ namespace ROCKSDB_NAMESPACE {
class SystemClock;
class Transaction;
class TransactionDB;
struct TransactionDBOptions;
class StressTest {
public:
@ -224,6 +225,12 @@ class StressTest {
void CheckAndSetOptionsForUserTimestamp();
virtual void RegisterAdditionalListeners() {}
#ifndef ROCKSDB_LITE
virtual void PrepareTxnDbOptions(TransactionDBOptions& /*txn_db_opts*/) {}
#endif
std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_;

View File

@ -15,6 +15,7 @@
#ifndef NDEBUG
#include "utilities/fault_injection_fs.h"
#endif // NDEBUG
#include "utilities/transactions/write_prepared_txn_db.h"
namespace ROCKSDB_NAMESPACE {
@ -31,6 +32,21 @@ DEFINE_int32(delay_snapshot_read_one_in, 0,
"With a chance of 1/N, inject a random delay between taking "
"snapshot and read.");
DEFINE_int32(rollback_one_in, 0,
"If non-zero, rollback non-read-only transactions with a "
"probability of 1/N.");
DEFINE_int32(clear_wp_commit_cache_one_in, 0,
"If non-zero, evict all commit entries from commit cache with a "
"probability of 1/N. This options applies to write-prepared and "
"write-unprepared transactions.");
extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void) {
static Random rand(static_cast<uint32_t>(db_stress_env->NowMicros()));
return FLAGS_clear_wp_commit_cache_one_in > 0 &&
rand.OneIn(FLAGS_clear_wp_commit_cache_one_in);
}
// MultiOpsTxnsStressTest can either operate on a database with pre-populated
// data (possibly from previous ones), or create a new db and preload it with
// data specified via `-lb_a`, `-ub_a`, `-lb_c`, `-ub_c`, etc. Among these, we
@ -75,8 +91,9 @@ void MultiOpsTxnsStressTest::KeyGenerator::FinishInit() {
"Cannot allocate key in [%u, %u)\nStart with a new DB or try change "
"the number of threads for testing via -threads=<#threads>\n",
static_cast<unsigned int>(low_), static_cast<unsigned int>(high_));
fflush(stdout);
fflush(stderr);
std::terminate();
assert(false);
}
initialized_ = true;
}
@ -131,33 +148,43 @@ void MultiOpsTxnsStressTest::KeyGenerator::UndoAllocation(uint32_t new_val) {
}
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) {
char buf[8];
EncodeFixed32(buf, kPrimaryIndexId);
std::reverse(buf, buf + 4);
EncodeFixed32(buf + 4, a);
std::reverse(buf + 4, buf + 8);
return std::string(buf, sizeof(buf));
std::string ret;
PutFixed32(&ret, kPrimaryIndexId);
PutFixed32(&ret, a);
char* const buf = &ret[0];
std::reverse(buf, buf + sizeof(kPrimaryIndexId));
std::reverse(buf + sizeof(kPrimaryIndexId),
buf + sizeof(kPrimaryIndexId) + sizeof(a));
return ret;
}
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c) {
char buf[8];
EncodeFixed32(buf, kSecondaryIndexId);
std::reverse(buf, buf + 4);
EncodeFixed32(buf + 4, c);
std::reverse(buf + 4, buf + 8);
return std::string(buf, sizeof(buf));
std::string ret;
PutFixed32(&ret, kSecondaryIndexId);
PutFixed32(&ret, c);
char* const buf = &ret[0];
std::reverse(buf, buf + sizeof(kSecondaryIndexId));
std::reverse(buf + sizeof(kSecondaryIndexId),
buf + sizeof(kSecondaryIndexId) + sizeof(c));
return ret;
}
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c,
uint32_t a) {
char buf[12];
EncodeFixed32(buf, kSecondaryIndexId);
std::reverse(buf, buf + 4);
EncodeFixed32(buf + 4, c);
EncodeFixed32(buf + 8, a);
std::reverse(buf + 4, buf + 8);
std::reverse(buf + 8, buf + 12);
return std::string(buf, sizeof(buf));
std::string ret;
PutFixed32(&ret, kSecondaryIndexId);
PutFixed32(&ret, c);
PutFixed32(&ret, a);
char* const buf = &ret[0];
std::reverse(buf, buf + sizeof(kSecondaryIndexId));
std::reverse(buf + sizeof(kSecondaryIndexId),
buf + sizeof(kSecondaryIndexId) + sizeof(c));
std::reverse(buf + sizeof(kSecondaryIndexId) + sizeof(c),
buf + sizeof(kSecondaryIndexId) + sizeof(c) + sizeof(a));
return ret;
}
std::tuple<Status, uint32_t, uint32_t>
@ -201,40 +228,26 @@ std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey() const {
}
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryIndexValue() const {
char buf[8];
EncodeFixed32(buf, b_);
EncodeFixed32(buf + 4, c_);
return std::string(buf, sizeof(buf));
std::string ret;
PutFixed32(&ret, b_);
PutFixed32(&ret, c_);
return ret;
}
std::pair<std::string, std::string>
MultiOpsTxnsStressTest::Record::EncodeSecondaryIndexEntry() const {
std::string secondary_index_key;
char buf[12];
EncodeFixed32(buf, kSecondaryIndexId);
std::reverse(buf, buf + 4);
EncodeFixed32(buf + 4, c_);
EncodeFixed32(buf + 8, a_);
std::reverse(buf + 4, buf + 8);
std::reverse(buf + 8, buf + 12);
secondary_index_key.assign(buf, sizeof(buf));
std::string secondary_index_key = EncodeSecondaryKey(c_, a_);
// Secondary index value is always 4-byte crc32 of the secondary key
std::string secondary_index_value;
uint32_t crc = crc32c::Value(buf, sizeof(buf));
uint32_t crc =
crc32c::Value(secondary_index_key.data(), secondary_index_key.size());
PutFixed32(&secondary_index_value, crc);
return std::make_pair(secondary_index_key, secondary_index_value);
return std::make_pair(std::move(secondary_index_key), secondary_index_value);
}
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey() const {
char buf[12];
EncodeFixed32(buf, kSecondaryIndexId);
std::reverse(buf, buf + 4);
EncodeFixed32(buf + 4, c_);
EncodeFixed32(buf + 8, a_);
std::reverse(buf + 4, buf + 8);
std::reverse(buf + 8, buf + 12);
return std::string(buf, sizeof(buf));
return EncodeSecondaryKey(c_, a_);
}
Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry(
@ -244,27 +257,22 @@ Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry(
return Status::Corruption("Primary index key length is not 8");
}
const char* const index_id_buf = primary_index_key.data();
uint32_t index_id =
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[0])) << 24;
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[1]))
<< 16;
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[2]))
<< 8;
index_id +=
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[3]));
primary_index_key.remove_prefix(sizeof(uint32_t));
uint32_t index_id = 0;
[[maybe_unused]] bool res = GetFixed32(&primary_index_key, &index_id);
assert(res);
index_id = EndianSwapValue(index_id);
if (index_id != kPrimaryIndexId) {
std::ostringstream oss;
oss << "Unexpected primary index id: " << index_id;
return Status::Corruption(oss.str());
}
const char* const buf = primary_index_key.data();
a_ = static_cast<uint32_t>(static_cast<unsigned char>(buf[0])) << 24;
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[1])) << 16;
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[2])) << 8;
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[3]));
res = GetFixed32(&primary_index_key, &a_);
assert(res);
a_ = EndianSwapValue(a_);
assert(primary_index_key.empty());
if (primary_index_value.size() != 8) {
return Status::Corruption("Primary index value length is not 8");
@ -282,33 +290,28 @@ Status MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexEntry(
uint32_t crc =
crc32c::Value(secondary_index_key.data(), secondary_index_key.size());
const char* const index_id_buf = secondary_index_key.data();
uint32_t index_id =
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[0])) << 24;
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[1]))
<< 16;
index_id += static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[2]))
<< 8;
index_id +=
static_cast<uint32_t>(static_cast<unsigned char>(index_id_buf[3]));
secondary_index_key.remove_prefix(sizeof(uint32_t));
uint32_t index_id = 0;
[[maybe_unused]] bool res = GetFixed32(&secondary_index_key, &index_id);
assert(res);
index_id = EndianSwapValue(index_id);
if (index_id != kSecondaryIndexId) {
std::ostringstream oss;
oss << "Unexpected secondary index id: " << index_id;
return Status::Corruption(oss.str());
}
const char* const buf = secondary_index_key.data();
assert(secondary_index_key.size() == 8);
c_ = static_cast<uint32_t>(static_cast<unsigned char>(buf[0])) << 24;
c_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[1])) << 16;
c_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[2])) << 8;
c_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[3]));
res = GetFixed32(&secondary_index_key, &c_);
assert(res);
c_ = EndianSwapValue(c_);
a_ = static_cast<uint32_t>(static_cast<unsigned char>(buf[4])) << 24;
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[5])) << 16;
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[6])) << 8;
a_ += static_cast<uint32_t>(static_cast<unsigned char>(buf[7]));
assert(secondary_index_key.size() == 4);
res = GetFixed32(&secondary_index_key, &a_);
assert(res);
a_ = EndianSwapValue(a_);
assert(secondary_index_key.empty());
if (secondary_index_value.size() != 4) {
return Status::Corruption("Secondary index value length is not 4");
@ -520,9 +523,35 @@ Status MultiOpsTxnsStressTest::TestCustomOperations(
// Should never reach here.
assert(false);
}
return s;
}
void MultiOpsTxnsStressTest::RegisterAdditionalListeners() {
options_.listeners.emplace_back(new MultiOpsTxnsStressListener(this));
}
#ifndef ROCKSDB_LITE
void MultiOpsTxnsStressTest::PrepareTxnDbOptions(
TransactionDBOptions& txn_db_opts) {
// MultiOpsTxnStressTest uses SingleDelete to delete secondary keys, thus we
// register this callback to let TxnDb know that when rolling back
// a transaction, use only SingleDelete to cancel prior Put from the same
// transaction if applicable.
txn_db_opts.rollback_deletion_type_callback =
[](TransactionDB* /*db*/, ColumnFamilyHandle* /*column_family*/,
const Slice& key) {
Slice ks = key;
uint32_t index_id = 0;
[[maybe_unused]] bool res = GetFixed32(&ks, &index_id);
assert(res);
index_id = EndianSwapValue(index_id);
assert(index_id <= Record::kSecondaryIndexId);
return index_id == Record::kSecondaryIndexId;
};
}
#endif // !ROCKSDB_LITE
Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
uint32_t old_a,
uint32_t old_a_pos,
@ -561,8 +590,10 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
}
if (s.IsNotFound()) {
thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
} else if (s.IsBusy()) {
} else if (s.IsBusy() || s.IsIncomplete()) {
// ignore.
// Incomplete also means rollback by application. See the transaction
// implementations.
} else {
thread->stats.AddErrors(1);
}
@ -631,6 +662,16 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
return s;
}
if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
s = Status::Incomplete();
return s;
}
s = WriteToCommitTimeWriteBatch(*txn);
if (!s.ok()) {
return s;
}
s = txn->Commit();
auto& key_gen = key_gen_for_a_.at(thread->tid);
@ -677,11 +718,12 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize);
return;
} else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
s.IsMergeInProgress()) {
s.IsMergeInProgress() || s.IsIncomplete()) {
// ww-conflict detected, or
// lock cannot be acquired, or
// memtable history is not large enough for conflict checking, or
// Merge operation cannot be resolved.
// Merge operation cannot be resolved, or
// application rollback.
// TODO (yanqin) add stats for other cases?
} else if (s.IsNotFound()) {
// ignore.
@ -727,8 +769,9 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
Record record;
s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
if (!s.ok()) {
fprintf(stderr, "Cannot decode secondary key: %s\n",
s.ToString().c_str());
fprintf(stderr, "Cannot decode secondary key (%s => %s): %s\n",
it->key().ToString(true).c_str(),
it->value().ToString(true).c_str(), s.ToString().c_str());
assert(false);
break;
}
@ -749,21 +792,31 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
} else if (s.IsNotFound()) {
// We can also fail verification here.
std::ostringstream oss;
oss << "pk should exist: " << Slice(pk).ToString(true);
auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(dbimpl);
oss << "snap " << read_opts.snapshot->GetSequenceNumber()
<< " (published " << dbimpl->GetLastPublishedSequence()
<< "), pk should exist: " << Slice(pk).ToString(true);
fprintf(stderr, "%s\n", oss.str().c_str());
assert(false);
break;
}
if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str());
std::ostringstream oss;
auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(dbimpl);
oss << "snap " << read_opts.snapshot->GetSequenceNumber()
<< " (published " << dbimpl->GetLastPublishedSequence() << "), "
<< s.ToString();
fprintf(stderr, "%s\n", oss.str().c_str());
assert(false);
break;
}
auto result = Record::DecodePrimaryIndexValue(value);
s = std::get<0>(result);
if (!s.ok()) {
fprintf(stderr, "Cannot decode primary index value: %s\n",
s.ToString().c_str());
fprintf(stderr, "Cannot decode primary index value %s: %s\n",
Slice(value).ToString(true).c_str(), s.ToString().c_str());
assert(false);
break;
}
@ -771,8 +824,12 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
uint32_t c = std::get<2>(result);
if (c != old_c) {
std::ostringstream oss;
oss << "c in primary index does not match secondary index: " << c
<< " != " << old_c;
auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(dbimpl);
oss << "snap " << read_opts.snapshot->GetSequenceNumber()
<< " (published " << dbimpl->GetLastPublishedSequence()
<< "), pk/sk mismatch. pk: (a=" << record.a_value() << ", "
<< "c=" << c << "), sk: (c=" << old_c << ")";
s = Status::Corruption();
fprintf(stderr, "%s\n", oss.str().c_str());
assert(false);
@ -811,6 +868,16 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
return s;
}
if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
s = Status::Incomplete();
return s;
}
s = WriteToCommitTimeWriteBatch(*txn);
if (!s.ok()) {
return s;
}
s = txn->Commit();
if (s.ok()) {
@ -856,7 +923,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
} else if (s.IsInvalidArgument()) {
// ignored.
} else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
s.IsMergeInProgress()) {
s.IsMergeInProgress() || s.IsIncomplete()) {
// ignored.
} else {
thread->stats.AddErrors(1);
@ -874,8 +941,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
auto result = Record::DecodePrimaryIndexValue(value);
if (!std::get<0>(result).ok()) {
s = std::get<0>(result);
fprintf(stderr, "Cannot decode primary index value: %s\n",
s.ToString().c_str());
fprintf(stderr, "Cannot decode primary index value %s: %s\n",
Slice(value).ToString(true).c_str(), s.ToString().c_str());
assert(false);
return s;
}
@ -892,6 +959,17 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
if (!s.ok()) {
return s;
}
if (FLAGS_rollback_one_in > 0 && thread->rand.OneIn(FLAGS_rollback_one_in)) {
s = Status::Incomplete();
return s;
}
s = WriteToCommitTimeWriteBatch(*txn);
if (!s.ok()) {
return s;
}
s = txn->Commit();
if (s.ok()) {
delete txn;
@ -1050,12 +1128,15 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
// First, iterate primary index.
size_t primary_index_entries_count = 0;
{
char buf[4];
EncodeFixed32(buf, Record::kPrimaryIndexId + 1);
std::reverse(buf, buf + sizeof(buf));
std::string iter_ub_str(buf, sizeof(buf));
std::string iter_ub_str;
PutFixed32(&iter_ub_str, Record::kPrimaryIndexId + 1);
std::reverse(iter_ub_str.begin(), iter_ub_str.end());
Slice iter_ub = iter_ub_str;
std::string start_key;
PutFixed32(&start_key, Record::kPrimaryIndexId);
std::reverse(start_key.begin(), start_key.end());
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
@ -1064,7 +1145,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
ropts.iterate_upper_bound = &iter_ub;
std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
for (it->Seek(start_key); it->Valid(); it->Next()) {
Record record;
Status s = record.DecodePrimaryIndexEntry(it->key(), it->value());
if (!s.ok()) {
@ -1101,10 +1182,9 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
// Second, iterate secondary index.
size_t secondary_index_entries_count = 0;
{
char buf[4];
EncodeFixed32(buf, Record::kSecondaryIndexId);
std::reverse(buf, buf + sizeof(buf));
const std::string start_key(buf, sizeof(buf));
std::string start_key;
PutFixed32(&start_key, Record::kSecondaryIndexId);
std::reverse(start_key.begin(), start_key.end());
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
@ -1118,7 +1198,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
Record record;
Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
if (!s.ok()) {
oss << "Cannot decode secondary index entry";
oss << "Cannot decode secondary index entry "
<< it->key().ToString(true) << "=>" << it->value().ToString(true);
VerificationAbort(thread->shared, oss.str(), s);
assert(false);
return;
@ -1132,7 +1213,7 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
s = db_->Get(ropts, pk, &value);
if (!s.ok()) {
oss << "Error searching pk " << Slice(pk).ToString(true) << ". "
<< s.ToString();
<< s.ToString() << ". sk " << it->key().ToString(true);
VerificationAbort(thread->shared, oss.str(), s);
assert(false);
return;
@ -1148,8 +1229,10 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
}
uint32_t c_in_primary = std::get<2>(result);
if (c_in_primary != record.c_value()) {
oss << "Pk/sk mismatch. pk: (c=" << c_in_primary
<< "), sk: (c=" << record.c_value() << ")";
oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>"
<< Slice(value).ToString(true) << " (a=" << record.a_value()
<< ", c=" << c_in_primary << "), sk: " << it->key().ToString(true)
<< " (c=" << record.c_value() << ")";
VerificationAbort(thread->shared, oss.str(), s);
assert(false);
return;
@ -1167,6 +1250,75 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
}
}
void MultiOpsTxnsStressTest::VerifyPkSkFast(int job_id) {
const Snapshot* const snapshot = db_->GetSnapshot();
assert(snapshot);
ManagedSnapshot snapshot_guard(db_, snapshot);
std::ostringstream oss;
auto* dbimpl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(dbimpl);
oss << "Job " << job_id << ": [" << snapshot->GetSequenceNumber() << ","
<< dbimpl->GetLastPublishedSequence() << "] ";
std::string start_key;
PutFixed32(&start_key, Record::kSecondaryIndexId);
std::reverse(start_key.begin(), start_key.end());
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
ropts.snapshot = snapshot;
ropts.total_order_seek = true;
std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
for (it->Seek(start_key); it->Valid(); it->Next()) {
Record record;
Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
if (!s.ok()) {
oss << "Cannot decode secondary index entry " << it->key().ToString(true)
<< "=>" << it->value().ToString(true);
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
// After decoding secondary index entry, we know a and c. Crc is verified
// in decoding phase.
//
// Form a primary key and search in the primary index.
std::string pk = Record::EncodePrimaryKey(record.a_value());
std::string value;
s = db_->Get(ropts, pk, &value);
if (!s.ok()) {
oss << "Error searching pk " << Slice(pk).ToString(true) << ". "
<< s.ToString() << ". sk " << it->key().ToString(true);
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
auto result = Record::DecodePrimaryIndexValue(value);
s = std::get<0>(result);
if (!s.ok()) {
oss << "Error decoding primary index value "
<< Slice(value).ToString(true) << ". " << s.ToString();
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
uint32_t c_in_primary = std::get<2>(result);
if (c_in_primary != record.c_value()) {
oss << "Pk/sk mismatch. pk: " << Slice(pk).ToString(true) << "=>"
<< Slice(value).ToString(true) << " (a=" << record.a_value()
<< ", c=" << c_in_primary << "), sk: " << it->key().ToString(true)
<< " (c=" << record.c_value() << ")";
fprintf(stderr, "%s\n", oss.str().c_str());
fflush(stderr);
assert(false);
}
}
}
std::pair<uint32_t, uint32_t> MultiOpsTxnsStressTest::ChooseExistingA(
ThreadState* thread) {
uint32_t tid = thread->tid;
@ -1193,6 +1345,22 @@ uint32_t MultiOpsTxnsStressTest::GenerateNextC(ThreadState* thread) {
return key_gen->Allocate();
}
#ifndef ROCKSDB_LITE
Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) {
WriteBatch* ctwb = txn.GetCommitTimeWriteBatch();
assert(ctwb);
// Do not change the content in key_buf.
static constexpr char key_buf[sizeof(Record::kMetadataPrefix) + 4] = {
'\0', '\0', '\0', '\0', '\0', '\0', '\0', '\xff'};
uint64_t counter_val = counter_.Next();
char val_buf[sizeof(counter_val)];
EncodeFixed64(val_buf, counter_val);
return ctwb->Put(Slice(key_buf, sizeof(key_buf)),
Slice(val_buf, sizeof(val_buf)));
}
#endif // !ROCKSDB_LITE
std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const {
std::string result;
PutFixed32(&result, lb_a);
@ -1428,8 +1596,9 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) {
Record record;
Status s = record.DecodePrimaryIndexEntry(it->key(), it->value());
if (!s.ok()) {
fprintf(stderr, "Cannot decode primary index entry: %s\n",
s.ToString().c_str());
fprintf(stderr, "Cannot decode primary index entry (%s => %s): %s\n",
it->key().ToString(true).c_str(),
it->value().ToString(true).c_str(), s.ToString().c_str());
assert(false);
}
uint32_t a = record.a_value();

View File

@ -111,6 +111,7 @@ class MultiOpsTxnsStressTest : public StressTest {
public:
class Record {
public:
static constexpr uint32_t kMetadataPrefix = 0;
static constexpr uint32_t kPrimaryIndexId = 1;
static constexpr uint32_t kSecondaryIndexId = 2;
@ -261,6 +262,12 @@ class MultiOpsTxnsStressTest : public StressTest {
ThreadState* thread,
const std::vector<int>& rand_column_families) override;
void RegisterAdditionalListeners() override;
#ifndef ROCKSDB_LITE
void PrepareTxnDbOptions(TransactionDBOptions& txn_db_opts) override;
#endif // !ROCKSDB_LITE
Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a,
uint32_t old_a_pos, uint32_t new_a);
@ -280,7 +287,17 @@ class MultiOpsTxnsStressTest : public StressTest {
VerifyDb(thread);
}
void VerifyPkSkFast(int job_id);
protected:
class Counter {
public:
uint64_t Next() { return value_.fetch_add(1); }
private:
std::atomic<uint64_t> value_ = Env::Default()->NowNanos();
};
using KeySet = std::set<uint32_t>;
class KeyGenerator {
public:
@ -330,9 +347,21 @@ class MultiOpsTxnsStressTest : public StressTest {
uint32_t GenerateNextC(ThreadState* thread);
#ifndef ROCKSDB_LITE
// Some applications, e.g. MyRocks writes a KV pair to the database via
// commit-time-write-batch (ctwb) in additional to the transaction's regular
// write batch. The key is usually constant representing some system
// metadata, while the value is monoticailly increasing which represents the
// actual value of the metadata. Method WriteToCommitTimeWriteBatch()
// emulates this scenario.
Status WriteToCommitTimeWriteBatch(Transaction& txn);
#endif //! ROCKSDB_LITE
std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_;
std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_c_;
Counter counter_{};
private:
struct KeySpaces {
uint32_t lb_a = 0;
@ -370,5 +399,38 @@ class InvariantChecker {
"MultiOpsTxnsStressTest::Record::c_ must be 4 bytes");
};
class MultiOpsTxnsStressListener : public EventListener {
public:
explicit MultiOpsTxnsStressListener(MultiOpsTxnsStressTest* stress_test)
: stress_test_(stress_test) {
assert(stress_test_);
}
#ifndef ROCKSDB_LITE
~MultiOpsTxnsStressListener() override {}
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
assert(db);
#ifdef NDEBUG
(void)db;
#endif
assert(info.cf_id == 0);
stress_test_->VerifyPkSkFast(info.job_id);
}
void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override {
assert(db);
#ifdef NDEBUG
(void)db;
#endif
assert(info.cf_id == 0);
stress_test_->VerifyPkSkFast(info.job_id);
}
#endif //! ROCKSDB_LITE
private:
MultiOpsTxnsStressTest* const stress_test_ = nullptr;
};
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS

View File

@ -383,6 +383,7 @@ multiops_txn_default_params = {
# compactions.
"flush_one_in": 1000,
"key_spaces_path": setup_multiops_txn_key_spaces_file(),
"rollback_one_in": 4,
}
multiops_wc_txn_params = {
@ -401,6 +402,10 @@ multiops_wp_txn_params = {
"enable_pipelined_write": 0,
# OpenReadOnly after checkpoint is not currnetly compatible with WritePrepared txns
"checkpoint_one_in": 0,
# Required to be 1 in order to use commit-time-batch
"use_only_the_last_commit_time_batch_for_recovery": 1,
"recycle_log_file_num": 0,
"clear_wp_commit_cache_one_in": 10,
}
def finalize_and_sanitize(src_params):

View File

@ -26,6 +26,18 @@
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
// This function is for testing only. If it returns true, then all entries in
// the commit cache will be evicted. Unit and/or stress tests (db_stress)
// can implement this function and customize how frequently commit cache
// eviction occurs.
// TODO: remove this function once we can configure commit cache to be very
// small so that eviction occurs very frequently. This requires the commit
// cache entry to be able to encode prepare and commit sequence numbers so that
// the commit sequence number does not have to be within a certain range of
// prepare sequence number.
extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void)
__attribute__((__weak__));
namespace ROCKSDB_NAMESPACE {
Status WritePreparedTxnDB::Initialize(
@ -505,6 +517,12 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
// legit when a commit entry in a write batch overwrite the previous one
max_evicted_seq = evicted.commit_seq;
}
#ifdef OS_LINUX
if (rocksdb_write_prepared_TEST_ShouldClearCommitCache &&
rocksdb_write_prepared_TEST_ShouldClearCommitCache()) {
max_evicted_seq = last;
}
#endif // OS_LINUX
ROCKS_LOG_DETAILS(info_log_,
"%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
" => %lu",

View File

@ -513,6 +513,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WriteUnpreparedTxn;
friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class MultiOpsTxnsStressTest;
void Init(const TransactionDBOptions& txn_db_opts);