diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 8c23ac373..acab2f34d 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -290,6 +290,8 @@ DECLARE_bool(adaptive_readahead); DECLARE_bool(async_io); DECLARE_string(wal_compression); +DECLARE_int32(create_shared_snapshot_one_in); + constexpr long KB = 1024; constexpr int kRandomValueMaxFactor = 3; constexpr int kValueMaxLen = 100; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 79b072d7d..6c2d573da 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -936,4 +936,7 @@ DEFINE_bool( DEFINE_string(wal_compression, "none", "Algorithm to use for WAL compression. none to disable."); +DEFINE_int32(create_shared_snapshot_one_in, 0, + "On non-zero, create shared snapshots upon transaction commits."); + #endif // GFLAGS diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3fd497901..d6c57e70e 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -617,13 +617,24 @@ Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) { return s; } -Status StressTest::CommitTxn(Transaction* txn) { +Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { if (!FLAGS_use_txn) { return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); } Status s = txn->Prepare(); if (s.ok()) { + if (thread && FLAGS_create_shared_snapshot_one_in && + thread->rand.OneIn(FLAGS_create_shared_snapshot_one_in)) { + uint64_t ts = db_stress_env->NowNanos(); + s = txn->SetCommitTimestamp(ts); + assert(s.ok()); + txn->SetSnapshotOnNextOperation(); + } s = txn->Commit(); + assert(txn_db_); + uint64_t ts2 = db_stress_env->NowNanos(); + uint64_t ts_delta = 5ULL * 1000 * 1000 * 1000; + txn_db_->ReleaseSharedSnapshotsOlderThan(ts2 - ts_delta); } delete txn; return s; diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 2307e1261..e6c2fd8e0 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -59,7 +59,7 @@ class StressTest { #ifndef ROCKSDB_LITE Status NewTxn(WriteOptions& write_opts, Transaction** txn); - Status CommitTxn(Transaction* txn); + Status CommitTxn(Transaction* txn, ThreadState* thread = nullptr); Status RollbackTxn(Transaction* txn); #endif diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index fe7c3a4b2..6b75aaeab 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -568,7 +568,7 @@ class NonBatchedOpsStressTest : public StressTest { if (s.ok()) { s = txn->Merge(cfh, key, v); if (s.ok()) { - s = CommitTxn(txn); + s = CommitTxn(txn, thread); } } #endif @@ -587,7 +587,7 @@ class NonBatchedOpsStressTest : public StressTest { if (s.ok()) { s = txn->Put(cfh, key, v); if (s.ok()) { - s = CommitTxn(txn); + s = CommitTxn(txn, thread); } } #endif @@ -648,7 +648,7 @@ class NonBatchedOpsStressTest : public StressTest { if (s.ok()) { s = txn->Delete(cfh, key); if (s.ok()) { - s = CommitTxn(txn); + s = CommitTxn(txn, thread); } } #endif @@ -685,7 +685,7 @@ class NonBatchedOpsStressTest : public StressTest { if (s.ok()) { s = txn->SingleDelete(cfh, key); if (s.ok()) { - s = CommitTxn(txn); + s = CommitTxn(txn, thread); } } #endif diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 1aed479e0..48c6f0bcb 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -310,6 +310,7 @@ txn_params = { "checkpoint_one_in": 0, # pipeline write is not currnetly compatible with WritePrepared txns "enable_pipelined_write": 0, + "create_shared_snapshot_one_in": 10, } best_efforts_recovery_params = {