Add user-defined timestamps to db_stress (#8061)

Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.

This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061

Test Plan: make crash_test_with_ts

Reviewed By: jay-zhuang

Differential Revision: D27056282

Pulled By: riversand963

fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
This commit is contained in:
Yanqin Jin 2021-03-23 05:12:04 -07:00 committed by Facebook GitHub Bot
parent 0d800dadea
commit 08144bc2f5
9 changed files with 260 additions and 34 deletions

View File

@ -929,7 +929,8 @@ endif # PLATFORM_SHARED_EXT
analyze tools tools_lib \ analyze tools tools_lib \
blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \ blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \
blackbox_crash_test_with_txn whitebox_crash_test_with_txn \ blackbox_crash_test_with_txn whitebox_crash_test_with_txn \
blackbox_crash_test_with_best_efforts_recovery blackbox_crash_test_with_best_efforts_recovery \
blackbox_crash_test_with_ts whitebox_crash_test_with_ts
all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS) all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS)
@ -1167,6 +1168,8 @@ crash_test_with_txn: whitebox_crash_test_with_txn blackbox_crash_test_with_txn
crash_test_with_best_efforts_recovery: blackbox_crash_test_with_best_efforts_recovery crash_test_with_best_efforts_recovery: blackbox_crash_test_with_best_efforts_recovery
crash_test_with_ts: whitebox_crash_test_with_ts blackbox_crash_test_with_ts
blackbox_crash_test: db_stress blackbox_crash_test: db_stress
$(PYTHON) -u tools/db_crashtest.py --simple blackbox $(CRASH_TEST_EXT_ARGS) $(PYTHON) -u tools/db_crashtest.py --simple blackbox $(CRASH_TEST_EXT_ARGS)
$(PYTHON) -u tools/db_crashtest.py blackbox $(CRASH_TEST_EXT_ARGS) $(PYTHON) -u tools/db_crashtest.py blackbox $(CRASH_TEST_EXT_ARGS)
@ -1180,6 +1183,9 @@ blackbox_crash_test_with_txn: db_stress
blackbox_crash_test_with_best_efforts_recovery: db_stress blackbox_crash_test_with_best_efforts_recovery: db_stress
$(PYTHON) -u tools/db_crashtest.py --test_best_efforts_recovery blackbox $(CRASH_TEST_EXT_ARGS) $(PYTHON) -u tools/db_crashtest.py --test_best_efforts_recovery blackbox $(CRASH_TEST_EXT_ARGS)
blackbox_crash_test_with_ts: db_stress
$(PYTHON) -u tools/db_crashtest.py --enable_ts blackbox $(CRASH_TEST_EXT_ARGS)
ifeq ($(CRASH_TEST_KILL_ODD),) ifeq ($(CRASH_TEST_KILL_ODD),)
CRASH_TEST_KILL_ODD=888887 CRASH_TEST_KILL_ODD=888887
endif endif
@ -1198,6 +1204,10 @@ whitebox_crash_test_with_txn: db_stress
$(PYTHON) -u tools/db_crashtest.py --txn whitebox --random_kill_odd \ $(PYTHON) -u tools/db_crashtest.py --txn whitebox --random_kill_odd \
$(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS) $(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS)
whitebox_crash_test_with_ts: db_stress
$(PYTHON) -u tools/db_crashtest.py --enable_ts whitebox --random_kill_odd \
$(CRASH_TEST_KILL_ODD) $(CRASH_TEST_EXT_ARGS)
asan_check: clean asan_check: clean
COMPILE_WITH_ASAN=1 $(MAKE) check -j32 COMPILE_WITH_ASAN=1 $(MAKE) check -j32
$(MAKE) clean $(MAKE) clean
@ -1474,7 +1484,7 @@ memtablerep_bench: $(OBJ_DIR)/memtable/memtablerep_bench.o $(LIBRARY)
filter_bench: $(OBJ_DIR)/util/filter_bench.o $(LIBRARY) filter_bench: $(OBJ_DIR)/util/filter_bench.o $(LIBRARY)
$(AM_LINK) $(AM_LINK)
db_stress: $(OBJ_DIR)/db_stress_tool/db_stress.o $(STRESS_LIBRARY) $(TOOLS_LIBRARY) $(LIBRARY) db_stress: $(OBJ_DIR)/db_stress_tool/db_stress.o $(STRESS_LIBRARY) $(TOOLS_LIBRARY) $(TESTUTIL) $(LIBRARY)
$(AM_LINK) $(AM_LINK)
write_stress: $(OBJ_DIR)/tools/write_stress.o $(LIBRARY) write_stress: $(OBJ_DIR)/tools/write_stress.o $(LIBRARY)

