rocksdb/util/rate_limiter.cc
mrambacher 7cd5835a28 Make RateLimiter Customizable (#9141)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9141

Reviewed By: zhichao-cao

Differential Revision: D32432190

Pulled By: mrambacher

fbshipit-source-id: 7930ed88a02412128cd407b5063522484e45c6ce
2021-12-01 06:57:02 -08:00

497 lines
18 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/rate_limiter.h"
#include <algorithm>
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
// Charges the limiter for an I/O of up to `bytes` and returns the size that
// was actually charged.
//
// When `io_priority` is not below Env::IO_TOTAL, or `op_type` is not rate
// limited, nothing is charged and `bytes` is returned unchanged. Otherwise
// the size is capped at GetSingleBurstBytes() and, for a non-zero
// `alignment`, replaced by the larger of `alignment` and
// TruncateToPageBoundary(alignment, bytes) before Request() is called.
size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
                                 Env::IOPriority io_priority, Statistics* stats,
                                 RateLimiter::OpType op_type) {
  const bool throttled = io_priority < Env::IO_TOTAL && IsRateLimited(op_type);
  if (!throttled) {
    return bytes;
  }

  const size_t burst_cap = static_cast<size_t>(GetSingleBurstBytes());
  bytes = std::min(bytes, burst_cap);

  if (alignment > 0) {
    // Direct I/O cannot transfer less than one aligned unit, so the charged
    // size may end up larger than a single burst here.
    const size_t truncated = TruncateToPageBoundary(alignment, bytes);
    bytes = std::max(alignment, truncated);
  }

  Request(bytes, io_priority, stats, op_type);
  return bytes;
}
// Pending request
struct GenericRateLimiter::Req {
explicit Req(int64_t _bytes, port::Mutex* _mu)
: request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {}
int64_t request_bytes;
int64_t bytes;
port::CondVar cv;
bool granted;
};
// Maps option names to the location and type of the corresponding field in
// GenericRateLimiter::GenericRateLimiterOptions. The GenericRateLimiter
// constructor hands this table to RegisterOptions() together with its
// `options_` member. The table is empty when ROCKSDB_LITE is defined.
static std::unordered_map<std::string, OptionTypeInfo>
generic_rate_limiter_type_info = {
#ifndef ROCKSDB_LITE
// Note: the option is named "rate_bytes_per_sec" but is stored in the
// `max_bytes_per_sec` field.
{"rate_bytes_per_sec",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
max_bytes_per_sec),
OptionType::kInt64T}},
{"refill_period_us",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
refill_period_us),
OptionType::kInt64T}},
{"fairness",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
fairness),
OptionType::kInt32T}},
{"auto_tuned",
{offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
auto_tuned),
OptionType::kBoolean}},
// The clock is a customizable shared pointer that is allowed to be null.
{"clock",
OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
offsetof(struct GenericRateLimiter::GenericRateLimiterOptions,
clock),
OptionVerificationType::kByNameAllowFromNull,
OptionTypeFlags::kAllowNull)},
#endif // ROCKSDB_LITE
};
// Builds a limiter from the given settings, registers its options for
// configuration, zeroes the per-priority counters and runs Initialize().
// The arguments are not validated here; PrepareOptions() does that.
GenericRateLimiter::GenericRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
    RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
    bool auto_tuned)
    : RateLimiter(mode),
      options_(rate_bytes_per_sec, refill_period_us, fairness, clock,
               auto_tuned),
      stop_(false),
      exit_cv_(&request_mutex_),
      requests_to_wait_(0),
      available_bytes_(0),
      // Seed the random generator from the current wall-clock time.
      rnd_(static_cast<uint32_t>(time(nullptr))),
      wait_until_refill_pending_(false),
      num_drains_(0),
      prev_num_drains_(0) {
  RegisterOptions(&options_, &generic_rate_limiter_type_info);

  int pri = Env::IO_LOW;
  while (pri < Env::IO_TOTAL) {
    total_requests_[pri] = 0;
    total_bytes_through_[pri] = 0;
    ++pri;
  }

  Initialize();
}
// Derives the runtime state from `options_`: fills in a default clock, caps
// fairness at 100, restarts the refill and tuning timestamps, and computes
// the starting rate and the per-period refill quota.
void GenericRateLimiter::Initialize() {
  if (!options_.clock) {
    options_.clock = SystemClock::Default();
  }
  if (options_.fairness > 100) {
    options_.fairness = 100;
  }

  next_refill_us_ = NowMicrosMonotonic();
  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());

  // An auto-tuned limiter starts at half of the configured maximum.
  rate_bytes_per_sec_ = options_.auto_tuned ? options_.max_bytes_per_sec / 2
                                            : options_.max_bytes_per_sec;
  refill_bytes_per_period_ = CalculateRefillBytesPerPeriod(rate_bytes_per_sec_);
}
// Shuts the limiter down: raises `stop_`, wakes every queued request and then
// blocks until each of them has acknowledged by decrementing
// `requests_to_wait_` (see the tail of Request()).
GenericRateLimiter::~GenericRateLimiter() {
  MutexLock g(&request_mutex_);
  stop_ = true;

  // Count the waiters that must check out before destruction may proceed.
  std::deque<Req*>::size_type queues_size_sum = 0;
  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
    queues_size_sum += queue_[i].size();
  }
  requests_to_wait_ = static_cast<int32_t>(queues_size_sum);

  // Wake every waiter, highest priority index first. The queues are walked
  // in place: the mutex is held for the whole loop and signaling does not
  // modify them, so there is no need to copy each deque first.
  for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
    for (Req* r : queue_[i]) {
      r->cv.Signal();
    }
  }

  // Waiting releases the mutex so the woken requests can run and check out.
  while (requests_to_wait_ > 0) {
    exit_cv_.Wait();
  }
}
// Validates the configured options and, if they are acceptable, re-runs
// Initialize() before delegating to RateLimiter::PrepareOptions().
// Returns InvalidArgument when fairness, max_bytes_per_sec or
// refill_period_us is not strictly positive.
Status GenericRateLimiter::PrepareOptions(const ConfigOptions& options) {
  if (options_.fairness <= 0) {
    return Status::InvalidArgument("Fairness must be > 0");
  }
  if (options_.max_bytes_per_sec <= 0) {
    return Status::InvalidArgument("max_bytes_per_sec must be > 0");
  }
  if (options_.refill_period_us <= 0) {
    return Status::InvalidArgument("Refill_period_us must be > 0");
  }

  Initialize();
  return RateLimiter::PrepareOptions(options);
}
// Changes the limiter's rate at runtime. `bytes_per_second` must be
// positive. Both the stored rate and the per-period refill quota derived
// from it are updated.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
  assert(bytes_per_second > 0);
  rate_bytes_per_sec_ = bytes_per_second;

  const int64_t refill_quota = CalculateRefillBytesPerPeriod(bytes_per_second);
  refill_bytes_per_period_.store(refill_quota, std::memory_order_relaxed);
}
// Blocks the calling thread until `bytes` of quota have been granted at
// priority `pri`, or until the limiter is being destroyed.
//
// `bytes` is expected not to exceed the current per-period refill quota
// (asserted); negative values are treated as zero. `stats`, when provided,
// receives a NUMBER_RATE_LIMITER_DRAINS tick each time a thread has to wait
// for the next refill.
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
                                 Statistics* stats) {
  assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
  bytes = std::max(static_cast<int64_t>(0), bytes);
  TEST_SYNC_POINT("GenericRateLimiter::Request");
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
                           &rate_bytes_per_sec_);
  MutexLock g(&request_mutex_);

  if (options_.auto_tuned) {
    // Re-tune once at least kRefillsPerTune refill periods have elapsed.
    static const int kRefillsPerTune = 100;
    std::chrono::microseconds now(NowMicrosMonotonic());
    if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds(
                                                   options_.refill_period_us)) {
      Status s = Tune();
      s.PermitUncheckedError();  //**TODO: What to do on error?
    }
  }

  if (stop_) {
    // It is now in the clean-up of ~GenericRateLimiter().
    // Therefore any new incoming request will exit from here
    // and not get satisfied.
    return;
  }

  ++total_requests_[pri];

  if (available_bytes_ >= bytes) {
    // Refill thread assigns quota and notifies requests waiting on
    // the queue under mutex. So if we get here, that means nobody
    // is waiting?
    available_bytes_ -= bytes;
    total_bytes_through_[pri] += bytes;
    return;
  }

  // Request cannot be satisfied at this moment, enqueue
  Req r(bytes, &request_mutex_);
  queue_[pri].push_back(&r);
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest",
                           &request_mutex_);
  // A thread representing a queued request coordinates with other such threads.
  // There are two main duties.
  //
  // (1) Waiting for the next refill time.
  // (2) Refilling the bytes and granting requests.
  do {
    int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonic();
    if (time_until_refill_us > 0) {
      if (wait_until_refill_pending_) {
        // Somebody is performing (1). Trust we'll be woken up when our request
        // is granted or we are needed for future duties.
        r.cv.Wait();
      } else {
        // Whichever thread reaches here first performs duty (1) as described
        // above.
        int64_t wait_until = options_.clock->NowMicros() + time_until_refill_us;
        RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
        ++num_drains_;
        wait_until_refill_pending_ = true;
        r.cv.TimedWait(wait_until);
        TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
                                 &time_until_refill_us);
        wait_until_refill_pending_ = false;
      }
    } else {
      // Whichever thread reaches here first performs duty (2) as described
      // above.
      RefillBytesAndGrantRequests();
      if (r.granted) {
        // If there is any remaining requests, make sure there exists at least
        // one candidate is awake for future duties by signaling a front request
        // of a queue.
        for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
          // Bind by reference: only the front element is inspected, so
          // copying the whole deque while holding the mutex is wasted work.
          const std::deque<Req*>& queue = queue_[i];
          if (!queue.empty()) {
            queue.front()->cv.Signal();
            break;
          }
        }
      }
    }
    // Invariant: non-granted request is always in one queue, and granted
    // request is always in zero queues.
