From ce8e88d2d7a62e2a08c4109aac84cb9e95ed359b Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Tue, 22 Jan 2019 10:40:44 -0800 Subject: [PATCH] Generate mixed workload with Get, Put, Seek in db_bench (#4788) Summary: Based on the specific workload models (key access distribution, value size distribution, and iterator scan length distribution, the QPS variation), the MixGraph benchmark generate the synthetic workload according to these distributions which can reflect the real-world workload characteristics. After user enable the tracing function, they will get the trace file. By analyzing the trace file with the trace_analyzer tool, user can generate a set of statistic data files including. The *_accessed_key_stats.txt, *-accessed_value_size_distribution.txt, *-iterator_length_distribution.txt, and *-qps_stats.txt are mainly used to fit the Matlab model fitting. After that, user can get the parameters of the workload distributions (the modeling details are described: [here](https://github.com/facebook/rocksdb/wiki/RocksDB-Trace%2C-Replay%2C-and-Analyzer)) The key access distribution follows the The two-term power model. The probability density function is: `f(x) = ax^{b}+c`. The corresponding parameters are key_dist_a, key_dist_b, and key_dist_c in db_bench For the value size distribution and iterator scan length distribution, they both follow the Generalized Pareto Distribution. The probability density function is `f(x) = (1/sigma)(1+k*(x-theta)/sigma))^{-1-1/k)`. The parameters are: value_k, value_theta, value_sigma and iter_k, iter_theta, iter_sigma. For more information about the Generalized Pareto Distribution, users can find the [wiki](https://en.wikipedia.org/wiki/Generalized_Pareto_distribution) and [Matalb page](https://www.mathworks.com/help/stats/generalized-pareto-distribution.html) As for the QPS, it follows the diurnal pattern. So Sine is a good model to fit it. `F(x) = sine_a*sin(sine_b*x + sine_c) + sine_d`. The trace_will tell you the average QPS in the print out resutls, which is sine_d. After user fit the "*-qps_stats.txt" to the Matlab model, user can get the sine_a, sine_b, and sine_c. By using the 4 parameters, user can control the QPS variation including the period, average, changes. To use the bench mark, user can indicate the following parameters as examples: ``` -benchmarks="mixgraph" -key_dist_a=0.002312 -key_dist_b=0.3467 -value_k=0.9233 -value_sigma=226.4092 -iter_k=2.517 -iter_sigma=14.236 -mix_get_ratio=0.7 -mix_put_ratio=0.25 -mix_seek_ratio=0.05 -sine_mix_rate_interval_milliseconds=500 -sine_a=15000 -sine_b=1 -sine_d=20000 ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/4788 Differential Revision: D13573940 Pulled By: sagar0 fbshipit-source-id: e184c27e07b4f1bc0b436c2be36c5090c1fb0222 --- tools/db_bench_tool.cc | 293 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 37bbcc485..f35378bf9 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -21,6 +21,7 @@ #endif #include #include +#include #include #include #include @@ -102,6 +103,7 @@ DEFINE_string( "compact," "compactall," "multireadrandom," + "mixgraph," "readseq," "readtocache," "readreverse," @@ -932,6 +934,52 @@ DEFINE_uint64( "If non-zero, db_bench will rate-limit the writes going into RocksDB. This " "is the global rate in bytes/second."); +// the parameters of mix_graph +DEFINE_double(key_dist_a, 0.0, + "The parameter 'a' of key access distribution model " + "f(x)=a*x^b"); +DEFINE_double(key_dist_b, 0.0, + "The parameter 'b' of key access distribution model " + "f(x)=a*x^b"); +DEFINE_double(value_theta, 0.0, + "The parameter 'theta' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(value_k, 0.0, + "The parameter 'k' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(value_sigma, 0.0, + "The parameter 'theta' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(iter_theta, 0.0, + "The parameter 'theta' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(iter_k, 0.0, + "The parameter 'k' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(iter_sigma, 0.0, + "The parameter 'sigma' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(mix_get_ratio, 1.0, + "The ratio of Get queries of mix_graph workload"); +DEFINE_double(mix_put_ratio, 0.0, + "The ratio of Put queries of mix_graph workload"); +DEFINE_double(mix_seek_ratio, 0.0, + "The ratio of Seek queries of mix_graph workload"); +DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator"); +DEFINE_int64(mix_ave_kv_size, 512, + "The average key-value size of this workload"); +DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload"); +DEFINE_double( + sine_mix_rate_noise, 0.0, + "Add the noise ratio to the sine rate, it is between 0.0 and 1.0"); +DEFINE_bool(sine_mix_rate, false, + "Enable the sine QPS control on the mix workload"); +DEFINE_uint64( + sine_mix_rate_interval_milliseconds, 10000, + "Interval of which the sine wave read_rate_limit is recalculated"); +DEFINE_int64(mix_accesses, -1, + "The total query accesses of mix_graph workload"); + DEFINE_uint64( benchmark_read_rate_limit, 0, "If non-zero, db_bench will rate-limit the reads from RocksDB. This " @@ -2627,6 +2675,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { fprintf(stderr, "entries_per_batch = %" PRIi64 "\n", entries_per_batch_); method = &Benchmark::MultiReadRandom; + } else if (name == "mixgraph") { + method = &Benchmark::MixGraph; } else if (name == "readmissing") { ++key_size_; method = &Benchmark::ReadRandom; @@ -4536,6 +4586,249 @@ void VerifyDBFromDB(std::string& truth_db_name) { thread->stats.AddMessage(msg); } + // THe reverse function of Pareto function + int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) { + double ret; + if (k == 0.0) { + ret = theta - sigma * std::log(u); + } else { + ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k; + } + return static_cast(ceil(ret)); + } + // inversion of y=ax^b + int64_t PowerCdfInversion(double u, double a, double b) { + double ret; + ret = std::pow((u / a), (1 / b)); + return static_cast(ceil(ret)); + } + + // Add the noice to the QPS + double AddNoise(double origin, double noise_ratio) { + if (noise_ratio < 0.0 || noise_ratio > 1.0) { + return origin; + } + int band_int = static_cast(FLAGS_sine_a); + double delta = (rand() % band_int - band_int / 2) * noise_ratio; + if (origin + delta < 0) { + return origin; + } else { + return (origin + delta); + } + } + + // decide the query type + // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge + class QueryDecider { + public: + std::vector type_; + std::vector ratio_; + int range_; + + QueryDecider() {} + ~QueryDecider() {} + + Status Initiate(std::vector ratio_input) { + int range_max = 1000; + double sum = 0.0; + for (auto& ratio : ratio_input) { + sum += ratio; + } + range_ = 0; + for (auto& ratio : ratio_input) { + range_ += static_cast(ceil(range_max * (ratio / sum))); + type_.push_back(range_); + ratio_.push_back(ratio / sum); + } + return Status::OK(); + } + + int GetType(int64_t rand_num) { + if (rand_num < 0) { + rand_num = rand_num * (-1); + } + int pos = static_cast(rand_num % range_); + for (int i = 0; i < static_cast(type_.size()); i++) { + if (pos < type_[i]) { + return i; + } + } + return 0; + } + }; + + // The graph wokrload mixed with Get, Put, Iterator + void MixGraph(ThreadState* thread) { + int64_t read = 0; // including single gets and Next of iterators + int64_t gets = 0; + int64_t puts = 0; + int64_t found = 0; + int64_t seek = 0; + int64_t seek_found = 0; + int64_t bytes = 0; + int64_t value_max = FLAGS_mix_max_value_size; + int64_t scan_len_max = FLAGS_mix_max_scan_len; + double write_rate = 1000000.0; + double read_rate = 1000000.0; + std::vector ratio; + char value_buffer[2 * value_max]; + QueryDecider query; + RandomGenerator gen; + Status s; + + ReadOptions options(FLAGS_verify_checksum, true); + std::unique_ptr key_guard; + Slice key = AllocateKey(&key_guard); + PinnableSlice pinnable_val; + ratio.push_back(FLAGS_mix_get_ratio); + ratio.push_back(FLAGS_mix_put_ratio); + ratio.push_back(FLAGS_mix_seek_ratio); + query.Initiate(ratio); + + // the limit of qps initiation + if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) { + thread->shared->read_rate_limiter.reset(NewGenericRateLimiter( + read_rate, 100000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kReadsOnly)); + thread->shared->write_rate_limiter.reset( + NewGenericRateLimiter(write_rate)); + } + + Duration duration(FLAGS_duration, reads_); + while (!duration.Done(1)) { + DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread); + int64_t rand_v, key_rand, key_seed; + rand_v = GetRandomKey(&thread->rand) % FLAGS_num; + double u = static_cast(rand_v) / FLAGS_num; + key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b); + Random64 rand(key_seed); + key_rand = static_cast(rand.Next()) % FLAGS_num; + GenerateKeyFromInt(key_rand, FLAGS_num, &key); + int query_type = query.GetType(rand_v); + + // change the qps + uint64_t now = FLAGS_env->NowMicros(); + uint64_t usecs_since_last; + if (now > thread->stats.GetSineInterval()) { + usecs_since_last = now - thread->stats.GetSineInterval(); + } else { + usecs_since_last = 0; + } + + if (usecs_since_last > + (FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) { + double usecs_since_start = + static_cast(now - thread->stats.GetStart()); + thread->stats.ResetSineInterval(); + double mix_rate_with_noise = AddNoise( + SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise); + read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]); + write_rate = + mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size; + + thread->shared->write_rate_limiter.reset( + NewGenericRateLimiter(write_rate)); + thread->shared->read_rate_limiter.reset(NewGenericRateLimiter( + read_rate, + FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10, + RateLimiter::Mode::kReadsOnly)); + } + // Start the query + if (query_type == 0) { + // the Get query + gets++; + read++; + if (FLAGS_num_column_families > 1) { + s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, + &pinnable_val); + } else { + pinnable_val.Reset(); + s = db_with_cfh->db->Get(options, + db_with_cfh->db->DefaultColumnFamily(), key, + &pinnable_val); + } + + if (s.ok()) { + found++; + bytes += key.size() + pinnable_val.size(); + } else if (!s.IsNotFound()) { + fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); + abort(); + } + + if (thread->shared->read_rate_limiter.get() != nullptr && + read % 256 == 255) { + thread->shared->read_rate_limiter->Request( + 256, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kRead); + } + + } else if (query_type == 1) { + // the Put query + puts++; + int64_t value_size = ParetoCdfInversion( + u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma); + if (value_size < 0) { + value_size = 10; + } else if (value_size > value_max) { + value_size = value_size % value_max; + } + s = db_with_cfh->db->Put(write_options_, key, gen.Generate(value_size)); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + + if (thread->shared->write_rate_limiter) { + thread->shared->write_rate_limiter->Request( + key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/, + RateLimiter::OpType::kWrite); + } + + } else if (query_type == 2) { + // Seek query + if (db_with_cfh->db != nullptr) { + Iterator* single_iter = nullptr; + single_iter = db_with_cfh->db->NewIterator(options); + if (single_iter != nullptr) { + single_iter->Seek(key); + seek++; + read++; + if (single_iter->Valid() && single_iter->key().compare(key) == 0) { + seek_found++; + } + int64_t scan_length = + ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k, + FLAGS_iter_sigma) % + scan_len_max; + for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) { + Slice value = single_iter->value(); + memcpy(value_buffer, value.data(), + std::min(value.size(), sizeof(value_buffer))); + bytes += single_iter->key().size() + single_iter->value().size(); + single_iter->Next(); + assert(single_iter->status().ok()); + } + } + delete single_iter; + } + } + } + char msg[100]; + snprintf(msg, sizeof(msg), + "( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64 + " in %" PRIu64 " found)\n", + gets, puts, seek, found, read); + + thread->stats.AddBytes(bytes); + thread->stats.AddMessage(msg); + + if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) { + thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") + + get_perf_context()->ToString()); + } + } + void IteratorCreation(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true);