Make the db_stress reopen loop in OperateDb() more robust (#5893)

Summary:
The loop in OperateDb() is getting quite complicated with the introduction of multiple key operations such as MultiGet and Reseeks. This is resulting in a number of corner cases that hangs db_stress due to synchronization problems during reopen (i.e when -reopen=<> option is specified). This PR makes it more robust by ensuring all db_stress threads vote to reopen the DB the exact same number of times.
Most of the changes in this diff are due to indentation.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5893

Test Plan: Run crash test

Differential Revision: D17823827

Pulled By: anand1976

fbshipit-source-id: ec893829f611ac7cac4057c0d3d99f9ffb6a6dd9
This commit is contained in:
anand76 2019-10-09 09:25:56 -07:00 committed by Facebook Github Bot
parent 5b123813f8
commit 80ad996b35

View File

@ -2087,18 +2087,11 @@ class StressTest {
const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
thread->stats.Start();
for (uint64_t i = 0, prev_i = 0; i < FLAGS_ops_per_thread; i++) {
for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
// In case i is incremented more than once due to multiple operations,
// such as MultiGet or iterator seeks, check whether we have crossed
// the ops_per_open boundary in the previous iteration. If it did,
// then vote to reopen
if (i != 0 &&
(i % ops_per_open == 0 ||
i % ops_per_open < prev_i % ops_per_open)) {
{
if (open_cnt != 0) {
thread->stats.FinishedSingleOp();
MutexLock l(thread->shared->GetMutex());
while (!thread->snapshot_queue.empty()) {
@ -2116,261 +2109,265 @@ class StressTest {
}
// Commenting this out as we don't want to reset stats on each open.
// thread->stats.Start();
}
for (uint64_t i = 0; i < ops_per_open; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
}
prev_i = i;
// Change Options
if (FLAGS_set_options_one_in > 0 &&
thread->rand.OneIn(FLAGS_set_options_one_in)) {
SetOptions(thread);
}
// Change Options
if (FLAGS_set_options_one_in > 0 &&
thread->rand.OneIn(FLAGS_set_options_one_in)) {
SetOptions(thread);
}
if (FLAGS_set_in_place_one_in > 0 &&
thread->rand.OneIn(FLAGS_set_in_place_one_in)) {
options_.inplace_update_support ^= options_.inplace_update_support;
}
if (FLAGS_set_in_place_one_in > 0 &&
thread->rand.OneIn(FLAGS_set_in_place_one_in)) {
options_.inplace_update_support ^= options_.inplace_update_support;
}
MaybeClearOneColumnFamily(thread);
MaybeClearOneColumnFamily(thread);
#ifndef ROCKSDB_LITE
if (FLAGS_compact_files_one_in > 0 &&
thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
auto* random_cf =
column_families_[thread->rand.Next() % FLAGS_column_families];
rocksdb::ColumnFamilyMetaData cf_meta_data;
db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data);
if (FLAGS_compact_files_one_in > 0 &&
thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
auto* random_cf =
column_families_[thread->rand.Next() % FLAGS_column_families];
rocksdb::ColumnFamilyMetaData cf_meta_data;
db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data);
// Randomly compact up to three consecutive files from a level
const int kMaxRetry = 3;
for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
size_t random_level = thread->rand.Uniform(
static_cast<int>(cf_meta_data.levels.size()));
// Randomly compact up to three consecutive files from a level
const int kMaxRetry = 3;
for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
size_t random_level = thread->rand.Uniform(
static_cast<int>(cf_meta_data.levels.size()));
const auto& files = cf_meta_data.levels[random_level].files;
if (files.size() > 0) {
size_t random_file_index =
thread->rand.Uniform(static_cast<int>(files.size()));
if (files[random_file_index].being_compacted) {
// Retry as the selected file is currently being compacted
continue;
}
const auto& files = cf_meta_data.levels[random_level].files;
if (files.size() > 0) {
size_t random_file_index =
thread->rand.Uniform(static_cast<int>(files.size()));
if (files[random_file_index].being_compacted) {
// Retry as the selected file is currently being compacted
continue;
}
std::vector<std::string> input_files;
input_files.push_back(files[random_file_index].name);
if (random_file_index > 0 &&
!files[random_file_index - 1].being_compacted) {
input_files.push_back(files[random_file_index - 1].name);
}
if (random_file_index + 1 < files.size() &&
!files[random_file_index + 1].being_compacted) {
input_files.push_back(files[random_file_index + 1].name);
}
std::vector<std::string> input_files;
input_files.push_back(files[random_file_index].name);
if (random_file_index > 0 &&
!files[random_file_index - 1].being_compacted) {
input_files.push_back(files[random_file_index - 1].name);
}
if (random_file_index + 1 < files.size() &&
!files[random_file_index + 1].being_compacted) {
input_files.push_back(files[random_file_index + 1].name);
}
size_t output_level =
std::min(random_level + 1, cf_meta_data.levels.size() - 1);
auto s =
db_->CompactFiles(CompactionOptions(), random_cf, input_files,
static_cast<int>(output_level));
if (!s.ok()) {
fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
s.ToString().c_str());
thread->stats.AddNumCompactFilesFailed(1);
} else {
thread->stats.AddNumCompactFilesSucceed(1);
size_t output_level =
std::min(random_level + 1, cf_meta_data.levels.size() - 1);
auto s =
db_->CompactFiles(CompactionOptions(), random_cf, input_files,
static_cast<int>(output_level));
if (!s.ok()) {
fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
s.ToString().c_str());
thread->stats.AddNumCompactFilesFailed(1);
} else {
thread->stats.AddNumCompactFilesSucceed(1);
}
break;
}
}
}
#endif // !ROCKSDB_LITE
int64_t rand_key = GenerateOneKey(thread, i);
int rand_column_family = thread->rand.Next() % FLAGS_column_families;
std::string keystr = Key(rand_key);
Slice key = keystr;
std::unique_ptr<MutexLock> lock;
if (ShouldAcquireMutexOnKey()) {
lock.reset(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key)));
}
auto column_family = column_families_[rand_column_family];
if (FLAGS_compact_range_one_in > 0 &&
thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) {
int64_t end_key_num;
if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
end_key_num = port::kMaxInt64;
} else {
end_key_num = FLAGS_compact_range_width + rand_key;
}
std::string end_key_buf = Key(end_key_num);
Slice end_key(end_key_buf);
CompactRangeOptions cro;
cro.exclusive_manual_compaction =
static_cast<bool>(thread->rand.Next() % 2);
Status status = db_->CompactRange(cro, column_family, &key, &end_key);
if (!status.ok()) {
printf("Unable to perform CompactRange(): %s\n",
status.ToString().c_str());
}
}
std::vector<int> rand_column_families =
GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
if (FLAGS_flush_one_in > 0 &&
thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
FlushOptions flush_opts;
std::vector<ColumnFamilyHandle*> cfhs;
std::for_each(
rand_column_families.begin(), rand_column_families.end(),
[this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
Status status = db_->Flush(flush_opts, cfhs);
if (!status.ok()) {
fprintf(stdout, "Unable to perform Flush(): %s\n",
status.ToString().c_str());
}
}
std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
if (FLAGS_ingest_external_file_one_in > 0 &&
thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) {
TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
}
if (FLAGS_backup_one_in > 0 &&
thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
if (!s.ok()) {
VerificationAbort(shared, "Backup/restore gave inconsistent state",
s);
}
}
if (FLAGS_checkpoint_one_in > 0 &&
thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
if (!s.ok()) {
VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
}
}
if (FLAGS_acquire_snapshot_one_in > 0 &&
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
auto snapshot = db_->GetSnapshot();
ReadOptions ropt;
ropt.snapshot = snapshot;
std::string value_at;
// When taking a snapshot, we also read a key from that snapshot. We
// will later read the same key before releasing the snapshot and verify
// that the results are the same.
auto status_at = db_->Get(ropt, column_family, key, &value_at);
std::vector<bool> *key_vec = nullptr;
if (FLAGS_compare_full_db_state_snapshot &&
(thread->tid == 0)) {
key_vec = new std::vector<bool>(FLAGS_max_key);
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
if (GetIntVal(iterator->key().ToString(), &key_val)) {
(*key_vec)[key_val] = true;
}
}
}
ThreadState::SnapshotState snap_state = {
snapshot, rand_column_family, column_family->GetName(),
keystr, status_at, value_at, key_vec};
thread->snapshot_queue.emplace(
std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
snap_state);
}
while (!thread->snapshot_queue.empty() &&
i >= thread->snapshot_queue.front().first) {
auto snap_state = thread->snapshot_queue.front().second;
assert(snap_state.snapshot);
// Note: this is unsafe as the cf might be dropped concurrently. But it
// is ok since unclean cf drop is cunnrently not supported by write
// prepared transactions.
Status s =
AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
if (!s.ok()) {
VerificationAbort(shared, "Snapshot gave inconsistent state", s);
}
db_->ReleaseSnapshot(snap_state.snapshot);
delete snap_state.key_vec;
thread->snapshot_queue.pop();
}
int prob_op = thread->rand.Uniform(100);
// Reset this in case we pick something other than a read op. We don't
// want to use a stale value when deciding at the beginning of the loop
// whether to vote to reopen
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
if (FLAGS_use_multiget) {
// Leave room for one more iteration of the loop with a single key
// batch. This is to ensure that each thread does exactly the same
// number of ops
int multiget_batch_size = static_cast<int>(
std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
FLAGS_ops_per_thread - i - 1));
// If its the last iteration, ensure that multiget_batch_size is 1
multiget_batch_size = std::max(multiget_batch_size, 1);
rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
i += multiget_batch_size - 1;
} else {
TestGet(thread, read_opts, rand_column_families, rand_keys);
}
} else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
// OPERATION prefix scan
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
// (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
// be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
// prefix
TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
} else if (prefixBound <= prob_op && prob_op < writeBound) {
// OPERATION write
TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
value, lock);
} else if (writeBound <= prob_op && prob_op < delBound) {
// OPERATION delete
TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
} else if (delBound <= prob_op && prob_op < delRangeBound) {
// OPERATION delete range
TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
lock);
} else {
// OPERATION iterate
int num_seeks = static_cast<int>(
std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
FLAGS_ops_per_thread - i - 1));
rand_keys = GenerateNKeys(thread, num_seeks, i);
i += num_seeks - 1;
TestIterate(thread, read_opts, rand_column_families, rand_keys);
}
thread->stats.FinishedSingleOp();
#ifndef ROCKSDB_LITE
uint32_t tid = thread->tid;
assert(secondaries_.empty() ||
static_cast<size_t>(tid) < secondaries_.size());
if (FLAGS_secondary_catch_up_one_in > 0 &&
thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) {
Status s = secondaries_[tid]->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared, "Secondary instance failed to catch up", s);
break;
}
}
}
#endif // !ROCKSDB_LITE
int64_t rand_key = GenerateOneKey(thread, i);
int rand_column_family = thread->rand.Next() % FLAGS_column_families;
std::string keystr = Key(rand_key);
Slice key = keystr;
std::unique_ptr<MutexLock> lock;
if (ShouldAcquireMutexOnKey()) {
lock.reset(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key)));
}
auto column_family = column_families_[rand_column_family];
if (FLAGS_compact_range_one_in > 0 &&
thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) {
int64_t end_key_num;
if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
end_key_num = port::kMaxInt64;
} else {
end_key_num = FLAGS_compact_range_width + rand_key;
}
std::string end_key_buf = Key(end_key_num);
Slice end_key(end_key_buf);
CompactRangeOptions cro;
cro.exclusive_manual_compaction =
static_cast<bool>(thread->rand.Next() % 2);
Status status = db_->CompactRange(cro, column_family, &key, &end_key);
if (!status.ok()) {
printf("Unable to perform CompactRange(): %s\n",
status.ToString().c_str());
}
}
std::vector<int> rand_column_families =
GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
if (FLAGS_flush_one_in > 0 &&
thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
FlushOptions flush_opts;
std::vector<ColumnFamilyHandle*> cfhs;
std::for_each(
rand_column_families.begin(), rand_column_families.end(),
[this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
Status status = db_->Flush(flush_opts, cfhs);
if (!status.ok()) {
fprintf(stdout, "Unable to perform Flush(): %s\n",
status.ToString().c_str());
}
}
std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
if (FLAGS_ingest_external_file_one_in > 0 &&
thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) {
TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
}
if (FLAGS_backup_one_in > 0 &&
thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
if (!s.ok()) {
VerificationAbort(shared, "Backup/restore gave inconsistent state",
s);
}
}
if (FLAGS_checkpoint_one_in > 0 &&
thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
if (!s.ok()) {
VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
}
}
if (FLAGS_acquire_snapshot_one_in > 0 &&
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
auto snapshot = db_->GetSnapshot();
ReadOptions ropt;
ropt.snapshot = snapshot;
std::string value_at;
// When taking a snapshot, we also read a key from that snapshot. We
// will later read the same key before releasing the snapshot and verify
// that the results are the same.
auto status_at = db_->Get(ropt, column_family, key, &value_at);
std::vector<bool> *key_vec = nullptr;
if (FLAGS_compare_full_db_state_snapshot &&
(thread->tid == 0)) {
key_vec = new std::vector<bool>(FLAGS_max_key);
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
if (GetIntVal(iterator->key().ToString(), &key_val)) {
(*key_vec)[key_val] = true;
}
}
}
ThreadState::SnapshotState snap_state = {
snapshot, rand_column_family, column_family->GetName(),
keystr, status_at, value_at, key_vec};
thread->snapshot_queue.emplace(
std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
snap_state);
}
while (!thread->snapshot_queue.empty() &&
i >= thread->snapshot_queue.front().first) {
auto snap_state = thread->snapshot_queue.front().second;
assert(snap_state.snapshot);
// Note: this is unsafe as the cf might be dropped concurrently. But it
// is ok since unclean cf drop is cunnrently not supported by write
// prepared transactions.
Status s =
AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
if (!s.ok()) {
VerificationAbort(shared, "Snapshot gave inconsistent state", s);
}
db_->ReleaseSnapshot(snap_state.snapshot);
delete snap_state.key_vec;
thread->snapshot_queue.pop();
}
int prob_op = thread->rand.Uniform(100);
// Reset this in case we pick something other than a read op. We don't
// want to use a stale value when deciding at the beginning of the loop
// whether to vote to reopen
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
if (FLAGS_use_multiget) {
// Leave room for one more iteration of the loop with a single key
// batch. This is to ensure that each thread does exactly the same
// number of ops
int multiget_batch_size = static_cast<int>(
std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
FLAGS_ops_per_thread - i - 1));
// If its the last iteration, ensure that multiget_batch_size is 1
multiget_batch_size = std::max(multiget_batch_size, 1);
rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
i += multiget_batch_size - 1;
} else {
TestGet(thread, read_opts, rand_column_families, rand_keys);
}
} else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
// OPERATION prefix scan
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
// (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
// be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
// prefix
TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
} else if (prefixBound <= prob_op && prob_op < writeBound) {
// OPERATION write
TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
value, lock);
} else if (writeBound <= prob_op && prob_op < delBound) {
// OPERATION delete
TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
} else if (delBound <= prob_op && prob_op < delRangeBound) {
// OPERATION delete range
TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
lock);
} else {
// OPERATION iterate
int num_seeks = static_cast<int>(
std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
FLAGS_ops_per_thread - i - 1));
rand_keys = GenerateNKeys(thread, num_seeks, i);
i += num_seeks - 1;
TestIterate(thread, read_opts, rand_column_families, rand_keys);
}
thread->stats.FinishedSingleOp();
#ifndef ROCKSDB_LITE
uint32_t tid = thread->tid;
assert(secondaries_.empty() ||
static_cast<size_t>(tid) < secondaries_.size());
if (FLAGS_secondary_catch_up_one_in > 0 &&
thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) {
Status s = secondaries_[tid]->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared, "Secondary instance failed to catch up", s);
break;
}
}
#endif
}
}
while (!thread->snapshot_queue.empty()) {
db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);