db_bench: introduce --benchmark_read_rate_limit
Summary: Add a parameter to db_bench that lets users measure the latency histogram at a constant read rate. Closes https://github.com/facebook/rocksdb/pull/1683 Differential Revision: D4341387 Pulled By: siying fbshipit-source-id: 1b4b276
This commit is contained in:
parent
296691847d
commit
7bd725e962
@ -752,6 +752,11 @@ DEFINE_uint64(
|
||||
"If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
|
||||
"is the global rate in bytes/second.");
|
||||
|
||||
DEFINE_uint64(
|
||||
benchmark_read_rate_limit, 0,
|
||||
"If non-zero, db_bench will rate-limit the reads from RocksDB. This "
|
||||
"is the global rate in ops/second.");
|
||||
|
||||
DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes,
|
||||
"Max bytes allowed in one compaction");
|
||||
|
||||
@ -1667,6 +1672,7 @@ struct SharedState {
|
||||
int total;
|
||||
int perf_level;
|
||||
std::shared_ptr<RateLimiter> write_rate_limiter;
|
||||
std::shared_ptr<RateLimiter> read_rate_limiter;
|
||||
|
||||
// Each thread goes through the following states:
|
||||
// (1) initializing
|
||||
@ -1821,6 +1827,8 @@ class Benchmark {
|
||||
/ 1048576.0));
|
||||
fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
|
||||
FLAGS_benchmark_write_rate_limit);
|
||||
fprintf(stdout, "Read rate: %" PRIu64 " ops/second\n",
|
||||
FLAGS_benchmark_read_rate_limit);
|
||||
if (FLAGS_enable_numa) {
|
||||
fprintf(stderr, "Running in NUMA enabled mode.\n");
|
||||
#ifndef NUMA
|
||||
@ -2478,6 +2486,10 @@ class Benchmark {
|
||||
shared.write_rate_limiter.reset(
|
||||
NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
|
||||
}
|
||||
if (FLAGS_benchmark_read_rate_limit > 0) {
|
||||
shared.read_rate_limiter.reset(
|
||||
NewGenericRateLimiter(FLAGS_benchmark_read_rate_limit));
|
||||
}
|
||||
|
||||
std::unique_ptr<ReporterAgent> reporter_agent;
|
||||
if (FLAGS_report_interval_seconds > 0) {
|
||||
@ -3629,7 +3641,13 @@ class Benchmark {
|
||||
bytes += iter->key().size() + iter->value().size();
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kRead);
|
||||
++i;
|
||||
|
||||
if (thread->shared->read_rate_limiter.get() != nullptr &&
|
||||
i % 1024 == 1023) {
|
||||
thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH);
|
||||
}
|
||||
}
|
||||
|
||||
delete iter;
|
||||
thread->stats.AddBytes(bytes);
|
||||
if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
|
||||
@ -3655,6 +3673,10 @@ class Benchmark {
|
||||
bytes += iter->key().size() + iter->value().size();
|
||||
thread->stats.FinishedOps(nullptr, db, 1, kRead);
|
||||
++i;
|
||||
if (thread->shared->read_rate_limiter.get() != nullptr &&
|
||||
i % 1024 == 1023) {
|
||||
thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH);
|
||||
}
|
||||
}
|
||||
delete iter;
|
||||
thread->stats.AddBytes(bytes);
|
||||
@ -3693,6 +3715,10 @@ class Benchmark {
|
||||
++nonexist;
|
||||
}
|
||||
}
|
||||
if (thread->shared->read_rate_limiter.get() != nullptr) {
|
||||
thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH);
|
||||
}
|
||||
|
||||
thread->stats.FinishedOps(nullptr, db, 100, kRead);
|
||||
} while (!duration.Done(100));
|
||||
|
||||
@ -3761,6 +3787,12 @@ class Benchmark {
|
||||
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
|
||||
abort();
|
||||
}
|
||||
|
||||
if (thread->shared->read_rate_limiter.get() != nullptr &&
|
||||
read % 256 == 255) {
|
||||
thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH);
|
||||
}
|
||||
|
||||
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
|
||||
}
|
||||
|
||||
@ -3780,6 +3812,7 @@ class Benchmark {
|
||||
// Returns the total number of keys found.
|
||||
void MultiReadRandom(ThreadState* thread) {
|
||||
int64_t read = 0;
|
||||
int64_t num_multireads = 0;
|
||||
int64_t found = 0;
|
||||
ReadOptions options(FLAGS_verify_checksum, true);
|
||||
std::vector<Slice> keys;
|
||||
@ -3800,6 +3833,7 @@ class Benchmark {
|
||||
assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
|
||||
|
||||
read += entries_per_batch_;
|
||||
num_multireads++;
|
||||
for (int64_t i = 0; i < entries_per_batch_; ++i) {
|
||||
if (statuses[i].ok()) {
|
||||
++found;
|
||||
@ -3809,6 +3843,11 @@ class Benchmark {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
if (thread->shared->read_rate_limiter.get() != nullptr &&
|
||||
num_multireads % 256 == 255) {
|
||||
thread->shared->read_rate_limiter->Request(256 * entries_per_batch_,
|
||||
Env::IO_HIGH);
|
||||
}
|
||||
thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
|
||||
}
|
||||
|
||||
@ -3902,6 +3941,11 @@ class Benchmark {
|
||||
assert(iter_to_use->status().ok());
|
||||
}
|
||||
|
||||
if (thread->shared->read_rate_limiter.get() != nullptr &&
|
||||
read % 256 == 255) {
|
||||
thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH);
|
||||
}
|
||||
|
||||
thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
|
||||
}
|
||||
delete single_iter;
|
||||
@ -4699,6 +4743,10 @@ class Benchmark {
|
||||
}
|
||||
}
|
||||
found += key_found;
|
||||
|
||||
if (thread->shared->read_rate_limiter.get() != nullptr) {
|
||||
thread->shared->read_rate_limiter->Request(1, Env::IO_HIGH);
|
||||
}
|
||||
}
|
||||
delete iter;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user