From 7bd725e962d41ac4476c37e93b885da08bdc206a Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Mon, 19 Dec 2016 12:25:35 -0800 Subject: [PATCH] db_bench: introduce --benchmark_read_rate_limit Summary: Add the parameter in db_bench to help users to measure latency histogram with constant read rate. Closes https://github.com/facebook/rocksdb/pull/1683 Differential Revision: D4341387 Pulled By: siying fbshipit-source-id: 1b4b276 --- tools/db_bench_tool.cc | 48 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index c61ffac7e..d0c8fb0af 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -752,6 +752,11 @@ DEFINE_uint64( "If non-zero, db_bench will rate-limit the writes going into RocksDB. This " "is the global rate in bytes/second."); +DEFINE_uint64( + benchmark_read_rate_limit, 0, + "If non-zero, db_bench will rate-limit the reads from RocksDB. This " + "is the global rate in ops/second."); + DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes, "Max bytes allowed in one compaction"); @@ -1667,6 +1672,7 @@ struct SharedState { int total; int perf_level; std::shared_ptr write_rate_limiter; + std::shared_ptr read_rate_limiter; // Each thread goes through the following states: // (1) initializing @@ -1821,6 +1827,8 @@ class Benchmark { / 1048576.0)); fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n", FLAGS_benchmark_write_rate_limit); + fprintf(stdout, "Read rate: %" PRIu64 " ops/second\n", + FLAGS_benchmark_read_rate_limit); if (FLAGS_enable_numa) { fprintf(stderr, "Running in NUMA enabled mode.\n"); #ifndef NUMA @@ -2478,6 +2486,10 @@ class Benchmark { shared.write_rate_limiter.reset( NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit)); } + if (FLAGS_benchmark_read_rate_limit > 0) { + shared.read_rate_limiter.reset( + NewGenericRateLimiter(FLAGS_benchmark_read_rate_limit)); + } std::unique_ptr reporter_agent; if (FLAGS_report_interval_seconds > 0) { @@ -3629,7 +3641,13 @@ class Benchmark { bytes += iter->key().size() + iter->value().size(); thread->stats.FinishedOps(nullptr, db, 1, kRead); ++i; + + if (thread->shared->read_rate_limiter.get() != nullptr && + i % 1024 == 1023) { + thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH); + } } + delete iter; thread->stats.AddBytes(bytes); if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) { @@ -3655,6 +3673,10 @@ class Benchmark { bytes += iter->key().size() + iter->value().size(); thread->stats.FinishedOps(nullptr, db, 1, kRead); ++i; + if (thread->shared->read_rate_limiter.get() != nullptr && + i % 1024 == 1023) { + thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH); + } } delete iter; thread->stats.AddBytes(bytes); @@ -3693,6 +3715,10 @@ class Benchmark { ++nonexist; } } + if (thread->shared->read_rate_limiter.get() != nullptr) { + thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH); + } + thread->stats.FinishedOps(nullptr, db, 100, kRead); } while (!duration.Done(100)); @@ -3761,6 +3787,12 @@ class Benchmark { fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); abort(); } + + if (thread->shared->read_rate_limiter.get() != nullptr && + read % 256 == 255) { + thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH); + } + thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead); } @@ -3780,6 +3812,7 @@ class Benchmark { // Returns the total number of keys found. void MultiReadRandom(ThreadState* thread) { int64_t read = 0; + int64_t num_multireads = 0; int64_t found = 0; ReadOptions options(FLAGS_verify_checksum, true); std::vector keys; @@ -3800,6 +3833,7 @@ class Benchmark { assert(static_cast(statuses.size()) == entries_per_batch_); read += entries_per_batch_; + num_multireads++; for (int64_t i = 0; i < entries_per_batch_; ++i) { if (statuses[i].ok()) { ++found; @@ -3809,6 +3843,11 @@ class Benchmark { abort(); } } + if (thread->shared->read_rate_limiter.get() != nullptr && + num_multireads % 256 == 255) { + thread->shared->read_rate_limiter->Request(256 * entries_per_batch_, + Env::IO_HIGH); + } thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead); } @@ -3902,6 +3941,11 @@ class Benchmark { assert(iter_to_use->status().ok()); } + if (thread->shared->read_rate_limiter.get() != nullptr && + read % 256 == 255) { + thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH); + } + thread->stats.FinishedOps(&db_, db_.db, 1, kSeek); } delete single_iter; @@ -4699,6 +4743,10 @@ class Benchmark { } } found += key_found; + + if (thread->shared->read_rate_limiter.get() != nullptr) { + thread->shared->read_rate_limiter->Request(1, Env::IO_HIGH); + } } delete iter;