Add secondary instance to stress test (#5479)
Summary: This PR allows users to run stress tests on secondary instance. Test plan (on devserver) ``` ./db_stress -ops_per_thread=100000 -enable_secondary=true -threads=32 -secondary_catch_up_one_in=10000 -clear_column_family_one_in=1000 -reopen=100 ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/5479 Differential Revision: D16074325 Pulled By: riversand963 fbshipit-source-id: c0ed959e7b6c7cda3efd0b3070ab379de3b29f1c
This commit is contained in:
parent
7259e28d91
commit
c360675750
@ -333,6 +333,11 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter"
|
|||||||
|
|
||||||
DEFINE_string(db, "", "Use the db with the following name.");
|
DEFINE_string(db, "", "Use the db with the following name.");
|
||||||
|
|
||||||
|
DEFINE_string(secondaries_base, "",
|
||||||
|
"Use this path as the base path for secondary instances.");
|
||||||
|
|
||||||
|
DEFINE_bool(enable_secondary, false, "Enable secondary instance.");
|
||||||
|
|
||||||
DEFINE_string(
|
DEFINE_string(
|
||||||
expected_values_path, "",
|
expected_values_path, "",
|
||||||
"File where the array of expected uint32_t values will be stored. If "
|
"File where the array of expected uint32_t values will be stored. If "
|
||||||
@ -599,6 +604,13 @@ DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file");
|
|||||||
|
|
||||||
DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
|
DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
|
||||||
|
|
||||||
|
DEFINE_int32(secondary_catch_up_one_in, 0,
|
||||||
|
"If non-zero, the secondaries attemp to catch up with the primary "
|
||||||
|
"once for every N operations on average. 0 indicates the "
|
||||||
|
"secondaries do not try to catch up after open.");
|
||||||
|
|
||||||
|
static std::shared_ptr<rocksdb::Statistics> dbstats_secondaries;
|
||||||
|
|
||||||
enum RepFactory {
|
enum RepFactory {
|
||||||
kSkipList,
|
kSkipList,
|
||||||
kHashSkipList,
|
kHashSkipList,
|
||||||
@ -1423,6 +1435,17 @@ class StressTest {
|
|||||||
}
|
}
|
||||||
column_families_.clear();
|
column_families_.clear();
|
||||||
delete db_;
|
delete db_;
|
||||||
|
|
||||||
|
assert(secondaries_.size() == secondary_cfh_lists_.size());
|
||||||
|
size_t n = secondaries_.size();
|
||||||
|
for (size_t i = 0; i != n; ++i) {
|
||||||
|
for (auto* cf : secondary_cfh_lists_[i]) {
|
||||||
|
delete cf;
|
||||||
|
}
|
||||||
|
secondary_cfh_lists_[i].clear();
|
||||||
|
delete secondaries_[i];
|
||||||
|
}
|
||||||
|
secondaries_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<Cache> NewCache(size_t capacity) {
|
std::shared_ptr<Cache> NewCache(size_t capacity) {
|
||||||
@ -1620,6 +1643,60 @@ class StressTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
if (FLAGS_enable_secondary) {
|
||||||
|
now = FLAGS_env->NowMicros();
|
||||||
|
fprintf(stdout, "%s Start to verify secondaries against primary\n",
|
||||||
|
FLAGS_env->TimeToString(static_cast<uint64_t>(now) / 1000000)
|
||||||
|
.c_str());
|
||||||
|
}
|
||||||
|
for (size_t k = 0; k != secondaries_.size(); ++k) {
|
||||||
|
Status s = secondaries_[k]->TryCatchUpWithPrimary();
|
||||||
|
if (!s.ok()) {
|
||||||
|
fprintf(stderr, "Secondary failed to catch up with primary\n");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ReadOptions ropts;
|
||||||
|
ropts.total_order_seek = true;
|
||||||
|
// Verify only the default column family since the primary may have
|
||||||
|
// dropped other column families after most recent reopen.
|
||||||
|
std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
|
||||||
|
std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
|
||||||
|
for (iter1->SeekToFirst(), iter2->SeekToFirst();
|
||||||
|
iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
|
||||||
|
if (iter1->key().compare(iter2->key()) != 0 ||
|
||||||
|
iter1->value().compare(iter2->value())) {
|
||||||
|
fprintf(stderr,
|
||||||
|
"Secondary %d contains different data from "
|
||||||
|
"primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
|
||||||
|
static_cast<int>(k),
|
||||||
|
iter1->key().ToString(/*hex=*/true).c_str(),
|
||||||
|
iter1->value().ToString(/*hex=*/true).c_str(),
|
||||||
|
iter2->key().ToString(/*hex=*/true).c_str(),
|
||||||
|
iter2->value().ToString(/*hex=*/true).c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (iter1->Valid() && !iter2->Valid()) {
|
||||||
|
fprintf(stderr,
|
||||||
|
"Secondary %d record count is smaller than that of primary\n",
|
||||||
|
static_cast<int>(k));
|
||||||
|
return false;
|
||||||
|
} else if (!iter1->Valid() && iter2->Valid()) {
|
||||||
|
fprintf(stderr,
|
||||||
|
"Secondary %d record count is larger than that of primary\n",
|
||||||
|
static_cast<int>(k));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (FLAGS_enable_secondary) {
|
||||||
|
now = FLAGS_env->NowMicros();
|
||||||
|
fprintf(stdout, "%s Verification of secondaries succeeded\n",
|
||||||
|
FLAGS_env->TimeToString(static_cast<uint64_t>(now) / 1000000)
|
||||||
|
.c_str());
|
||||||
|
}
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
if (shared.HasVerificationFailedYet()) {
|
if (shared.HasVerificationFailedYet()) {
|
||||||
printf("Verification failed :(\n");
|
printf("Verification failed :(\n");
|
||||||
return false;
|
return false;
|
||||||
@ -2231,6 +2308,19 @@ class StressTest {
|
|||||||
TestIterate(thread, read_opts, rand_column_families, rand_keys);
|
TestIterate(thread, read_opts, rand_column_families, rand_keys);
|
||||||
}
|
}
|
||||||
thread->stats.FinishedSingleOp();
|
thread->stats.FinishedSingleOp();
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
uint32_t tid = thread->tid;
|
||||||
|
assert(secondaries_.empty() ||
|
||||||
|
static_cast<size_t>(tid) < secondaries_.size());
|
||||||
|
if (FLAGS_secondary_catch_up_one_in > 0 &&
|
||||||
|
thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) {
|
||||||
|
Status s = secondaries_[tid]->TryCatchUpWithPrimary();
|
||||||
|
if (!s.ok()) {
|
||||||
|
VerificationAbort(shared, "Secondary instance failed to catch up", s);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
thread->stats.Stop();
|
thread->stats.Stop();
|
||||||
@ -2864,11 +2954,52 @@ class StressTest {
|
|||||||
}
|
}
|
||||||
assert(!s.ok() || column_families_.size() ==
|
assert(!s.ok() || column_families_.size() ==
|
||||||
static_cast<size_t>(FLAGS_column_families));
|
static_cast<size_t>(FLAGS_column_families));
|
||||||
|
|
||||||
|
if (FLAGS_enable_secondary) {
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
secondaries_.resize(FLAGS_threads);
|
||||||
|
std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
|
||||||
|
secondary_cfh_lists_.clear();
|
||||||
|
secondary_cfh_lists_.resize(FLAGS_threads);
|
||||||
|
Options tmp_opts;
|
||||||
|
tmp_opts.max_open_files = FLAGS_open_files;
|
||||||
|
tmp_opts.statistics = dbstats_secondaries;
|
||||||
|
tmp_opts.env = FLAGS_env;
|
||||||
|
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
|
||||||
|
const std::string secondary_path =
|
||||||
|
FLAGS_secondaries_base + "/" + std::to_string(i);
|
||||||
|
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
|
||||||
|
cf_descriptors, &secondary_cfh_lists_[i],
|
||||||
|
&secondaries_[i]);
|
||||||
|
if (!s.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
|
||||||
|
exit(1);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
DBWithTTL* db_with_ttl;
|
DBWithTTL* db_with_ttl;
|
||||||
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
|
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
|
||||||
db_ = db_with_ttl;
|
db_ = db_with_ttl;
|
||||||
|
if (FLAGS_enable_secondary) {
|
||||||
|
secondaries_.resize(FLAGS_threads);
|
||||||
|
std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
|
||||||
|
Options tmp_opts;
|
||||||
|
tmp_opts.max_open_files = FLAGS_open_files;
|
||||||
|
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
|
||||||
|
const std::string secondary_path =
|
||||||
|
FLAGS_secondaries_base + "/" + std::to_string(i);
|
||||||
|
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
|
||||||
|
&secondaries_[i]);
|
||||||
|
if (!s.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
#else
|
#else
|
||||||
fprintf(stderr, "TTL is not supported in RocksDBLite\n");
|
fprintf(stderr, "TTL is not supported in RocksDBLite\n");
|
||||||
exit(1);
|
exit(1);
|
||||||
@ -2891,6 +3022,17 @@ class StressTest {
|
|||||||
txn_db_ = nullptr;
|
txn_db_ = nullptr;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
assert(secondaries_.size() == secondary_cfh_lists_.size());
|
||||||
|
size_t n = secondaries_.size();
|
||||||
|
for (size_t i = 0; i != n; ++i) {
|
||||||
|
for (auto* cf : secondary_cfh_lists_[i]) {
|
||||||
|
delete cf;
|
||||||
|
}
|
||||||
|
secondary_cfh_lists_[i].clear();
|
||||||
|
delete secondaries_[i];
|
||||||
|
}
|
||||||
|
secondaries_.clear();
|
||||||
|
|
||||||
num_times_reopened_++;
|
num_times_reopened_++;
|
||||||
auto now = FLAGS_env->NowMicros();
|
auto now = FLAGS_env->NowMicros();
|
||||||
fprintf(stdout, "%s Reopening database for the %dth time\n",
|
fprintf(stdout, "%s Reopening database for the %dth time\n",
|
||||||
@ -2903,6 +3045,10 @@ class StressTest {
|
|||||||
if (dbstats) {
|
if (dbstats) {
|
||||||
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
|
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
|
||||||
}
|
}
|
||||||
|
if (dbstats_secondaries) {
|
||||||
|
fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
|
||||||
|
dbstats_secondaries->ToString().c_str());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<Cache> cache_;
|
std::shared_ptr<Cache> cache_;
|
||||||
@ -2920,6 +3066,10 @@ class StressTest {
|
|||||||
std::unordered_map<std::string, std::vector<std::string>> options_table_;
|
std::unordered_map<std::string, std::vector<std::string>> options_table_;
|
||||||
std::vector<std::string> options_index_;
|
std::vector<std::string> options_index_;
|
||||||
std::atomic<bool> db_preload_finished_;
|
std::atomic<bool> db_preload_finished_;
|
||||||
|
|
||||||
|
// Fields used for stress-testing secondary instance in the same process
|
||||||
|
std::vector<DB*> secondaries_;
|
||||||
|
std::vector<std::vector<ColumnFamilyHandle*> > secondary_cfh_lists_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class NonBatchedOpsStressTest : public StressTest {
|
class NonBatchedOpsStressTest : public StressTest {
|
||||||
@ -4153,6 +4303,9 @@ int main(int argc, char** argv) {
|
|||||||
|
|
||||||
if (FLAGS_statistics) {
|
if (FLAGS_statistics) {
|
||||||
dbstats = rocksdb::CreateDBStatistics();
|
dbstats = rocksdb::CreateDBStatistics();
|
||||||
|
if (FLAGS_enable_secondary) {
|
||||||
|
dbstats_secondaries = rocksdb::CreateDBStatistics();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
FLAGS_compression_type_e =
|
FLAGS_compression_type_e =
|
||||||
StringToCompressionType(FLAGS_compression_type.c_str());
|
StringToCompressionType(FLAGS_compression_type.c_str());
|
||||||
@ -4261,6 +4414,24 @@ int main(int argc, char** argv) {
|
|||||||
FLAGS_db = default_db_path;
|
FLAGS_db = default_db_path;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (FLAGS_enable_secondary && FLAGS_secondaries_base.empty()) {
|
||||||
|
std::string default_secondaries_path;
|
||||||
|
FLAGS_env->GetTestDirectory(&default_secondaries_path);
|
||||||
|
default_secondaries_path += "/dbstress_secondaries";
|
||||||
|
rocksdb::Status s = FLAGS_env->CreateDirIfMissing(default_secondaries_path);
|
||||||
|
if (!s.ok()) {
|
||||||
|
fprintf(stderr, "Failed to create directory %s: %s\n",
|
||||||
|
default_secondaries_path.c_str(), s.ToString().c_str());
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
FLAGS_secondaries_base = default_secondaries_path;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!FLAGS_enable_secondary && FLAGS_secondary_catch_up_one_in > 0) {
|
||||||
|
fprintf(stderr, "Secondary instance is disabled.\n");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
rocksdb_kill_odds = FLAGS_kill_random_test;
|
rocksdb_kill_odds = FLAGS_kill_random_test;
|
||||||
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
|
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user