diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 027c9fa54..44c1bdf8a 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -17,6 +17,10 @@ class RateLimiter { public: virtual ~RateLimiter() {} + // This API allows user to dynamically change rate limiter's bytes per second. + // REQUIRED: bytes_per_second > 0 + virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; + // Request for token to write bytes. If this request can not be satisfied, // the call is blocked. Caller is responsible to make sure // bytes < GetSingleBurstBytes() diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 2beefd58f..3eff5068a 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -22,24 +22,23 @@ struct GenericRateLimiter::Req { bool granted; }; - -GenericRateLimiter::GenericRateLimiter( - int64_t rate_bytes_per_sec, - int64_t refill_period_us, - int32_t fairness) - : refill_period_us_(refill_period_us), - refill_bytes_per_period_(rate_bytes_per_sec * refill_period_us / 1000000.0), - env_(Env::Default()), - stop_(false), - exit_cv_(&request_mutex_), - requests_to_wait_(0), - total_requests_{0, 0}, - total_bytes_through_{0, 0}, - available_bytes_(0), - next_refill_us_(env_->NowMicros()), - fairness_(fairness > 100 ? 100 : fairness), - rnd_((uint32_t)time(nullptr)), - leader_(nullptr) { +GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, + int64_t refill_period_us, + int32_t fairness) + : refill_period_us_(refill_period_us), + refill_bytes_per_period_( + CalculateRefillBytesPerPeriod(rate_bytes_per_sec)), + env_(Env::Default()), + stop_(false), + exit_cv_(&request_mutex_), + requests_to_wait_(0), + total_requests_{0, 0}, + total_bytes_through_{0, 0}, + available_bytes_(0), + next_refill_us_(env_->NowMicros()), + fairness_(fairness > 100 ? 100 : fairness), + rnd_((uint32_t)time(nullptr)), + leader_(nullptr) { total_bytes_through_[0] = 0; total_bytes_through_[1] = 0; } @@ -60,8 +59,16 @@ GenericRateLimiter::~GenericRateLimiter() { } } +// This API allows user to dynamically change rate limiter's bytes per second. +void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { + assert(bytes_per_second > 0); + refill_bytes_per_period_.store( + CalculateRefillBytesPerPeriod(bytes_per_second), + std::memory_order_relaxed); +} + void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { - assert(bytes <= refill_bytes_per_period_); + assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); MutexLock g(&request_mutex_); if (stop_) { @@ -169,8 +176,10 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { void GenericRateLimiter::Refill() { next_refill_us_ = env_->NowMicros() + refill_period_us_; // Carry over the left over quota from the last period - if (available_bytes_ < refill_bytes_per_period_) { - available_bytes_ += refill_bytes_per_period_; + auto refill_bytes_per_period = + refill_bytes_per_period_.load(std::memory_order_relaxed); + if (available_bytes_ < refill_bytes_per_period) { + available_bytes_ += refill_bytes_per_period; } int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1; @@ -197,6 +206,9 @@ void GenericRateLimiter::Refill() { RateLimiter* NewGenericRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) { + assert(rate_bytes_per_sec > 0); + assert(refill_period_us > 0); + assert(fairness > 0); return new GenericRateLimiter( rate_bytes_per_sec, refill_period_us, fairness); } diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 133c85950..3840c4edd 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -9,6 +9,7 @@ #pragma once +#include #include #include "port/port_posix.h" #include "util/mutexlock.h" @@ -25,14 +26,16 @@ class GenericRateLimiter : public RateLimiter { virtual ~GenericRateLimiter(); + // This API allows user to dynamically change rate limiter's bytes per second. + virtual void SetBytesPerSecond(int64_t bytes_per_second) override; + // Request for token to write bytes. If this request can not be satisfied, // the call is blocked. Caller is responsible to make sure // bytes < GetSingleBurstBytes() virtual void Request(const int64_t bytes, const Env::IOPriority pri) override; virtual int64_t GetSingleBurstBytes() const override { - // const var - return refill_bytes_per_period_; + return refill_bytes_per_period_.load(std::memory_order_relaxed); } virtual int64_t GetTotalBytesThrough( @@ -56,12 +59,16 @@ class GenericRateLimiter : public RateLimiter { private: void Refill(); + int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec) { + return rate_bytes_per_sec * refill_period_us_ / 1000000.0; + } // This mutex guard all internal states mutable port::Mutex request_mutex_; const int64_t refill_period_us_; - const int64_t refill_bytes_per_period_; + // This variable can be changed dynamically. + std::atomic refill_bytes_per_period_; Env* const env_; bool stop_; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index ccce84dda..d635010a4 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -54,25 +54,36 @@ TEST_F(RateLimiterTest, Rate) { } }; - for (int i = 1; i <= 16; i*=2) { + for (int i = 1; i <= 16; i *= 2) { int32_t target = i * 1024 * 10; Arg arg(target, i / 4 + 1); - auto start = env->NowMicros(); - for (int t = 0; t < i; ++t) { - env->StartThread(writer, &arg); + int64_t old_total_bytes_through = 0; + for (int iter = 1; iter <= 2; ++iter) { + // second iteration changes the target dynamically + if (iter == 2) { + target *= 2; + arg.limiter->SetBytesPerSecond(target); + } + auto start = env->NowMicros(); + for (int t = 0; t < i; ++t) { + env->StartThread(writer, &arg); + } + env->WaitForJoin(); + + auto elapsed = env->NowMicros() - start; + double rate = + (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) * + 1000000.0 / elapsed; + old_total_bytes_through = arg.limiter->GetTotalBytesThrough(); + fprintf(stderr, + "request size [1 - %" PRIi32 "], limit %" PRIi32 + " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n", + arg.request_size - 1, target / 1024, rate / 1024, + elapsed / 1000000.0); + + ASSERT_GE(rate / target, 0.9); + ASSERT_LE(rate / target, 1.1); } - env->WaitForJoin(); - - auto elapsed = env->NowMicros() - start; - double rate = arg.limiter->GetTotalBytesThrough() - * 1000000.0 / elapsed; - fprintf(stderr, "request size [1 - %" PRIi32 "], limit %" PRIi32 - " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n", - arg.request_size - 1, target / 1024, rate / 1024, - elapsed / 1000000.0); - - ASSERT_GE(rate / target, 0.95); - ASSERT_LE(rate / target, 1.05); } }