diff --git a/db/db_test.cc b/db/db_test.cc index 6f81d207a..89b8a380f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7442,7 +7442,7 @@ TEST(DBTest, MTRandomTimeoutTest) { /* * This test is not reliable enough as it heavily depends on disk behavior. - * + */ TEST(DBTest, RateLimitingTest) { Options options = CurrentOptions(); options.write_buffer_size = 1 << 20; // 1MB @@ -7473,7 +7473,7 @@ TEST(DBTest, RateLimitingTest) { // # rate limiting with 0.7 x threshold options.rate_limiter.reset( - NewRateLimiter(static_cast(0.7 * raw_rate))); + NewGenericRateLimiter(static_cast(0.7 * raw_rate))); env_->bytes_written_ = 0; DestroyAndReopen(&options); @@ -7489,11 +7489,11 @@ TEST(DBTest, RateLimitingTest) { env_->bytes_written_); double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio); - ASSERT_TRUE(ratio > 0.6 && ratio < 0.8); + ASSERT_TRUE(ratio < 0.8); // # rate limiting with half of the raw_rate options.rate_limiter.reset( - NewRateLimiter(static_cast(raw_rate / 2))); + NewGenericRateLimiter(static_cast(raw_rate / 2))); env_->bytes_written_ = 0; DestroyAndReopen(&options); @@ -7509,9 +7509,8 @@ TEST(DBTest, RateLimitingTest) { env_->bytes_written_); ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio); - ASSERT_TRUE(ratio > 0.4 && ratio < 0.6); + ASSERT_TRUE(ratio < 0.6); } -*/ } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index ac8eaae4b..6c213b9c6 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -39,7 +39,6 @@ class Slice; class SliceTransform; class Statistics; class InternalKeyComparator; -class RateLimiter; // DB contents are stored in a set of blocks, each of which holds a // sequence of key,value pairs. Each block may be compressed before @@ -1027,29 +1026,6 @@ struct FlushOptions { FlushOptions() : wait(true) {} }; -// Create a RateLimiter object, which can be shared among RocksDB instances to -// control write rate of flush and compaction. -// @rate_bytes_per_sec: this is the only parameter you want to set most of the -// time. It controls the total write rate of compaction and flush in bytes per -// second. Currently, RocksDB does not enforce rate limit for anything other -// than flush and compaction, e.g. write to WAL. -// @refill_period_us: this controls how often tokens are refilled. For example, -// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to -// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to -// burstier writes while smaller value introduces more CPU overhead. -// The default should work for most cases. -// @fairness: RateLimiter accepts high-pri requests and low-pri requests. -// A low-pri request is usually blocked in favor of hi-pri request. Currently, -// RocksDB assigns low-pri to request from compaciton and high-pri to request -// from flush. Low-pri requests can get blocked if flush requests come in -// continuouly. This fairness parameter grants low-pri requests permission by -// 1/fairness chance even though high-pri requests exist to avoid starvation. -// You should be good by leaving it at default 10. -extern RateLimiter* NewRateLimiter( - int64_t rate_bytes_per_sec, - int64_t refill_period_us = 100 * 1000, - int32_t fairness = 10); - // Get options based on some guidelines. Now only tune parameter based on // flush/compaction and fill default parameters for other parameters. // total_write_buffer_limit: budget for memory spent for mem tables diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h new file mode 100644 index 000000000..027c9fa54 --- /dev/null +++ b/include/rocksdb/rate_limiter.h @@ -0,0 +1,60 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/env.h" + +namespace rocksdb { + +class RateLimiter { + public: + virtual ~RateLimiter() {} + + // Request for token to write bytes. If this request can not be satisfied, + // the call is blocked. Caller is responsible to make sure + // bytes < GetSingleBurstBytes() + virtual void Request(const int64_t bytes, const Env::IOPriority pri) = 0; + + // Max bytes can be granted in a single burst + virtual int64_t GetSingleBurstBytes() const = 0; + + // Total bytes that go though rate limiter + virtual int64_t GetTotalBytesThrough( + const Env::IOPriority pri = Env::IO_TOTAL) const = 0; + + // Total # of requests that go though rate limiter + virtual int64_t GetTotalRequests( + const Env::IOPriority pri = Env::IO_TOTAL) const = 0; +}; + +// Create a RateLimiter object, which can be shared among RocksDB instances to +// control write rate of flush and compaction. +// @rate_bytes_per_sec: this is the only parameter you want to set most of the +// time. It controls the total write rate of compaction and flush in bytes per +// second. Currently, RocksDB does not enforce rate limit for anything other +// than flush and compaction, e.g. write to WAL. +// @refill_period_us: this controls how often tokens are refilled. For example, +// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to +// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to +// burstier writes while smaller value introduces more CPU overhead. +// The default should work for most cases. +// @fairness: RateLimiter accepts high-pri requests and low-pri requests. +// A low-pri request is usually blocked in favor of hi-pri request. Currently, +// RocksDB assigns low-pri to request from compaciton and high-pri to request +// from flush. Low-pri requests can get blocked if flush requests come in +// continuouly. This fairness parameter grants low-pri requests permission by +// 1/fairness chance even though high-pri requests exist to avoid starvation. +// You should be good by leaving it at default 10. +extern RateLimiter* NewGenericRateLimiter( + int64_t rate_bytes_per_sec, + int64_t refill_period_us = 100 * 1000, + int32_t fairness = 10); + +} // namespace rocksdb diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 4f7a09e31..cde86f3c9 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -14,7 +14,7 @@ namespace rocksdb { // Pending request -struct RateLimiter::Req { +struct GenericRateLimiter::Req { explicit Req(int64_t bytes, port::Mutex* mu) : bytes(bytes), cv(mu), granted(false) {} int64_t bytes; @@ -23,7 +23,9 @@ struct RateLimiter::Req { }; -RateLimiter::RateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us, +GenericRateLimiter::GenericRateLimiter( + int64_t rate_bytes_per_sec, + int64_t refill_period_us, int32_t fairness) : refill_period_us_(refill_period_us), refill_bytes_per_period_(rate_bytes_per_sec * refill_period_us / 1000000.0), @@ -42,7 +44,7 @@ RateLimiter::RateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us, total_bytes_through_[1] = 0; } -RateLimiter::~RateLimiter() { +GenericRateLimiter::~GenericRateLimiter() { MutexLock g(&request_mutex_); stop_ = true; requests_to_wait_ = queue_[Env::IO_LOW].size() + queue_[Env::IO_HIGH].size(); @@ -57,7 +59,7 @@ RateLimiter::~RateLimiter() { } } -void RateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { +void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { assert(bytes < refill_bytes_per_period_); MutexLock g(&request_mutex_); @@ -163,7 +165,7 @@ void RateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { } while (!r.granted); } -void RateLimiter::Refill() { +void GenericRateLimiter::Refill() { next_refill_us_ = env_->NowMicros() + refill_period_us_; // Carry over the left over quota from the last period if (available_bytes_ < refill_bytes_per_period_) { @@ -192,9 +194,10 @@ void RateLimiter::Refill() { } } -RateLimiter* NewRateLimiter( +RateLimiter* NewGenericRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) { - return new RateLimiter(rate_bytes_per_sec, refill_period_us, fairness); + return new GenericRateLimiter( + rate_bytes_per_sec, refill_period_us, fairness); } } // namespace rocksdb diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 28624c395..133c85950 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -9,29 +9,34 @@ #pragma once -#include #include - #include "port/port_posix.h" #include "util/mutexlock.h" #include "util/random.h" #include "rocksdb/env.h" +#include "rocksdb/rate_limiter.h" namespace rocksdb { -class RateLimiter { +class GenericRateLimiter : public RateLimiter { public: - RateLimiter(int64_t refill_bytes, int64_t refill_period_us, int32_t fairness); + GenericRateLimiter(int64_t refill_bytes, + int64_t refill_period_us, int32_t fairness); - ~RateLimiter(); + virtual ~GenericRateLimiter(); // Request for token to write bytes. If this request can not be satisfied, // the call is blocked. Caller is responsible to make sure // bytes < GetSingleBurstBytes() - void Request(const int64_t bytes, const Env::IOPriority pri); + virtual void Request(const int64_t bytes, const Env::IOPriority pri) override; - int64_t GetTotalBytesThrough( - const Env::IOPriority pri = Env::IO_TOTAL) const { + virtual int64_t GetSingleBurstBytes() const override { + // const var + return refill_bytes_per_period_; + } + + virtual int64_t GetTotalBytesThrough( + const Env::IOPriority pri = Env::IO_TOTAL) const override { MutexLock g(&request_mutex_); if (pri == Env::IO_TOTAL) { return total_bytes_through_[Env::IO_LOW] + @@ -40,7 +45,8 @@ class RateLimiter { return total_bytes_through_[pri]; } - int64_t GetTotalRequests(const Env::IOPriority pri = Env::IO_TOTAL) const { + virtual int64_t GetTotalRequests( + const Env::IOPriority pri = Env::IO_TOTAL) const override { MutexLock g(&request_mutex_); if (pri == Env::IO_TOTAL) { return total_requests_[Env::IO_LOW] + total_requests_[Env::IO_HIGH]; @@ -48,16 +54,6 @@ class RateLimiter { return total_requests_[pri]; } - int64_t GetSingleBurstBytes() const { - // const var - return refill_bytes_per_period_; - } - - int64_t GetAvailableBytes() const { - MutexLock g(&request_mutex_); - return available_bytes_; - } - private: void Refill(); diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 66735f9e8..1b72e4ed0 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -21,14 +21,14 @@ class RateLimiterTest { }; TEST(RateLimiterTest, StartStop) { - std::unique_ptr limiter(new RateLimiter(100, 100, 10)); + std::unique_ptr limiter(new GenericRateLimiter(100, 100, 10)); } TEST(RateLimiterTest, Rate) { auto* env = Env::Default(); struct Arg { Arg(int64_t target_rate, int burst) - : limiter(new RateLimiter(target_rate, 100 * 1000, 10)), + : limiter(new GenericRateLimiter(target_rate, 100 * 1000, 10)), request_size(target_rate / 10), burst(burst) {} std::unique_ptr limiter;