From c36067575037573a1ee3980bf8c27a93b4cf0694 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 1 Jul 2019 11:45:12 -0700 Subject: [PATCH] Add secondary instance to stress test (#5479) Summary: This PR allows users to run stress tests on a secondary instance. Test plan (on devserver) ``` ./db_stress -ops_per_thread=100000 -enable_secondary=true -threads=32 -secondary_catch_up_one_in=10000 -clear_column_family_one_in=1000 -reopen=100 ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/5479 Differential Revision: D16074325 Pulled By: riversand963 fbshipit-source-id: c0ed959e7b6c7cda3efd0b3070ab379de3b29f1c --- tools/db_stress.cc | 171 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 6a3e8bdef..813f80682 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -333,6 +333,11 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter" DEFINE_string(db, "", "Use the db with the following name."); +DEFINE_string(secondaries_base, "", + "Use this path as the base path for secondary instances."); + +DEFINE_bool(enable_secondary, false, "Enable secondary instance."); + DEFINE_string( expected_values_path, "", "File where the array of expected uint32_t values will be stored. If " @@ -599,6 +604,13 @@ DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file"); DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable"); +DEFINE_int32(secondary_catch_up_one_in, 0, + "If non-zero, the secondaries attemp to catch up with the primary " + "once for every N operations on average. 
0 indicates the " + "secondaries do not try to catch up after open."); + +static std::shared_ptr dbstats_secondaries; + enum RepFactory { kSkipList, kHashSkipList, @@ -1423,6 +1435,17 @@ class StressTest { } column_families_.clear(); delete db_; + + assert(secondaries_.size() == secondary_cfh_lists_.size()); + size_t n = secondaries_.size(); + for (size_t i = 0; i != n; ++i) { + for (auto* cf : secondary_cfh_lists_[i]) { + delete cf; + } + secondary_cfh_lists_[i].clear(); + delete secondaries_[i]; + } + secondaries_.clear(); } std::shared_ptr NewCache(size_t capacity) { @@ -1620,6 +1643,60 @@ class StressTest { } } +#ifndef ROCKSDB_LITE + if (FLAGS_enable_secondary) { + now = FLAGS_env->NowMicros(); + fprintf(stdout, "%s Start to verify secondaries against primary\n", + FLAGS_env->TimeToString(static_cast(now) / 1000000) + .c_str()); + } + for (size_t k = 0; k != secondaries_.size(); ++k) { + Status s = secondaries_[k]->TryCatchUpWithPrimary(); + if (!s.ok()) { + fprintf(stderr, "Secondary failed to catch up with primary\n"); + return false; + } + ReadOptions ropts; + ropts.total_order_seek = true; + // Verify only the default column family since the primary may have + // dropped other column families after most recent reopen. 
+ std::unique_ptr iter1(db_->NewIterator(ropts)); + std::unique_ptr iter2(secondaries_[k]->NewIterator(ropts)); + for (iter1->SeekToFirst(), iter2->SeekToFirst(); + iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { + if (iter1->key().compare(iter2->key()) != 0 || + iter1->value().compare(iter2->value())) { + fprintf(stderr, + "Secondary %d contains different data from " + "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n", + static_cast(k), + iter1->key().ToString(/*hex=*/true).c_str(), + iter1->value().ToString(/*hex=*/true).c_str(), + iter2->key().ToString(/*hex=*/true).c_str(), + iter2->value().ToString(/*hex=*/true).c_str()); + return false; + } + } + if (iter1->Valid() && !iter2->Valid()) { + fprintf(stderr, + "Secondary %d record count is smaller than that of primary\n", + static_cast(k)); + return false; + } else if (!iter1->Valid() && iter2->Valid()) { + fprintf(stderr, + "Secondary %d record count is larger than that of primary\n", + static_cast(k)); + return false; + } + } + if (FLAGS_enable_secondary) { + now = FLAGS_env->NowMicros(); + fprintf(stdout, "%s Verification of secondaries succeeded\n", + FLAGS_env->TimeToString(static_cast(now) / 1000000) + .c_str()); + } +#endif // ROCKSDB_LITE + if (shared.HasVerificationFailedYet()) { printf("Verification failed :(\n"); return false; @@ -2231,6 +2308,19 @@ class StressTest { TestIterate(thread, read_opts, rand_column_families, rand_keys); } thread->stats.FinishedSingleOp(); +#ifndef ROCKSDB_LITE + uint32_t tid = thread->tid; + assert(secondaries_.empty() || + static_cast(tid) < secondaries_.size()); + if (FLAGS_secondary_catch_up_one_in > 0 && + thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) { + Status s = secondaries_[tid]->TryCatchUpWithPrimary(); + if (!s.ok()) { + VerificationAbort(shared, "Secondary instance failed to catch up", s); + break; + } + } +#endif } thread->stats.Stop(); @@ -2864,11 +2954,52 @@ class StressTest { } assert(!s.ok() || column_families_.size() == 
static_cast(FLAGS_column_families)); + + if (FLAGS_enable_secondary) { +#ifndef ROCKSDB_LITE + secondaries_.resize(FLAGS_threads); + std::fill(secondaries_.begin(), secondaries_.end(), nullptr); + secondary_cfh_lists_.clear(); + secondary_cfh_lists_.resize(FLAGS_threads); + Options tmp_opts; + tmp_opts.max_open_files = FLAGS_open_files; + tmp_opts.statistics = dbstats_secondaries; + tmp_opts.env = FLAGS_env; + for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { + const std::string secondary_path = + FLAGS_secondaries_base + "/" + std::to_string(i); + s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, + cf_descriptors, &secondary_cfh_lists_[i], + &secondaries_[i]); + if (!s.ok()) { + break; + } + } +#else + fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); + exit(1); +#endif + } } else { #ifndef ROCKSDB_LITE DBWithTTL* db_with_ttl; s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); db_ = db_with_ttl; + if (FLAGS_enable_secondary) { + secondaries_.resize(FLAGS_threads); + std::fill(secondaries_.begin(), secondaries_.end(), nullptr); + Options tmp_opts; + tmp_opts.max_open_files = FLAGS_open_files; + for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { + const std::string secondary_path = + FLAGS_secondaries_base + "/" + std::to_string(i); + s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, + &secondaries_[i]); + if (!s.ok()) { + break; + } + } + } #else fprintf(stderr, "TTL is not supported in RocksDBLite\n"); exit(1); @@ -2891,6 +3022,17 @@ class StressTest { txn_db_ = nullptr; #endif + assert(secondaries_.size() == secondary_cfh_lists_.size()); + size_t n = secondaries_.size(); + for (size_t i = 0; i != n; ++i) { + for (auto* cf : secondary_cfh_lists_[i]) { + delete cf; + } + secondary_cfh_lists_[i].clear(); + delete secondaries_[i]; + } + secondaries_.clear(); + num_times_reopened_++; auto now = FLAGS_env->NowMicros(); fprintf(stdout, "%s Reopening database for the %dth time\n", @@ -2903,6 +3045,10 @@ 
class StressTest { if (dbstats) { fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); } + if (dbstats_secondaries) { + fprintf(stdout, "Secondary instances STATISTICS:\n%s\n", + dbstats_secondaries->ToString().c_str()); + } } std::shared_ptr cache_; @@ -2920,6 +3066,10 @@ class StressTest { std::unordered_map> options_table_; std::vector options_index_; std::atomic db_preload_finished_; + + // Fields used for stress-testing secondary instance in the same process + std::vector secondaries_; + std::vector > secondary_cfh_lists_; }; class NonBatchedOpsStressTest : public StressTest { @@ -4153,6 +4303,9 @@ int main(int argc, char** argv) { if (FLAGS_statistics) { dbstats = rocksdb::CreateDBStatistics(); + if (FLAGS_enable_secondary) { + dbstats_secondaries = rocksdb::CreateDBStatistics(); + } } FLAGS_compression_type_e = StringToCompressionType(FLAGS_compression_type.c_str()); @@ -4261,6 +4414,24 @@ int main(int argc, char** argv) { FLAGS_db = default_db_path; } + if (FLAGS_enable_secondary && FLAGS_secondaries_base.empty()) { + std::string default_secondaries_path; + FLAGS_env->GetTestDirectory(&default_secondaries_path); + default_secondaries_path += "/dbstress_secondaries"; + rocksdb::Status s = FLAGS_env->CreateDirIfMissing(default_secondaries_path); + if (!s.ok()) { + fprintf(stderr, "Failed to create directory %s: %s\n", + default_secondaries_path.c_str(), s.ToString().c_str()); + exit(1); + } + FLAGS_secondaries_base = default_secondaries_path; + } + + if (!FLAGS_enable_secondary && FLAGS_secondary_catch_up_one_in > 0) { + fprintf(stderr, "Secondary instance is disabled.\n"); + exit(1); + } + rocksdb_kill_odds = FLAGS_kill_random_test; rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);