Add support for read-only db chkpt stress (#4690)
Summary: Updated stress test will support testing of db in read-only mode. The user has to make sure that only read/scan operations are enabled. This PR relies on #4681. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4690 Differential Revision: D13102741 Pulled By: riversand963 fbshipit-source-id: f5a36b34db187fe12dd355f7eda161f99d6c75e4
This commit is contained in:
parent
ace543a815
commit
565b5bdc42
@ -100,6 +100,8 @@ DEFINE_uint64(seed, 2341234, "Seed for PRNG");
|
||||
static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
|
||||
RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
|
||||
|
||||
DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests");
|
||||
|
||||
DEFINE_int64(max_key, 1 * KB* KB,
|
||||
"Max number of key/values to place in database");
|
||||
|
||||
@ -1384,7 +1386,8 @@ class StressTest {
|
||||
txn_db_(nullptr),
|
||||
#endif
|
||||
new_column_family_name_(1),
|
||||
num_times_reopened_(0) {
|
||||
num_times_reopened_(0),
|
||||
db_preload_finished_(false) {
|
||||
if (FLAGS_destroy_db_initially) {
|
||||
std::vector<std::string> files;
|
||||
FLAGS_env->GetChildren(FLAGS_db, &files);
|
||||
@ -1511,6 +1514,13 @@ class StressTest {
|
||||
Open();
|
||||
BuildOptionsTable();
|
||||
SharedState shared(this);
|
||||
|
||||
if (FLAGS_read_only) {
|
||||
now = FLAGS_env->NowMicros();
|
||||
fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
|
||||
FLAGS_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
|
||||
PreloadDbAndReopenAsReadOnly(FLAGS_max_key, &shared);
|
||||
}
|
||||
uint32_t n = shared.GetNumThreads();
|
||||
|
||||
now = FLAGS_env->NowMicros();
|
||||
@ -1761,6 +1771,93 @@ class StressTest {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Currently PreloadDb has to be single-threaded.
|
||||
void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
|
||||
SharedState* shared) {
|
||||
WriteOptions write_opts;
|
||||
write_opts.disableWAL = FLAGS_disable_wal;
|
||||
if (FLAGS_sync) {
|
||||
write_opts.sync = true;
|
||||
}
|
||||
char value[100];
|
||||
int cf_idx = 0;
|
||||
Status s;
|
||||
for (auto cfh : column_families_) {
|
||||
for (int64_t k = 0; k != number_of_keys; ++k) {
|
||||
std::string key_str = Key(k);
|
||||
Slice key = key_str;
|
||||
size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
|
||||
Slice v(value, sz);
|
||||
shared->Put(cf_idx, k, 0, true /* pending */);
|
||||
|
||||
if (FLAGS_use_merge) {
|
||||
if (!FLAGS_use_txn) {
|
||||
s = db_->Merge(write_opts, cfh, key, v);
|
||||
} else {
|
||||
#ifndef ROCKSDB_LITE
|
||||
Transaction* txn;
|
||||
s = NewTxn(write_opts, &txn);
|
||||
if (s.ok()) {
|
||||
s = txn->Merge(cfh, key, v);
|
||||
if (s.ok()) {
|
||||
s = CommitTxn(txn);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
} else {
|
||||
if (!FLAGS_use_txn) {
|
||||
s = db_->Put(write_opts, cfh, key, v);
|
||||
} else {
|
||||
#ifndef ROCKSDB_LITE
|
||||
Transaction* txn;
|
||||
s = NewTxn(write_opts, &txn);
|
||||
if (s.ok()) {
|
||||
s = txn->Put(cfh, key, v);
|
||||
if (s.ok()) {
|
||||
s = CommitTxn(txn);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
shared->Put(cf_idx, k, 0, false /* pending */);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
++cf_idx;
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = db_->Flush(FlushOptions(), column_families_);
|
||||
}
|
||||
if (s.ok()) {
|
||||
for (auto cf : column_families_) {
|
||||
delete cf;
|
||||
}
|
||||
column_families_.clear();
|
||||
delete db_;
|
||||
db_ = nullptr;
|
||||
#ifndef ROCKSDB_LITE
|
||||
txn_db_ = nullptr;
|
||||
#endif
|
||||
|
||||
db_preload_finished_.store(true);
|
||||
auto now = FLAGS_env->NowMicros();
|
||||
fprintf(stdout, "%s Reopening database in read-only\n",
|
||||
FLAGS_env->TimeToString(now / 1000000).c_str());
|
||||
// Reopen as read-only, can ignore all options related to updates
|
||||
Open();
|
||||
} else {
|
||||
fprintf(stderr, "Failed to preload db");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
Status SetOptions(ThreadState* thread) {
|
||||
assert(FLAGS_set_options_one_in > 0);
|
||||
std::unordered_map<std::string, std::string> opts;
|
||||
@ -1848,8 +1945,7 @@ class StressTest {
|
||||
if (thread->shared->AllVotedReopen()) {
|
||||
thread->shared->GetStressTest()->Reopen();
|
||||
thread->shared->GetCondVar()->SignalAll();
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
thread->shared->GetCondVar()->Wait();
|
||||
}
|
||||
// Commenting this out as we don't want to reset stats on each open.
|
||||
@ -1871,28 +1967,6 @@ class StressTest {
|
||||
MaybeClearOneColumnFamily(thread);
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (FLAGS_checkpoint_one_in > 0 &&
|
||||
thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
|
||||
std::string checkpoint_dir =
|
||||
FLAGS_db + "/.checkpoint" + ToString(thread->tid);
|
||||
DestroyDB(checkpoint_dir, Options());
|
||||
Checkpoint* checkpoint;
|
||||
Status s = Checkpoint::Create(db_, &checkpoint);
|
||||
if (s.ok()) {
|
||||
s = checkpoint->CreateCheckpoint(checkpoint_dir);
|
||||
}
|
||||
std::vector<std::string> files;
|
||||
if (s.ok()) {
|
||||
s = FLAGS_env->GetChildren(checkpoint_dir, &files);
|
||||
}
|
||||
DestroyDB(checkpoint_dir, Options());
|
||||
delete checkpoint;
|
||||
if (!s.ok()) {
|
||||
printf("A checkpoint operation failed with: %s\n",
|
||||
s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
if (FLAGS_compact_files_one_in > 0 &&
|
||||
thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
|
||||
auto* random_cf =
|
||||
@ -2009,6 +2083,14 @@ class StressTest {
|
||||
}
|
||||
}
|
||||
|
||||
if (FLAGS_checkpoint_one_in > 0 &&
|
||||
thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
|
||||
Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
|
||||
if (!s.ok()) {
|
||||
VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
|
||||
}
|
||||
}
|
||||
|
||||
if (FLAGS_acquire_snapshot_one_in > 0 &&
|
||||
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
|
||||
auto snapshot = db_->GetSnapshot();
|
||||
@ -2208,6 +2290,17 @@ class StressTest {
|
||||
"TestBackupRestore\n");
|
||||
std::terminate();
|
||||
}
|
||||
|
||||
virtual Status TestCheckpoint(
|
||||
ThreadState* /* thread */,
|
||||
const std::vector<int>& /* rand_column_families */,
|
||||
const std::vector<int64_t>& /* rand_keys */) {
|
||||
assert(false);
|
||||
fprintf(stderr,
|
||||
"RocksDB lite does not support "
|
||||
"TestCheckpoint\n");
|
||||
std::terminate();
|
||||
}
|
||||
#else // ROCKSDB_LITE
|
||||
virtual Status TestBackupRestore(ThreadState* thread,
|
||||
const std::vector<int>& rand_column_families,
|
||||
@ -2295,6 +2388,79 @@ class StressTest {
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual Status TestCheckpoint(ThreadState* thread,
|
||||
const std::vector<int>& rand_column_families,
|
||||
const std::vector<int64_t>& rand_keys) {
|
||||
// Note the column families chosen by `rand_column_families` cannot be
|
||||
// dropped while the locks for `rand_keys` are held. So we should not have
|
||||
// to worry about accessing those column families throughout this function.
|
||||
assert(rand_column_families.size() == rand_keys.size());
|
||||
std::string checkpoint_dir =
|
||||
FLAGS_db + "/.checkpoint" + ToString(thread->tid);
|
||||
DestroyDB(checkpoint_dir, Options());
|
||||
Checkpoint* checkpoint = nullptr;
|
||||
Status s = Checkpoint::Create(db_, &checkpoint);
|
||||
if (s.ok()) {
|
||||
s = checkpoint->CreateCheckpoint(checkpoint_dir);
|
||||
}
|
||||
std::vector<ColumnFamilyHandle*> cf_handles;
|
||||
DB* checkpoint_db = nullptr;
|
||||
if (s.ok()) {
|
||||
delete checkpoint;
|
||||
checkpoint = nullptr;
|
||||
Options options(options_);
|
||||
options.listeners.clear();
|
||||
std::vector<ColumnFamilyDescriptor> cf_descs;
|
||||
// TODO(ajkr): `column_family_names_` is not safe to access here when
|
||||
// `clear_column_family_one_in != 0`. But we can't easily switch to
|
||||
// `ListColumnFamilies` to get names because it won't necessarily give
|
||||
// the same order as `column_family_names_`.
|
||||
if (FLAGS_clear_column_family_one_in == 0) {
|
||||
for (const auto& name : column_family_names_) {
|
||||
cf_descs.emplace_back(name, ColumnFamilyOptions(options));
|
||||
}
|
||||
s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
|
||||
&cf_handles, &checkpoint_db);
|
||||
}
|
||||
}
|
||||
if (checkpoint_db != nullptr) {
|
||||
for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
|
||||
std::string key_str = Key(rand_keys[i]);
|
||||
Slice key = key_str;
|
||||
std::string value;
|
||||
Status get_status = checkpoint_db->Get(
|
||||
ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
|
||||
bool exists =
|
||||
thread->shared->Exists(rand_column_families[i], rand_keys[i]);
|
||||
if (get_status.ok()) {
|
||||
if (!exists) {
|
||||
s = Status::Corruption(
|
||||
"key exists in checkpoint but not in original db");
|
||||
}
|
||||
} else if (get_status.IsNotFound()) {
|
||||
if (exists) {
|
||||
s = Status::Corruption(
|
||||
"key exists in original db but not in checkpoint");
|
||||
}
|
||||
} else {
|
||||
s = get_status;
|
||||
}
|
||||
}
|
||||
for (auto cfh : cf_handles) {
|
||||
delete cfh;
|
||||
}
|
||||
cf_handles.clear();
|
||||
delete checkpoint_db;
|
||||
checkpoint_db = nullptr;
|
||||
}
|
||||
DestroyDB(checkpoint_dir, Options());
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "A checkpoint operation failed with: %s\n",
|
||||
s.ToString().c_str());
|
||||
}
|
||||
return s;
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
|
||||
@ -2316,6 +2482,8 @@ class StressTest {
|
||||
fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
|
||||
fprintf(stdout, "TransactionDB : %s\n",
|
||||
FLAGS_use_txn ? "true" : "false");
|
||||
fprintf(stdout, "Read only mode : %s\n",
|
||||
FLAGS_read_only ? "true" : "false");
|
||||
fprintf(stdout, "Atomic flush : %s\n",
|
||||
FLAGS_atomic_flush ? "true" : "false");
|
||||
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
|
||||
@ -2585,8 +2753,13 @@ class StressTest {
|
||||
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
|
||||
options_.create_missing_column_families = true;
|
||||
if (!FLAGS_use_txn) {
|
||||
if (db_preload_finished_.load() && FLAGS_read_only) {
|
||||
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
|
||||
&column_families_, &db_);
|
||||
} else {
|
||||
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
|
||||
&column_families_, &db_);
|
||||
}
|
||||
} else {
|
||||
#ifndef ROCKSDB_LITE
|
||||
TransactionDBOptions txn_db_options;
|
||||
@ -2671,6 +2844,7 @@ class StressTest {
|
||||
int num_times_reopened_;
|
||||
std::unordered_map<std::string, std::vector<std::string>> options_table_;
|
||||
std::vector<std::string> options_index_;
|
||||
std::atomic<bool> db_preload_finished_;
|
||||
};
|
||||
|
||||
class NonBatchedOpsStressTest : public StressTest {
|
||||
@ -3791,6 +3965,18 @@ int main(int argc, char** argv) {
|
||||
if (FLAGS_test_atomic_flush) {
|
||||
FLAGS_atomic_flush = true;
|
||||
}
|
||||
if (FLAGS_read_only) {
|
||||
if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 ||
|
||||
FLAGS_delrangepercent != 0) {
|
||||
fprintf(stderr, "Error: updates are not supported in read only mode\n");
|
||||
exit(1);
|
||||
} else if (FLAGS_checkpoint_one_in > 0 &&
|
||||
FLAGS_clear_column_family_one_in > 0) {
|
||||
fprintf(stdout,
|
||||
"Warn: checkpoint won't be validated since column families may "
|
||||
"be dropped.\n");
|
||||
}
|
||||
}
|
||||
|
||||
// Choose a location for the test database if none given with --db=<path>
|
||||
if (FLAGS_db.empty()) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user