View File

@ -30,7 +30,7 @@ enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e =
ROCKSDB_NAMESPACE::kCRC32c; ROCKSDB_NAMESPACE::kCRC32c;
enum RepFactory FLAGS_rep_factory = kSkipList; enum RepFactory FLAGS_rep_factory = kSkipList;
std::vector<double> sum_probs(100001); std::vector<double> sum_probs(100001);
int64_t zipf_sum_size = 100000; constexpr int64_t zipf_sum_size = 100000;
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -233,6 +233,15 @@ size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
return value_sz; // the size of the value set. return value_sz; // the size of the value set.
} }
std::string NowNanosStr() {
uint64_t t = db_stress_env->NowNanos();
std::string ret;
PutFixed64(&ret, t);
return ret;
}
std::string GenerateTimestampForRead() { return NowNanosStr(); }
namespace { namespace {
class MyXXH64Checksum : public FileChecksumGenerator { class MyXXH64Checksum : public FileChecksumGenerator {

View File

@ -260,9 +260,11 @@ DECLARE_bool(enable_compaction_filter);
DECLARE_bool(paranoid_file_checks); DECLARE_bool(paranoid_file_checks);
DECLARE_uint64(batch_protection_bytes_per_key); DECLARE_uint64(batch_protection_bytes_per_key);
const long KB = 1024; DECLARE_uint64(user_timestamp_size);
const int kRandomValueMaxFactor = 3;
const int kValueMaxLen = 100; constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3;
constexpr int kValueMaxLen = 100;
// wrapped posix or hdfs environment // wrapped posix or hdfs environment
extern ROCKSDB_NAMESPACE::Env* db_stress_env; extern ROCKSDB_NAMESPACE::Env* db_stress_env;
@ -561,6 +563,9 @@ extern StressTest* CreateNonBatchedOpsStressTest();
extern void InitializeHotKeyGenerator(double alpha); extern void InitializeHotKeyGenerator(double alpha);
extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key); extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
extern std::string GenerateTimestampForRead();
extern std::string NowNanosStr();
std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl( std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl(
const std::string& name); const std::string& name);
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -804,4 +804,8 @@ DEFINE_string(file_checksum_impl, "none",
DEFINE_int32(write_fault_one_in, 0, DEFINE_int32(write_fault_one_in, 0,
"On non-zero, enables fault injection on write"); "On non-zero, enables fault injection on write");
DEFINE_uint64(user_timestamp_size, 0,
"Number of bytes for a user-defined timestamp. Currently, only "
"8-byte is supported");
#endif // GFLAGS #endif // GFLAGS

View File

@ -418,6 +418,8 @@ struct ThreadState {
std::string value; std::string value;
// optional state of all keys in the db // optional state of all keys in the db
std::vector<bool>* key_vec; std::vector<bool>* key_vec;
std::string timestamp;
}; };
std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue; std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;

View File

@ -317,6 +317,11 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
} }
ReadOptions ropt; ReadOptions ropt;
ropt.snapshot = snap_state.snapshot; ropt.snapshot = snap_state.snapshot;
Slice ts;
if (!snap_state.timestamp.empty()) {
ts = snap_state.timestamp;
ropt.timestamp = &ts;
}
PinnableSlice exp_v(&snap_state.value); PinnableSlice exp_v(&snap_state.value);
exp_v.PinSelf(); exp_v.PinSelf();
PinnableSlice v; PinnableSlice v;
@ -422,6 +427,13 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
} }
} else { } else {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = NowNanosStr();
ts = ts_str;
write_opts.timestamp = &ts;
}
s = db_->Put(write_opts, cfh, key, v); s = db_->Put(write_opts, cfh, key, v);
} else { } else {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -564,10 +576,9 @@ void StressTest::OperateDb(ThreadState* thread) {
if (FLAGS_write_fault_one_in) { if (FLAGS_write_fault_one_in) {
IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true); error_msg.SetRetryable(true);
std::vector<FileType> types; std::vector<FileType> types = {FileType::kTableFile,
types.push_back(FileType::kTableFile); FileType::kDescriptorFile,
types.push_back(FileType::kDescriptorFile); FileType::kCurrentFile};
types.push_back(FileType::kCurrentFile);
fault_fs_guard->SetRandomWriteError( fault_fs_guard->SetRandomWriteError(
thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg, types); thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg, types);
} }
@ -766,6 +777,20 @@ void StressTest::OperateDb(ThreadState* thread) {
} }
} }
// Assign timestamps if necessary.
std::string read_ts_str;
std::string write_ts_str;
Slice read_ts;
Slice write_ts;
if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) {
read_ts_str = GenerateTimestampForRead();
read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
write_ts_str = NowNanosStr();
write_ts = write_ts_str;
write_opts.timestamp = &write_ts;
}
int prob_op = thread->rand.Uniform(100); int prob_op = thread->rand.Uniform(100);
// Reset this in case we pick something other than a read op. We don't // Reset this in case we pick something other than a read op. We don't
// want to use a stale value when deciding at the beginning of the loop // want to use a stale value when deciding at the beginning of the loop
@ -856,8 +881,16 @@ std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
std::vector<std::string> boundaries; std::vector<std::string> boundaries;
for (const LevelMetaData& lmd : cfmd.levels) { for (const LevelMetaData& lmd : cfmd.levels) {
for (const SstFileMetaData& sfmd : lmd.files) { for (const SstFileMetaData& sfmd : lmd.files) {
boundaries.push_back(sfmd.smallestkey); // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
boundaries.push_back(sfmd.largestkey); // have timestamps.
const auto& skey = sfmd.smallestkey;
const auto& lkey = sfmd.largestkey;
assert(skey.size() >= FLAGS_user_timestamp_size);
assert(lkey.size() >= FLAGS_user_timestamp_size);
boundaries.push_back(
skey.substr(0, skey.size() - FLAGS_user_timestamp_size));
boundaries.push_back(
lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size));
} }
} }
if (boundaries.empty()) { if (boundaries.empty()) {
@ -1007,6 +1040,7 @@ Status StressTest::TestIterate(ThreadState* thread,
// iterators with the same set-up, and it doesn't hurt to check them // iterators with the same set-up, and it doesn't hurt to check them
// to be equal. // to be equal.
ReadOptions cmp_ro; ReadOptions cmp_ro;
cmp_ro.timestamp = readoptionscopy.timestamp;
cmp_ro.snapshot = snapshot; cmp_ro.snapshot = snapshot;
cmp_ro.total_order_seek = true; cmp_ro.total_order_seek = true;
ColumnFamilyHandle* cmp_cfh = ColumnFamilyHandle* cmp_cfh =
@ -1126,21 +1160,25 @@ void StressTest::VerifyIterator(ThreadState* thread,
*diverged = true; *diverged = true;
return; return;
} else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr && } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
(options_.comparator->Compare(*ro.iterate_lower_bound, seek_key) >= (options_.comparator->CompareWithoutTimestamp(
0 || *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
/*b_has_ts=*/false) >= 0 ||
(ro.iterate_upper_bound != nullptr && (ro.iterate_upper_bound != nullptr &&
options_.comparator->Compare(*ro.iterate_lower_bound, options_.comparator->CompareWithoutTimestamp(
*ro.iterate_upper_bound) >= 0))) { *ro.iterate_lower_bound, /*a_has_ts=*/false,
*ro.iterate_upper_bound, /*b_has_ts*/ false) >= 0))) {
// Lower bound behavior is not well defined if it is larger than // Lower bound behavior is not well defined if it is larger than
// seek key or upper bound. Disable the check for now. // seek key or upper bound. Disable the check for now.
*diverged = true; *diverged = true;
return; return;
} else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr && } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
(options_.comparator->Compare(*ro.iterate_upper_bound, seek_key) <= (options_.comparator->CompareWithoutTimestamp(
0 || *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
/*b_has_ts=*/false) <= 0 ||
(ro.iterate_lower_bound != nullptr && (ro.iterate_lower_bound != nullptr &&
options_.comparator->Compare(*ro.iterate_lower_bound, options_.comparator->CompareWithoutTimestamp(
*ro.iterate_upper_bound) >= 0))) { *ro.iterate_lower_bound, /*a_has_ts=*/false,
*ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
// Uppder bound behavior is not well defined if it is smaller than // Uppder bound behavior is not well defined if it is smaller than
// seek key or lower bound. Disable the check for now. // seek key or lower bound. Disable the check for now.
*diverged = true; *diverged = true;
@ -1209,9 +1247,13 @@ void StressTest::VerifyIterator(ThreadState* thread,
if ((iter->Valid() && iter->key() != cmp_iter->key()) || if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
(!iter->Valid() && (!iter->Valid() &&
(ro.iterate_upper_bound == nullptr || (ro.iterate_upper_bound == nullptr ||
cmp->Compare(total_order_key, *ro.iterate_upper_bound) < 0) && cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
*ro.iterate_upper_bound,
/*b_has_ts=*/false) < 0) &&
(ro.iterate_lower_bound == nullptr || (ro.iterate_lower_bound == nullptr ||
cmp->Compare(total_order_key, *ro.iterate_lower_bound) > 0))) { cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
*ro.iterate_lower_bound,
/*b_has_ts=*/false) > 0))) {
fprintf(stderr, fprintf(stderr,
"Iterator diverged from control iterator which" "Iterator diverged from control iterator which"
" has value %s %s\n", " has value %s %s\n",
@ -1407,8 +1449,16 @@ Status StressTest::TestBackupRestore(
std::string key_str = Key(rand_keys[0]); std::string key_str = Key(rand_keys[0]);
Slice key = key_str; Slice key = key_str;
std::string restored_value; std::string restored_value;
ReadOptions read_opts;
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts = ts_str;
read_opts.timestamp = &ts;
}
Status get_status = restored_db->Get( Status get_status = restored_db->Get(
ReadOptions(), restored_cf_handles[rand_column_families[i]], key, read_opts, restored_cf_handles[rand_column_families[i]], key,
&restored_value); &restored_value);
bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]); bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
if (get_status.ok()) { if (get_status.ok()) {
@ -1739,6 +1789,7 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread,
const std::string& keystr, uint64_t i) { const std::string& keystr, uint64_t i) {
Slice key = keystr; Slice key = keystr;
ColumnFamilyHandle* column_family = column_families_[rand_column_family]; ColumnFamilyHandle* column_family = column_families_[rand_column_family];
ReadOptions ropt;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB()); auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
const bool ww_snapshot = thread->rand.OneIn(10); const bool ww_snapshot = thread->rand.OneIn(10);
@ -1748,8 +1799,19 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread,
#else #else
const Snapshot* snapshot = db_->GetSnapshot(); const Snapshot* snapshot = db_->GetSnapshot();
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
ReadOptions ropt;
ropt.snapshot = snapshot; ropt.snapshot = snapshot;
// Ideally, we want snapshot taking and timestamp generation to be atomic
// here, so that the snapshot corresponds to the timestamp. However, it is
// not possible with current GetSnapshot() API.
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts = ts_str;
ropt.timestamp = &ts;
}
std::string value_at; std::string value_at;
// When taking a snapshot, we also read a key from that snapshot. We // When taking a snapshot, we also read a key from that snapshot. We
// will later read the same key before releasing the snapshot and // will later read the same key before releasing the snapshot and
@ -1771,10 +1833,14 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread,
} }
} }
ThreadState::SnapshotState snap_state = { ThreadState::SnapshotState snap_state = {snapshot,
snapshot, rand_column_family, column_family->GetName(), rand_column_family,
keystr, status_at, value_at, column_family->GetName(),
key_vec}; keystr,
status_at,
value_at,
key_vec,
ts_str};
uint64_t hold_for = FLAGS_snapshot_hold_ops; uint64_t hold_for = FLAGS_snapshot_hold_ops;
if (FLAGS_long_running_snapshots) { if (FLAGS_long_running_snapshots) {
// Hold 10% of snapshots for 10x more // Hold 10% of snapshots for 10x more
@ -1879,6 +1945,13 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
ReadOptions ro; ReadOptions ro;
ro.snapshot = snapshot; ro.snapshot = snapshot;
ro.total_order_seek = true; ro.total_order_seek = true;
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts = ts_str;
ro.timestamp = &ts;
}
std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family)); std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
for (it->Seek(start_key); for (it->Seek(start_key);
it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0; it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
@ -2004,6 +2077,8 @@ void StressTest::PrintEnv() const {
fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection); fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
fprintf(stdout, "Best efforts recovery : %d\n", fprintf(stdout, "Best efforts recovery : %d\n",
static_cast<int>(FLAGS_best_efforts_recovery)); static_cast<int>(FLAGS_best_efforts_recovery));
fprintf(stdout, "User timestamp size bytes : %d\n",
static_cast<int>(FLAGS_user_timestamp_size));
fprintf(stdout, "------------------------------------------------\n"); fprintf(stdout, "------------------------------------------------\n");
} }
@ -2247,6 +2322,11 @@ void StressTest::Open() {
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
Status s; Status s;
if (FLAGS_user_timestamp_size > 0) {
CheckAndSetOptionsForUserTimestamp();
}
if (FLAGS_ttl == -1) { if (FLAGS_ttl == -1) {
std::vector<std::string> existing_column_families; std::vector<std::string> existing_column_families;
s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
@ -2498,5 +2578,72 @@ void StressTest::Reopen(ThreadState* thread) {
clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_); clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
Open(); Open();
} }
void StressTest::CheckAndSetOptionsForUserTimestamp() {
assert(FLAGS_user_timestamp_size > 0);
const Comparator* const cmp = test::ComparatorWithU64Ts();
assert(cmp);
if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
fprintf(stderr,
"Only -user_timestamp_size=%d is supported in stress test.\n",
static_cast<int>(cmp->timestamp_size()));
exit(1);
}
if (FLAGS_nooverwritepercent > 0) {
fprintf(stderr,
"-nooverwritepercent must be 0 because SingleDelete must be "
"disabled.\n");
exit(1);
}
if (FLAGS_use_merge || FLAGS_use_full_merge_v1) {
fprintf(stderr, "Merge does not support timestamp yet.\n");
exit(1);
}
if (FLAGS_delrangepercent > 0) {
fprintf(stderr, "DeleteRange does not support timestamp yet.\n");
exit(1);
}
if (FLAGS_use_txn) {
fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
exit(1);
}
if (FLAGS_read_only) {
fprintf(stderr, "When opened as read-only, timestamp not supported.\n");
exit(1);
}
if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 ||
FLAGS_continuous_verification_interval > 0) {
fprintf(stderr, "Secondary instance does not support timestamp.\n");
exit(1);
}
if (FLAGS_checkpoint_one_in > 0) {
fprintf(stderr,
"-checkpoint_one_in=%d requires "
"DBImplReadOnly, which is not supported with timestamp\n",
FLAGS_checkpoint_one_in);
exit(1);
}
#ifndef ROCKSDB_LITE
if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
fprintf(stderr, "BlobDB not supported with timestamp.\n");
exit(1);
}
#endif // !ROCKSDB_LITE
if (FLAGS_enable_compaction_filter) {
fprintf(stderr, "CompactionFilter not supported with timestamp.\n");
exit(1);
}
if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
fprintf(stderr,
"Due to per-key ts-seq ordering constraint, only the (default) "
"non-batched test is supported with timestamp.\n");
exit(1);
}
if (FLAGS_ingest_external_file_one_in > 0) {
fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
exit(1);
}
options_.comparator = cmp;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS #endif // GFLAGS