#ifndef NDEBUG
    int num_found = 0;
    for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
      if (std::find(queue_[i].begin(), queue_[i].end(), &r) !=
          queue_[i].end()) {
        ++num_found;
      }
    }
    if (r.granted) {
      assert(num_found == 0);
    } else {
      assert(num_found == 1);
    }
#endif  // NDEBUG
  } while (!stop_ && !r.granted);

  if (stop_) {
    // It is now in the clean-up of ~GenericRateLimiter().
    // Therefore any woken-up request will have come out of the loop and then
    // exit here. It might or might not have been satisfied.
    --requests_to_wait_;
    exit_cv_.Signal();
  }
}
std::vector<Env::IOPriority>
GenericRateLimiter::GeneratePriorityIterationOrder() {
std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
// We make Env::IO_USER a superior priority by always iterating its queue
// first
pri_iteration_order[0] = Env::IO_USER;
bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(options_.fairness);
TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"PostRandomOneInFairnessForHighPri",
&high_pri_iterated_after_mid_low_pri);
bool mid_pri_itereated_after_low_pri = rnd_.OneIn(options_.fairness);
TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"PostRandomOneInFairnessForMidPri",
&mid_pri_itereated_after_low_pri);
if (high_pri_iterated_after_mid_low_pri) {
pri_iteration_order[3] = Env::IO_HIGH;
pri_iteration_order[2] =
mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
pri_iteration_order[1] =
(pri_iteration_order[2] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
} else {
pri_iteration_order[1] = Env::IO_HIGH;
pri_iteration_order[3] =
mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
pri_iteration_order[2] =
(pri_iteration_order[3] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
}
TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"PreReturnPriIterationOrder",
&pri_iteration_order);
return pri_iteration_order;
}
void GenericRateLimiter::RefillBytesAndGrantRequests() {
TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
next_refill_us_ = NowMicrosMonotonic() + options_.refill_period_us;
// Carry over the left over quota from the last period
auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed);
if (available_bytes_ < refill_bytes_per_period) {
available_bytes_ += refill_bytes_per_period;
}
std::vector<Env::IOPriority> pri_iteration_order =
GeneratePriorityIterationOrder();
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
assert(!pri_iteration_order.empty());
Env::IOPriority current_pri = pri_iteration_order[i];
auto* queue = &queue_[current_pri];
while (!queue->empty()) {
auto* next_req = queue->front();
if (available_bytes_ < next_req->request_bytes) {
// Grant partial request_bytes to avoid starvation of requests
// that become asking for more bytes than available_bytes_
// due to dynamically reduced rate limiter's bytes_per_second that
// leads to reduced refill_bytes_per_period hence available_bytes_
next_req->request_bytes -= available_bytes_;
available_bytes_ = 0;
break;
}
available_bytes_ -= next_req->request_bytes;
next_req->request_bytes = 0;
total_bytes_through_[current_pri] += next_req->bytes;
queue->pop_front();
next_req->granted = true;
// Quota granted, signal the thread to exit
next_req->cv.Signal();
}
}
}
// Converts a rate in bytes per second into the number of bytes to add per
// refill period (`options_.refill_period_us` microseconds).
//
// Returns 0 for a non-positive rate. If the multiplication would overflow
// int64_t, returns port::kMaxInt64 / 1000000 instead of the exact value.
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
    int64_t rate_bytes_per_sec) {
  if (rate_bytes_per_sec <= 0) {
    // This can be reached before PrepareOptions() has validated the options,
    // because the constructor calls Initialize(). A zero rate would divide
    // by zero below, and a negative rate would be mistaken for overflow.
    return 0;
  }
  if (port::kMaxInt64 / rate_bytes_per_sec < options_.refill_period_us) {
    // Avoid unexpected result in the overflow case. The result now is still
    // inaccurate but is a number that is large enough.
    return port::kMaxInt64 / 1000000;
  } else {
    return rate_bytes_per_sec * options_.refill_period_us / 1000000;
  }
}
// Adjusts the rate of an auto-tuned limiter based on how often requests had
// to wait ("drain" events) since the previous tuning.
//
// The percentage of refill intervals that saw a drain is compared against
// two watermarks: below the low one the rate is reduced by the adjust
// factor, above the high one it is raised by it, and with no drains at all
// it drops straight to the lower bound. The result is kept within
// [max(1, max_bytes_per_sec / kAllowedRangeFactor), max_bytes_per_sec].
// Always returns Status::OK().
Status GenericRateLimiter::Tune() {
  const int kLowWatermarkPct = 50;
  const int kHighWatermarkPct = 90;
  const int kAdjustFactorPct = 5;
  // computed rate limit will be in
  // `[max_bytes_per_sec / kAllowedRangeFactor, max_bytes_per_sec]`,
  // with the lower bound never below 1 (see the clamp further down).
  const int kAllowedRangeFactor = 20;

  std::chrono::microseconds prev_tuned_time = tuned_time_;
  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());

  // Number of refill periods since the last tuning, rounded up.
  int64_t elapsed_intervals =
      (tuned_time_ - prev_tuned_time +
       std::chrono::microseconds(options_.refill_period_us) -
       std::chrono::microseconds(1)) /
      std::chrono::microseconds(options_.refill_period_us);
  // We tune every kRefillsPerTune intervals, so the overflow and division-by-
  // zero conditions should never happen.
  assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
  assert(elapsed_intervals > 0);
  int64_t drained_pct =
      (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;

  int64_t prev_bytes_per_sec = GetBytesPerSecond();
  int64_t new_bytes_per_sec;
  if (drained_pct == 0) {
    new_bytes_per_sec = options_.max_bytes_per_sec / kAllowedRangeFactor;
  } else if (drained_pct < kLowWatermarkPct) {
    // sanitize to prevent overflow
    int64_t sanitized_prev_bytes_per_sec =
        std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
    new_bytes_per_sec =
        std::max(options_.max_bytes_per_sec / kAllowedRangeFactor,
                 sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
  } else if (drained_pct > kHighWatermarkPct) {
    // sanitize to prevent overflow
    int64_t sanitized_prev_bytes_per_sec = std::min(
        prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
    new_bytes_per_sec =
        std::min(options_.max_bytes_per_sec,
                 sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
  } else {
    new_bytes_per_sec = prev_bytes_per_sec;
  }

  // With max_bytes_per_sec < kAllowedRangeFactor the integer divisions above
  // yield 0, which SetBytesPerSecond() rejects (assert) and which would lead
  // to a division by zero when the refill quota is computed. Never tune
  // below 1 byte per second.
  new_bytes_per_sec = std::max(new_bytes_per_sec, static_cast<int64_t>(1));

  if (new_bytes_per_sec != prev_bytes_per_sec) {
    SetBytesPerSecond(new_bytes_per_sec);
  }
  // NOTE(review): within this file prev_num_drains_ is only ever set to 0 (in
  // the constructor), so this assignment appears to act as a reset of the
  // drain counter for the next tuning window -- confirm that is the intent.
  num_drains_ = prev_num_drains_;
  return Status::OK();
}
// Creates a GenericRateLimiter that uses the default system clock and
// prepares it with default ConfigOptions. Ownership of the returned object
// passes to the caller. All numeric arguments must be positive (asserted).
// If preparing the options fails, this asserts in debug builds and returns
// nullptr otherwise.
RateLimiter* NewGenericRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
    int32_t fairness /* = 10 */,
    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
    bool auto_tuned /* = false */) {
  assert(rate_bytes_per_sec > 0);
  assert(refill_period_us > 0);
  assert(fairness > 0);

  std::unique_ptr<RateLimiter> limiter(
      new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
                             mode, SystemClock::Default(), auto_tuned));

  const Status prepared = limiter->PrepareOptions(ConfigOptions());
  if (!prepared.ok()) {
    assert(false);
    return nullptr;
  }
  return limiter.release();
}
namespace {
#ifndef ROCKSDB_LITE
// Registers a factory for GenericRateLimiter under its class name in
// `library` and returns the library's factory count afterwards.
static int RegisterBuiltinRateLimiters(ObjectLibrary& library,
                                       const std::string& /*arg*/) {
  // The factory creates a limiter with the largest possible rate; the new
  // object is owned by `guard`.
  auto make_generic = [](const std::string& /*uri*/,
                         std::unique_ptr<RateLimiter>* guard,
                         std::string* /*errmsg*/) {
    guard->reset(new GenericRateLimiter(port::kMaxInt64));
    return guard->get();
  };
  library.Register<RateLimiter>(GenericRateLimiter::kClassName(),
                                make_generic);

  size_t num_types;
  const size_t factory_count = library.GetFactoryCount(&num_types);
  return static_cast<int>(factory_count);
}
// String names of the RateLimiter::Mode values. Used by the "mode" entry of
// rate_limiter_type_info to convert between option strings and the enum.
static std::unordered_map<std::string, RateLimiter::Mode>
rate_limiter_mode_map = {
{"kReadsOnly", RateLimiter::Mode::kReadsOnly},
{"kWritesOnly", RateLimiter::Mode::kWritesOnly},
{"kAllIo", RateLimiter::Mode::kAllIo},
};
#endif // ROCKSDB_LITE
// Recognizes names of the form "<GenericRateLimiter class name>:<rate>" and,
// on a match, stores a new GenericRateLimiter with the parsed rate in
// `limiter` and returns true. Returns false for any other name, leaving
// `limiter` untouched.
//
// NOTE(review): the length check requires at least two characters after the
// colon, so a single-character rate is not matched here -- confirm whether
// that is intended.
static bool LoadRateLimiter(const std::string& name,
                            std::shared_ptr<RateLimiter>* limiter) {
  const auto prefix_len = strlen(GenericRateLimiter::kClassName());
  const bool is_generic_with_rate =
      name.size() > prefix_len + 2 && name[prefix_len] == ':' &&
      StartsWith(name, GenericRateLimiter::kClassName());
  if (!is_generic_with_rate) {
    return false;
  }

  // Everything after the colon is the rate.
  const auto parsed_rate = ParseInt64(name.substr(prefix_len + 1));
  limiter->reset(new GenericRateLimiter(parsed_rate));
  return true;
}
// Option table for the RateLimiter base class. The RateLimiter constructor
// registers it against its `mode_` member, which is why the "mode" entry
// uses offset 0. The entry relies on rate_limiter_mode_map, which only
// exists when ROCKSDB_LITE is not defined; under ROCKSDB_LITE the table is
// empty.
static std::unordered_map<std::string, OptionTypeInfo> rate_limiter_type_info =
{
#ifndef ROCKSDB_LITE
{"mode",
OptionTypeInfo::Enum<RateLimiter::Mode>(0, &rate_limiter_mode_map)},
#endif // ROCKSDB_LITE
};
} // namespace
// Stores the limiter's mode and registers `mode_` with the options
// framework, using an empty name and the rate_limiter_type_info table.
RateLimiter::RateLimiter(Mode mode) : mode_(mode) {
RegisterOptions("", &mode_, &rate_limiter_type_info);
}
// Creates a RateLimiter from its string description `value`.
//
// An empty `value` resets `result` and returns OK. Otherwise the built-in
// limiters are registered with the default ObjectLibrary (once per process,
// and only when ROCKSDB_LITE is not defined) and the work is delegated to
// LoadSharedObject(), with LoadRateLimiter as the custom loader.
Status RateLimiter::CreateFromString(const ConfigOptions& config_options,
                                     const std::string& value,
                                     std::shared_ptr<RateLimiter>* result) {
  if (value.empty()) {
    result->reset();
    return Status::OK();
  }

#ifndef ROCKSDB_LITE
  static std::once_flag builtins_registered;
  std::call_once(builtins_registered, [&]() {
    RegisterBuiltinRateLimiters(*(ObjectLibrary::Default().get()), "");
  });
#endif  // ROCKSDB_LITE

  return LoadSharedObject<RateLimiter>(config_options, value, LoadRateLimiter,
                                       result);
}
} // namespace ROCKSDB_NAMESPACE