Stress test for RocksDB transactions (#8936)
Summary: Current db_stress does not cover complex read-write transactions. Therefore, this PR adds coverage for emulated MyRocks-style transactions in `MultiOpsTxnsStressTest`. To achieve this, we need: - Add a new operation type 'customops' so that we can add new complex groups of operations, e.g. transactions involving multiple read-write operations. - Implement three read-write transactions and two read-only ones to emulate MyRocks-style transactions. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8936 Test Plan: ``` make check ./db_stress -test_multi_ops_txns -use_txn -clear_column_family_one_in=0 -column_families=1 -writepercent=0 -delpercent=0 -delrangepercent=0 -customopspercent=60 -readpercent=20 -prefixpercent=0 -iterpercent=20 -reopen=0 -ops_per_thread=100000 ``` Next step is to add more configurability and refine input generation and result reporting, which will done in separate follow-up PRs. Reviewed By: zhichao-cao Differential Revision: D31071795 Pulled By: riversand963 fbshipit-source-id: 50d7c828346ec643311336b904848a1588a37006
This commit is contained in:
parent
e92a0ed040
commit
e05c2bb549
1
TARGETS
1
TARGETS
@ -875,6 +875,7 @@ cpp_library(
|
|||||||
"db_stress_tool/db_stress_test_base.cc",
|
"db_stress_tool/db_stress_test_base.cc",
|
||||||
"db_stress_tool/db_stress_tool.cc",
|
"db_stress_tool/db_stress_tool.cc",
|
||||||
"db_stress_tool/expected_state.cc",
|
"db_stress_tool/expected_state.cc",
|
||||||
|
"db_stress_tool/multi_ops_txns_stress.cc",
|
||||||
"db_stress_tool/no_batched_ops_stress.cc",
|
"db_stress_tool/no_batched_ops_stress.cc",
|
||||||
"test_util/testutil.cc",
|
"test_util/testutil.cc",
|
||||||
"tools/block_cache_analyzer/block_cache_trace_analyzer.cc",
|
"tools/block_cache_analyzer/block_cache_trace_analyzer.cc",
|
||||||
|
@ -11,6 +11,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX}
|
|||||||
db_stress_test_base.cc
|
db_stress_test_base.cc
|
||||||
db_stress_tool.cc
|
db_stress_tool.cc
|
||||||
expected_state.cc
|
expected_state.cc
|
||||||
|
multi_ops_txns_stress.cc
|
||||||
no_batched_ops_stress.cc)
|
no_batched_ops_stress.cc)
|
||||||
target_link_libraries(db_stress${ARTIFACT_SUFFIX} ${ROCKSDB_LIB} ${THIRDPARTY_LIBS})
|
target_link_libraries(db_stress${ARTIFACT_SUFFIX} ${ROCKSDB_LIB} ${THIRDPARTY_LIBS})
|
||||||
list(APPEND tool_deps db_stress)
|
list(APPEND tool_deps db_stress)
|
||||||
|
@ -87,6 +87,7 @@ DECLARE_int64(active_width);
|
|||||||
DECLARE_bool(test_batches_snapshots);
|
DECLARE_bool(test_batches_snapshots);
|
||||||
DECLARE_bool(atomic_flush);
|
DECLARE_bool(atomic_flush);
|
||||||
DECLARE_bool(test_cf_consistency);
|
DECLARE_bool(test_cf_consistency);
|
||||||
|
DECLARE_bool(test_multi_ops_txns);
|
||||||
DECLARE_int32(threads);
|
DECLARE_int32(threads);
|
||||||
DECLARE_int32(ttl);
|
DECLARE_int32(ttl);
|
||||||
DECLARE_int32(value_size_mult);
|
DECLARE_int32(value_size_mult);
|
||||||
@ -203,6 +204,7 @@ DECLARE_int32(delrangepercent);
|
|||||||
DECLARE_int32(nooverwritepercent);
|
DECLARE_int32(nooverwritepercent);
|
||||||
DECLARE_int32(iterpercent);
|
DECLARE_int32(iterpercent);
|
||||||
DECLARE_uint64(num_iterations);
|
DECLARE_uint64(num_iterations);
|
||||||
|
DECLARE_int32(customopspercent);
|
||||||
DECLARE_string(compression_type);
|
DECLARE_string(compression_type);
|
||||||
DECLARE_string(bottommost_compression_type);
|
DECLARE_string(bottommost_compression_type);
|
||||||
DECLARE_int32(compression_max_dict_bytes);
|
DECLARE_int32(compression_max_dict_bytes);
|
||||||
@ -569,6 +571,8 @@ extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz);
|
|||||||
extern StressTest* CreateCfConsistencyStressTest();
|
extern StressTest* CreateCfConsistencyStressTest();
|
||||||
extern StressTest* CreateBatchedOpsStressTest();
|
extern StressTest* CreateBatchedOpsStressTest();
|
||||||
extern StressTest* CreateNonBatchedOpsStressTest();
|
extern StressTest* CreateNonBatchedOpsStressTest();
|
||||||
|
extern StressTest* CreateMultiOpsTxnsStressTest();
|
||||||
|
extern void CheckAndSetOptionsForMultiOpsTxnStressTest();
|
||||||
extern void InitializeHotKeyGenerator(double alpha);
|
extern void InitializeHotKeyGenerator(double alpha);
|
||||||
extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
|
extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
|
||||||
|
|
||||||
|
@ -90,6 +90,11 @@ DEFINE_bool(test_cf_consistency, false,
|
|||||||
"multiple column families are consistent. Setting this implies "
|
"multiple column families are consistent. Setting this implies "
|
||||||
"`atomic_flush=true` is set true if `disable_wal=false`.\n");
|
"`atomic_flush=true` is set true if `disable_wal=false`.\n");
|
||||||
|
|
||||||
|
DEFINE_bool(test_multi_ops_txns, false,
|
||||||
|
"If set, runs stress test dedicated to verifying multi-ops "
|
||||||
|
"transactions on a simple relational table with primary and "
|
||||||
|
"secondary index.");
|
||||||
|
|
||||||
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
|
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
|
||||||
|
|
||||||
DEFINE_int32(ttl, -1,
|
DEFINE_int32(ttl, -1,
|
||||||
@ -675,6 +680,10 @@ DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run");
|
|||||||
static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) =
|
static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) =
|
||||||
RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
|
RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
|
||||||
|
|
||||||
|
DEFINE_int32(
|
||||||
|
customopspercent, 0,
|
||||||
|
"Ratio of custom operations to total workload (expressed as a percentage)");
|
||||||
|
|
||||||
DEFINE_string(compression_type, "snappy",
|
DEFINE_string(compression_type, "snappy",
|
||||||
"Algorithm to use to compress the database");
|
"Algorithm to use to compress the database");
|
||||||
|
|
||||||
|
@ -625,11 +625,15 @@ void StressTest::OperateDb(ThreadState* thread) {
|
|||||||
write_opts.sync = true;
|
write_opts.sync = true;
|
||||||
}
|
}
|
||||||
write_opts.disableWAL = FLAGS_disable_wal;
|
write_opts.disableWAL = FLAGS_disable_wal;
|
||||||
const int prefixBound = static_cast<int>(FLAGS_readpercent) +
|
const int prefix_bound = static_cast<int>(FLAGS_readpercent) +
|
||||||
static_cast<int>(FLAGS_prefixpercent);
|
static_cast<int>(FLAGS_prefixpercent);
|
||||||
const int writeBound = prefixBound + static_cast<int>(FLAGS_writepercent);
|
const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent);
|
||||||
const int delBound = writeBound + static_cast<int>(FLAGS_delpercent);
|
const int del_bound = write_bound + static_cast<int>(FLAGS_delpercent);
|
||||||
const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
|
const int delrange_bound =
|
||||||
|
del_bound + static_cast<int>(FLAGS_delrangepercent);
|
||||||
|
const int iterate_bound =
|
||||||
|
delrange_bound + static_cast<int>(FLAGS_iterpercent);
|
||||||
|
|
||||||
const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
|
const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
@ -885,7 +889,7 @@ void StressTest::OperateDb(ThreadState* thread) {
|
|||||||
} else {
|
} else {
|
||||||
TestGet(thread, read_opts, rand_column_families, rand_keys);
|
TestGet(thread, read_opts, rand_column_families, rand_keys);
|
||||||
}
|
}
|
||||||
} else if (prob_op < prefixBound) {
|
} else if (prob_op < prefix_bound) {
|
||||||
assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
|
assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
|
||||||
// OPERATION prefix scan
|
// OPERATION prefix scan
|
||||||
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
|
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
|
||||||
@ -893,22 +897,22 @@ void StressTest::OperateDb(ThreadState* thread) {
|
|||||||
// be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
|
// be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
|
||||||
// prefix
|
// prefix
|
||||||
TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
|
TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
|
||||||
} else if (prob_op < writeBound) {
|
} else if (prob_op < write_bound) {
|
||||||
assert(prefixBound <= prob_op);
|
assert(prefix_bound <= prob_op);
|
||||||
// OPERATION write
|
// OPERATION write
|
||||||
TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
|
TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
|
||||||
value, lock);
|
value, lock);
|
||||||
} else if (prob_op < delBound) {
|
} else if (prob_op < del_bound) {
|
||||||
assert(writeBound <= prob_op);
|
assert(write_bound <= prob_op);
|
||||||
// OPERATION delete
|
// OPERATION delete
|
||||||
TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
|
TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
|
||||||
} else if (prob_op < delRangeBound) {
|
} else if (prob_op < delrange_bound) {
|
||||||
assert(delBound <= prob_op);
|
assert(del_bound <= prob_op);
|
||||||
// OPERATION delete range
|
// OPERATION delete range
|
||||||
TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
|
TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
|
||||||
lock);
|
lock);
|
||||||
} else {
|
} else if (prob_op < iterate_bound) {
|
||||||
assert(delRangeBound <= prob_op);
|
assert(delrange_bound <= prob_op);
|
||||||
// OPERATION iterate
|
// OPERATION iterate
|
||||||
int num_seeks = static_cast<int>(
|
int num_seeks = static_cast<int>(
|
||||||
std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
|
std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
|
||||||
@ -916,6 +920,9 @@ void StressTest::OperateDb(ThreadState* thread) {
|
|||||||
rand_keys = GenerateNKeys(thread, num_seeks, i);
|
rand_keys = GenerateNKeys(thread, num_seeks, i);
|
||||||
i += num_seeks - 1;
|
i += num_seeks - 1;
|
||||||
TestIterate(thread, read_opts, rand_column_families, rand_keys);
|
TestIterate(thread, read_opts, rand_column_families, rand_keys);
|
||||||
|
} else {
|
||||||
|
assert(iterate_bound <= prob_op);
|
||||||
|
TestCustomOperations(thread, rand_column_families);
|
||||||
}
|
}
|
||||||
thread->stats.FinishedSingleOp();
|
thread->stats.FinishedSingleOp();
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
@ -2122,6 +2129,7 @@ void StressTest::PrintEnv() const {
|
|||||||
fprintf(stdout, "No overwrite percentage : %d%%\n",
|
fprintf(stdout, "No overwrite percentage : %d%%\n",
|
||||||
FLAGS_nooverwritepercent);
|
FLAGS_nooverwritepercent);
|
||||||
fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
|
fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
|
||||||
|
fprintf(stdout, "Custom ops percentage : %d%%\n", FLAGS_customopspercent);
|
||||||
fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
|
fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
|
||||||
FLAGS_db_write_buffer_size);
|
FLAGS_db_write_buffer_size);
|
||||||
fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
|
fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
|
||||||
@ -2752,7 +2760,7 @@ void StressTest::Reopen(ThreadState* thread) {
|
|||||||
// the db via a callbac ii) they hold on to a snapshot and the upcoming
|
// the db via a callbac ii) they hold on to a snapshot and the upcoming
|
||||||
// ::Close would complain about it.
|
// ::Close would complain about it.
|
||||||
const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
|
const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
|
||||||
bool bg_canceled = false;
|
bool bg_canceled __attribute__((unused)) = false;
|
||||||
if (write_prepared || thread->rand.OneIn(2)) {
|
if (write_prepared || thread->rand.OneIn(2)) {
|
||||||
const bool wait =
|
const bool wait =
|
||||||
write_prepared || static_cast<bool>(thread->rand.OneIn(2));
|
write_prepared || static_cast<bool>(thread->rand.OneIn(2));
|
||||||
@ -2760,7 +2768,6 @@ void StressTest::Reopen(ThreadState* thread) {
|
|||||||
bg_canceled = wait;
|
bg_canceled = wait;
|
||||||
}
|
}
|
||||||
assert(!write_prepared || bg_canceled);
|
assert(!write_prepared || bg_canceled);
|
||||||
(void) bg_canceled;
|
|
||||||
#else
|
#else
|
||||||
(void) thread;
|
(void) thread;
|
||||||
#endif
|
#endif
|
||||||
|
@ -32,7 +32,7 @@ class StressTest {
|
|||||||
void InitDb();
|
void InitDb();
|
||||||
// The initialization work is split into two parts to avoid a circular
|
// The initialization work is split into two parts to avoid a circular
|
||||||
// dependency with `SharedState`.
|
// dependency with `SharedState`.
|
||||||
void FinishInitDb(SharedState*);
|
virtual void FinishInitDb(SharedState*);
|
||||||
|
|
||||||
// Return false if verification fails.
|
// Return false if verification fails.
|
||||||
bool VerifySecondaries();
|
bool VerifySecondaries();
|
||||||
@ -203,6 +203,12 @@ class StressTest {
|
|||||||
const std::vector<int64_t>& rand_keys);
|
const std::vector<int64_t>& rand_keys);
|
||||||
#endif // !ROCKSDB_LITE
|
#endif // !ROCKSDB_LITE
|
||||||
|
|
||||||
|
virtual Status TestCustomOperations(
|
||||||
|
ThreadState* /*thread*/,
|
||||||
|
const std::vector<int>& /*rand_column_families*/) {
|
||||||
|
return Status::NotSupported("TestCustomOperations() must be overridden");
|
||||||
|
}
|
||||||
|
|
||||||
void VerificationAbort(SharedState* shared, std::string msg, Status s) const;
|
void VerificationAbort(SharedState* shared, std::string msg, Status s) const;
|
||||||
|
|
||||||
void VerificationAbort(SharedState* shared, std::string msg, int cf,
|
void VerificationAbort(SharedState* shared, std::string msg, int cf,
|
||||||
|
@ -150,14 +150,18 @@ int db_stress_tool(int argc, char** argv) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
if ((FLAGS_readpercent + FLAGS_prefixpercent + FLAGS_writepercent +
|
if ((FLAGS_readpercent + FLAGS_prefixpercent + FLAGS_writepercent +
|
||||||
FLAGS_delpercent + FLAGS_delrangepercent + FLAGS_iterpercent) != 100) {
|
FLAGS_delpercent + FLAGS_delrangepercent + FLAGS_iterpercent +
|
||||||
fprintf(stderr,
|
FLAGS_customopspercent) != 100) {
|
||||||
"Error: "
|
fprintf(
|
||||||
"Read(%d)+Prefix(%d)+Write(%d)+Delete(%d)+DeleteRange(%d)"
|
stderr,
|
||||||
"+Iterate(%d) percents != "
|
"Error: "
|
||||||
"100!\n",
|
"Read(-readpercent=%d)+Prefix(-prefixpercent=%d)+Write(-writepercent=%"
|
||||||
FLAGS_readpercent, FLAGS_prefixpercent, FLAGS_writepercent,
|
"d)+Delete(-delpercent=%d)+DeleteRange(-delrangepercent=%d)"
|
||||||
FLAGS_delpercent, FLAGS_delrangepercent, FLAGS_iterpercent);
|
"+Iterate(-iterpercent=%d)+CustomOps(-customopspercent=%d) percents != "
|
||||||
|
"100!\n",
|
||||||
|
FLAGS_readpercent, FLAGS_prefixpercent, FLAGS_writepercent,
|
||||||
|
FLAGS_delpercent, FLAGS_delrangepercent, FLAGS_iterpercent,
|
||||||
|
FLAGS_customopspercent);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {
|
if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {
|
||||||
@ -287,6 +291,9 @@ int db_stress_tool(int argc, char** argv) {
|
|||||||
"batch_protection_bytes_per_key > 0\n");
|
"batch_protection_bytes_per_key > 0\n");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
if (FLAGS_test_multi_ops_txns) {
|
||||||
|
CheckAndSetOptionsForMultiOpsTxnStressTest();
|
||||||
|
}
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
KillPoint* kp = KillPoint::GetInstance();
|
KillPoint* kp = KillPoint::GetInstance();
|
||||||
@ -331,6 +338,8 @@ int db_stress_tool(int argc, char** argv) {
|
|||||||
stress.reset(CreateCfConsistencyStressTest());
|
stress.reset(CreateCfConsistencyStressTest());
|
||||||
} else if (FLAGS_test_batches_snapshots) {
|
} else if (FLAGS_test_batches_snapshots) {
|
||||||
stress.reset(CreateBatchedOpsStressTest());
|
stress.reset(CreateBatchedOpsStressTest());
|
||||||
|
} else if (FLAGS_test_multi_ops_txns) {
|
||||||
|
stress.reset(CreateMultiOpsTxnsStressTest());
|
||||||
} else {
|
} else {
|
||||||
stress.reset(CreateNonBatchedOpsStressTest());
|
stress.reset(CreateNonBatchedOpsStressTest());
|
||||||
}
|
}
|
||||||
|
1035
db_stress_tool/multi_ops_txns_stress.cc
Normal file
1035
db_stress_tool/multi_ops_txns_stress.cc
Normal file
File diff suppressed because it is too large
Load Diff
302
db_stress_tool/multi_ops_txns_stress.h
Normal file
302
db_stress_tool/multi_ops_txns_stress.h
Normal file
@ -0,0 +1,302 @@
|
|||||||
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
//
|
||||||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#ifdef GFLAGS
|
||||||
|
#include "db_stress_tool/db_stress_common.h"
|
||||||
|
|
||||||
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
|
// This file defines MultiOpsTxnsStress so that we can stress test RocksDB
|
||||||
|
// transactions on a simple, emulated relational table.
|
||||||
|
//
|
||||||
|
// The record format is similar to the example found at
|
||||||
|
// https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format.
|
||||||
|
//
|
||||||
|
// The table is created by
|
||||||
|
// ```
|
||||||
|
// create table t1 (
|
||||||
|
// a int primary key,
|
||||||
|
// b int,
|
||||||
|
// c int,
|
||||||
|
// key(c),
|
||||||
|
// )
|
||||||
|
// ```
|
||||||
|
//
|
||||||
|
// (For simplicity, we use uint32_t for int here.)
|
||||||
|
//
|
||||||
|
// For this table, there is a primary index using `a`, as well as a secondary
|
||||||
|
// index using `c` and `a`.
|
||||||
|
//
|
||||||
|
// Primary key format:
|
||||||
|
// | index id | M(a) |
|
||||||
|
// Primary index value:
|
||||||
|
// | b | c |
|
||||||
|
// M(a) represents the big-endian format of a.
|
||||||
|
//
|
||||||
|
// Secondary key format:
|
||||||
|
// | index id | M(c) | M(a) |
|
||||||
|
// Secondary index value:
|
||||||
|
// | crc32 |
|
||||||
|
// Similarly to M(a), M(c) is the big-endian format of c.
|
||||||
|
//
|
||||||
|
// The in-memory representation of a record is defined in class
|
||||||
|
// MultiOpsTxnsStress:Record that includes a number of helper methods to
|
||||||
|
// encode/decode primary index keys, primary index values, secondary index keys,
|
||||||
|
// secondary index values, etc.
|
||||||
|
//
|
||||||
|
// Sometimes primary index and secondary index reside on different column
|
||||||
|
// families, but sometimes they colocate in the same column family. Current
|
||||||
|
// implementation puts them in the same (default) column family, and this is
|
||||||
|
// subject to future change if we find it interesting to test the other case.
|
||||||
|
//
|
||||||
|
// Class MultiOpsTxnsStressTest has the following transactions for testing.
|
||||||
|
//
|
||||||
|
// 1. Primary key update
|
||||||
|
// UPDATE t1 SET a = 3 WHERE a = 2;
|
||||||
|
// ```
|
||||||
|
// tx->GetForUpdate(primary key a=2)
|
||||||
|
// tx->GetForUpdate(primary key a=3)
|
||||||
|
// tx->Delete(primary key a=2)
|
||||||
|
// tx->Put(primary key a=3, value)
|
||||||
|
// tx->batch->SingleDelete(secondary key a=2)
|
||||||
|
// tx->batch->Put(secondary key a=3, value)
|
||||||
|
// tx->Prepare()
|
||||||
|
// Tx->Commit()
|
||||||
|
// ```
|
||||||
|
//
|
||||||
|
// 2. Secondary key update
|
||||||
|
// UPDATE t1 SET c = 3 WHERE c = 2;
|
||||||
|
// ```
|
||||||
|
// iter->Seek(secondary key)
|
||||||
|
// // Get corresponding primary key value(s) from iterator
|
||||||
|
// tx->GetForUpdate(primary key)
|
||||||
|
// tx->Put(primary key, value c=3)
|
||||||
|
// tx->batch->SingleDelete(secondary key c=2)
|
||||||
|
// tx->batch->Put(secondary key c=3)
|
||||||
|
// tx->Prepare()
|
||||||
|
// tx->Commit()
|
||||||
|
// ```
|
||||||
|
//
|
||||||
|
// 3. Primary index value update
|
||||||
|
// UPDATE t1 SET b = b + 1 WHERE a = 2;
|
||||||
|
// ```
|
||||||
|
// tx->GetForUpdate(primary key a=2)
|
||||||
|
// tx->Put(primary key a=2, value b=b+1)
|
||||||
|
// tx->Prepare()
|
||||||
|
// tx->Commit()
|
||||||
|
// ```
|
||||||
|
//
|
||||||
|
// 4. Point lookup
|
||||||
|
// SELECT * FROM t1 WHERE a = 3;
|
||||||
|
// ```
|
||||||
|
// tx->Get(primary key a=3)
|
||||||
|
// tx->Commit()
|
||||||
|
// ```
|
||||||
|
//
|
||||||
|
// 5. Range scan
|
||||||
|
// SELECT * FROM t1 WHERE c = 2;
|
||||||
|
// ```
|
||||||
|
// it = tx->GetIterator()
|
||||||
|
// it->Seek(secondary key c=2)
|
||||||
|
// tx->Commit()
|
||||||
|
// ```
|
||||||
|
|
||||||
|
class MultiOpsTxnsStressTest : public StressTest {
|
||||||
|
public:
|
||||||
|
class Record {
|
||||||
|
public:
|
||||||
|
static constexpr uint32_t kPrimaryIndexId = 1;
|
||||||
|
static constexpr uint32_t kSecondaryIndexId = 2;
|
||||||
|
|
||||||
|
static constexpr size_t kPrimaryIndexEntrySize = 8 + 8;
|
||||||
|
static constexpr size_t kSecondaryIndexEntrySize = 12 + 4;
|
||||||
|
|
||||||
|
static_assert(kPrimaryIndexId < kSecondaryIndexId,
|
||||||
|
"kPrimaryIndexId must be smaller than kSecondaryIndexId");
|
||||||
|
|
||||||
|
static_assert(sizeof(kPrimaryIndexId) == sizeof(uint32_t),
|
||||||
|
"kPrimaryIndexId must be 4 bytes");
|
||||||
|
static_assert(sizeof(kSecondaryIndexId) == sizeof(uint32_t),
|
||||||
|
"kSecondaryIndexId must be 4 bytes");
|
||||||
|
|
||||||
|
// Used for generating search key to probe primary index.
|
||||||
|
static std::string EncodePrimaryKey(uint32_t a);
|
||||||
|
// Used for generating search prefix to probe secondary index.
|
||||||
|
static std::string EncodeSecondaryKey(uint32_t c);
|
||||||
|
// Used for generating search key to probe secondary index.
|
||||||
|
static std::string EncodeSecondaryKey(uint32_t c, uint32_t a);
|
||||||
|
|
||||||
|
static std::tuple<Status, uint32_t, uint32_t> DecodePrimaryIndexValue(
|
||||||
|
Slice primary_index_value);
|
||||||
|
|
||||||
|
static std::pair<Status, uint32_t> DecodeSecondaryIndexValue(
|
||||||
|
Slice secondary_index_value);
|
||||||
|
|
||||||
|
Record() = default;
|
||||||
|
Record(uint32_t _a, uint32_t _b, uint32_t _c) : a_(_a), b_(_b), c_(_c) {}
|
||||||
|
|
||||||
|
bool operator==(const Record& other) const {
|
||||||
|
return a_ == other.a_ && b_ == other.b_ && c_ == other.c_;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator!=(const Record& other) const { return !(*this == other); }
|
||||||
|
|
||||||
|
std::pair<std::string, std::string> EncodePrimaryIndexEntry() const;
|
||||||
|
|
||||||
|
std::string EncodePrimaryKey() const;
|
||||||
|
|
||||||
|
std::string EncodePrimaryIndexValue() const;
|
||||||
|
|
||||||
|
std::pair<std::string, std::string> EncodeSecondaryIndexEntry() const;
|
||||||
|
|
||||||
|
std::string EncodeSecondaryKey() const;
|
||||||
|
|
||||||
|
Status DecodePrimaryIndexEntry(Slice primary_index_key,
|
||||||
|
Slice primary_index_value);
|
||||||
|
|
||||||
|
Status DecodeSecondaryIndexEntry(Slice secondary_index_key,
|
||||||
|
Slice secondary_index_value);
|
||||||
|
|
||||||
|
uint32_t a_value() const { return a_; }
|
||||||
|
uint32_t b_value() const { return b_; }
|
||||||
|
uint32_t c_value() const { return c_; }
|
||||||
|
|
||||||
|
void SetA(uint32_t _a) { a_ = _a; }
|
||||||
|
void SetB(uint32_t _b) { b_ = _b; }
|
||||||
|
void SetC(uint32_t _c) { c_ = _c; }
|
||||||
|
|
||||||
|
std::string ToString() const {
|
||||||
|
std::string ret("(");
|
||||||
|
ret.append(std::to_string(a_));
|
||||||
|
ret.append(",");
|
||||||
|
ret.append(std::to_string(b_));
|
||||||
|
ret.append(",");
|
||||||
|
ret.append(std::to_string(c_));
|
||||||
|
ret.append(")");
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend class InvariantChecker;
|
||||||
|
|
||||||
|
uint32_t a_{0};
|
||||||
|
uint32_t b_{0};
|
||||||
|
uint32_t c_{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
MultiOpsTxnsStressTest() {}
|
||||||
|
|
||||||
|
~MultiOpsTxnsStressTest() override {}
|
||||||
|
|
||||||
|
void FinishInitDb(SharedState*) override;
|
||||||
|
|
||||||
|
void ReopenAndPreloadDb(SharedState* shared);
|
||||||
|
|
||||||
|
bool IsStateTracked() const override { return false; }
|
||||||
|
|
||||||
|
Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys) override;
|
||||||
|
|
||||||
|
std::vector<Status> TestMultiGet(
|
||||||
|
ThreadState* thread, const ReadOptions& read_opts,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys) override;
|
||||||
|
|
||||||
|
Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys) override;
|
||||||
|
|
||||||
|
// Given a key K, this creates an iterator which scans to K and then
|
||||||
|
// does a random sequence of Next/Prev operations.
|
||||||
|
Status TestIterate(ThreadState* thread, const ReadOptions& read_opts,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys) override;
|
||||||
|
|
||||||
|
Status TestPut(ThreadState* thread, WriteOptions& write_opts,
|
||||||
|
const ReadOptions& read_opts, const std::vector<int>& cf_ids,
|
||||||
|
const std::vector<int64_t>& keys, char (&value)[100],
|
||||||
|
std::unique_ptr<MutexLock>& lock) override;
|
||||||
|
|
||||||
|
Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys,
|
||||||
|
std::unique_ptr<MutexLock>& lock) override;
|
||||||
|
|
||||||
|
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys,
|
||||||
|
std::unique_ptr<MutexLock>& lock) override;
|
||||||
|
|
||||||
|
void TestIngestExternalFile(ThreadState* thread,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys,
|
||||||
|
std::unique_ptr<MutexLock>& lock) override;
|
||||||
|
|
||||||
|
void TestCompactRange(ThreadState* thread, int64_t rand_key,
|
||||||
|
const Slice& start_key,
|
||||||
|
ColumnFamilyHandle* column_family) override;
|
||||||
|
|
||||||
|
Status TestBackupRestore(ThreadState* thread,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys) override;
|
||||||
|
|
||||||
|
Status TestCheckpoint(ThreadState* thread,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys) override;
|
||||||
|
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
Status TestApproximateSize(ThreadState* thread, uint64_t iteration,
|
||||||
|
const std::vector<int>& rand_column_families,
|
||||||
|
const std::vector<int64_t>& rand_keys) override;
|
||||||
|
#endif // !ROCKSDB_LITE
|
||||||
|
|
||||||
|
Status TestCustomOperations(
|
||||||
|
ThreadState* thread,
|
||||||
|
const std::vector<int>& rand_column_families) override;
|
||||||
|
|
||||||
|
Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a,
|
||||||
|
uint32_t new_a);
|
||||||
|
|
||||||
|
Status SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c,
|
||||||
|
uint32_t new_c);
|
||||||
|
|
||||||
|
Status UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a,
|
||||||
|
uint32_t b_delta);
|
||||||
|
|
||||||
|
Status PointLookupTxn(ThreadState* thread, ReadOptions ropts, uint32_t a);
|
||||||
|
|
||||||
|
Status RangeScanTxn(ThreadState* thread, ReadOptions ropts, uint32_t c);
|
||||||
|
|
||||||
|
void VerifyDb(ThreadState* thread) const override;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
uint32_t ChooseA(ThreadState* thread);
|
||||||
|
|
||||||
|
uint32_t GenerateNextA();
|
||||||
|
|
||||||
|
private:
|
||||||
|
void PreloadDb(SharedState* shared, size_t num_c);
|
||||||
|
|
||||||
|
// TODO (yanqin) encapsulate the selection of keys a separate class.
|
||||||
|
std::atomic<uint32_t> next_a_{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
class InvariantChecker {
|
||||||
|
public:
|
||||||
|
static_assert(sizeof(MultiOpsTxnsStressTest::Record().a_) == sizeof(uint32_t),
|
||||||
|
"MultiOpsTxnsStressTest::Record::a_ must be 4 bytes");
|
||||||
|
static_assert(sizeof(MultiOpsTxnsStressTest::Record().b_) == sizeof(uint32_t),
|
||||||
|
"MultiOpsTxnsStressTest::Record::b_ must be 4 bytes");
|
||||||
|
static_assert(sizeof(MultiOpsTxnsStressTest::Record().c_) == sizeof(uint32_t),
|
||||||
|
"MultiOpsTxnsStressTest::Record::c_ must be 4 bytes");
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
#endif // GFLAGS
|
1
src.mk
1
src.mk
@ -354,6 +354,7 @@ STRESS_LIB_SOURCES = \
|
|||||||
db_stress_tool/db_stress_tool.cc \
|
db_stress_tool/db_stress_tool.cc \
|
||||||
db_stress_tool/expected_state.cc \
|
db_stress_tool/expected_state.cc \
|
||||||
db_stress_tool/no_batched_ops_stress.cc \
|
db_stress_tool/no_batched_ops_stress.cc \
|
||||||
|
db_stress_tool/multi_ops_txns_stress.cc \
|
||||||
|
|
||||||
TEST_LIB_SOURCES = \
|
TEST_LIB_SOURCES = \
|
||||||
db/db_test_util.cc \
|
db/db_test_util.cc \
|
||||||
|
Loading…
Reference in New Issue
Block a user