View File

@ -211,6 +211,8 @@ class StressTest {
void Reopen(ThreadState* thread); void Reopen(ThreadState* thread);
void CheckAndSetOptionsForUserTimestamp();
std::shared_ptr<Cache> cache_; std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> compressed_cache_; std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_; std::shared_ptr<const FilterPolicy> filter_policy_;

View File

@ -22,6 +22,13 @@ class NonBatchedOpsStressTest : public StressTest {
void VerifyDb(ThreadState* thread) const override { void VerifyDb(ThreadState* thread) const override {
ReadOptions options(FLAGS_verify_checksum, true); ReadOptions options(FLAGS_verify_checksum, true);
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GenerateTimestampForRead();
ts = ts_str;
options.timestamp = &ts;
}
auto shared = thread->shared; auto shared = thread->shared;
const int64_t max_key = shared->GetMaxKey(); const int64_t max_key = shared->GetMaxKey();
const int64_t keys_per_thread = max_key / shared->GetNumThreads(); const int64_t keys_per_thread = max_key / shared->GetNumThreads();
@ -477,6 +484,8 @@ class NonBatchedOpsStressTest : public StressTest {
int64_t max_key = shared->GetMaxKey(); int64_t max_key = shared->GetMaxKey();
int64_t rand_key = rand_keys[0]; int64_t rand_key = rand_keys[0];
int rand_column_family = rand_column_families[0]; int rand_column_family = rand_column_families[0];
std::string write_ts_str;
Slice write_ts;
while (!shared->AllowsOverwrite(rand_key) && while (!shared->AllowsOverwrite(rand_key) &&
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) { (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
lock.reset(); lock.reset();
@ -484,6 +493,11 @@ class NonBatchedOpsStressTest : public StressTest {
rand_column_family = thread->rand.Next() % FLAGS_column_families; rand_column_family = thread->rand.Next() % FLAGS_column_families;
lock.reset( lock.reset(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key))); new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
if (FLAGS_user_timestamp_size > 0) {
write_ts_str = NowNanosStr();
write_ts = write_ts_str;
write_opts.timestamp = &write_ts;
}
} }
std::string key_str = Key(rand_key); std::string key_str = Key(rand_key);
@ -559,6 +573,8 @@ class NonBatchedOpsStressTest : public StressTest {
// OPERATION delete // OPERATION delete
// If the chosen key does not allow overwrite and it does not exist, // If the chosen key does not allow overwrite and it does not exist,
// choose another key. // choose another key.
std::string write_ts_str;
Slice write_ts;
while (!shared->AllowsOverwrite(rand_key) && while (!shared->AllowsOverwrite(rand_key) &&
!shared->Exists(rand_column_family, rand_key)) { !shared->Exists(rand_column_family, rand_key)) {
lock.reset(); lock.reset();
@ -566,6 +582,11 @@ class NonBatchedOpsStressTest : public StressTest {
rand_column_family = thread->rand.Next() % FLAGS_column_families; rand_column_family = thread->rand.Next() % FLAGS_column_families;
lock.reset( lock.reset(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key))); new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
if (FLAGS_user_timestamp_size > 0) {
write_ts_str = NowNanosStr();
write_ts = write_ts_str;
write_opts.timestamp = &write_ts;
}
} }
std::string key_str = Key(rand_key); std::string key_str = Key(rand_key);

View File

@ -281,6 +281,25 @@ blob_params = {
"backup_one_in": 0, "backup_one_in": 0,
} }
ts_params = {
"test_cf_consistency": 0,
"test_batches_snapshots": 0,
"user_timestamp_size": 8,
"use_merge": 0,
"use_full_merge_v1": 0,
# In order to disable SingleDelete
"nooverwritepercent": 0,
"use_txn": 0,
"read_only": 0,
"secondary_catch_up_one_in": 0,
"continuous_verification_interval": 0,
"checkpoint_one_in": 0,
"enable_blob_files": 0,
"use_blob_db": 0,
"enable_compaction_filter": 0,
"ingest_external_file_one_in": 0,
}
def finalize_and_sanitize(src_params): def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v) dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()]) for (k, v) in src_params.items()])
@ -306,9 +325,10 @@ def finalize_and_sanitize(src_params):
else: else:
dest_params["mock_direct_io"] = True dest_params["mock_direct_io"] = True
# DeleteRange is not currnetly compatible with Txns # DeleteRange is not currnetly compatible with Txns and timestamp
if dest_params.get("test_batches_snapshots") == 1 or \ if (dest_params.get("test_batches_snapshots") == 1 or
dest_params.get("use_txn") == 1: dest_params.get("use_txn") == 1 or
dest_params.get("user_timestamp_size") > 0):
dest_params["delpercent"] += dest_params["delrangepercent"] dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0 dest_params["delrangepercent"] = 0
# Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb # Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb
@ -373,11 +393,15 @@ def gen_cmd_params(args):
params.update(txn_params) params.update(txn_params)
if args.test_best_efforts_recovery: if args.test_best_efforts_recovery:
params.update(best_efforts_recovery_params) params.update(best_efforts_recovery_params)
if args.enable_ts:
params.update(ts_params)
# Best-effort recovery and BlobDB are currently incompatible. Test BE recovery # Best-effort recovery and BlobDB are currently incompatible. Test BE recovery
# if specified on the command line; otherwise, apply BlobDB related overrides # if specified on the command line; otherwise, apply BlobDB related overrides
# with a 10% chance. # with a 10% chance.
if not args.test_best_efforts_recovery and random.choice([0] * 9 + [1]) == 1: if (not args.test_best_efforts_recovery and
not args.enable_ts and
random.choice([0] * 9 + [1]) == 1):
params.update(blob_params) params.update(blob_params)
for k, v in vars(args).items(): for k, v in vars(args).items():
@ -393,7 +417,7 @@ def gen_cmd(params, unknown_params):
for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)] for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
if k not in set(['test_type', 'simple', 'duration', 'interval', if k not in set(['test_type', 'simple', 'duration', 'interval',
'random_kill_odd', 'cf_consistency', 'txn', 'random_kill_odd', 'cf_consistency', 'txn',
'test_best_efforts_recovery']) 'test_best_efforts_recovery', 'enable_ts'])
and v is not None] + unknown_params and v is not None] + unknown_params
return cmd return cmd
@ -646,6 +670,7 @@ def main():
parser.add_argument("--cf_consistency", action='store_true') parser.add_argument("--cf_consistency", action='store_true')
parser.add_argument("--txn", action='store_true') parser.add_argument("--txn", action='store_true')
parser.add_argument("--test_best_efforts_recovery", action='store_true') parser.add_argument("--test_best_efforts_recovery", action='store_true')
parser.add_argument("--enable_ts", action='store_true')
all_params = dict(list(default_params.items()) all_params = dict(list(default_params.items())
+ list(blackbox_default_params.items()) + list(blackbox_default_params.items())
@ -653,7 +678,8 @@ def main():
+ list(simple_default_params.items()) + list(simple_default_params.items())
+ list(blackbox_simple_default_params.items()) + list(blackbox_simple_default_params.items())
+ list(whitebox_simple_default_params.items()) + list(whitebox_simple_default_params.items())
+ list(blob_params.items())) + list(blob_params.items())
+ list(ts_params.items()))
for k, v in all_params.items(): for k, v in all_params.items():
parser.add_argument("--" + k, type=type(v() if callable(v) else v)) parser.add_argument("--" + k, type=type(v() if callable(v) else v))