cb2476a0ca
Summary: The current implementation of the rate limiter can introduce resource starvation when its limit changes. This diff aims to fix the problem by consuming request bytes partially. Test Plan: ``` ./rate_limiter_test [==========] Running 4 tests from 1 test case. [----------] Global test environment set-up. [----------] 4 tests from RateLimiterTest [ RUN ] RateLimiterTest.OverflowRate [ OK ] RateLimiterTest.OverflowRate (0 ms) [ RUN ] RateLimiterTest.StartStop [ OK ] RateLimiterTest.StartStop (0 ms) [ RUN ] RateLimiterTest.Rate request size [1 - 1023], limit 10 KB/sec, actual rate: 10.355712 KB/sec, elapsed 2.00 seconds request size [1 - 1023], limit 20 KB/sec, actual rate: 19.136564 KB/sec, elapsed 2.00 seconds request size [1 - 2047], limit 20 KB/sec, actual rate: 20.783976 KB/sec, elapsed 2.10 seconds request size [1 - 2047], limit 40 KB/sec, actual rate: 39.308144 KB/sec, elapsed 2.10 seconds request size [1 - 4095], limit 40 KB/sec, actual rate: 40.318349 KB/sec, elapsed 2.20 seconds request size [1 - 4095], limit 80 KB/sec, actual rate: 79.667396 KB/sec, elapsed 2.20 seconds request size [1 - 8191], limit 80 KB/sec, actual rate: 81.807158 KB/sec, elapsed 2.30 seconds request size [1 - 8191], limit 160 KB/sec, actual rate: 160.659761 KB/sec, elapsed 2.20 seconds request size [1 - 16383], limit 160 KB/sec, actual rate: 160.700990 KB/sec, elapsed 3.00 seconds request size [1 - 16383], limit 320 KB/sec, actual rate: 317.639481 KB/sec, elapsed 2.50 seconds [ OK ] RateLimiterTest.Rate (22618 ms) [ RUN ] RateLimiterTest.LimitChangeTest [COMPLETE] request size 10 KB, new limit 20KB/sec, refill period 1000 ms [COMPLETE] request size 10 KB, new limit 5KB/sec, refill period 1000 ms [COMPLETE] request size 20 KB, new limit 40KB/sec, refill period 1000 ms [COMPLETE] request size 20 KB, new limit 10KB/sec, refill period 1000 ms [COMPLETE] request size 40 KB, new limit 80KB/sec, refill period 1000 ms [COMPLETE] request size 40 KB, new limit 20KB/sec, 
refill period 1000 ms [COMPLETE] request size 80 KB, new limit 160KB/sec, refill period 1000 ms [COMPLETE] request size 80 KB, new limit 40KB/sec, refill period 1000 ms [COMPLETE] request size 160 KB, new limit 320KB/sec, refill period 1000 ms [COMPLETE] request size 160 KB, new limit 80KB/sec, refill period 1000 ms [ OK ] RateLimiterTest.LimitChangeTest (5002 ms) [----------] 4 tests from RateLimiterTest (27620 ms total) [----------] Global test environment tear-down [==========] 4 tests from 1 test case ran. (27621 ms total) [ PASSED ] 4 tests. ``` Reviewers: sdong, IslamAbdelRahman, yiwu, andrewkr Reviewed By: andrewkr Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D60207
151 lines
5.2 KiB
C++
151 lines
5.2 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
|
|
#include "util/rate_limiter.h"
|
|
#include <inttypes.h>
|
|
#include <limits>
|
|
#include "rocksdb/env.h"
|
|
#include "util/random.h"
|
|
#include "util/sync_point.h"
|
|
#include "util/testharness.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
// GoogleTest fixture used by the TEST_F cases in this file. It declares no
// members and no set-up/tear-down logic of its own.
class RateLimiterTest : public testing::Test {};
|
|
|
|
// Constructs a limiter whose first argument is the largest int64 value and
// verifies that the single-burst size it reports is still above one billion
// bytes (presumably guarding against wrap-around in the limiter's internal
// arithmetic -- confirm against GenericRateLimiter).
TEST_F(RateLimiterTest, OverflowRate) {
  const auto largest_rate = port::kMaxInt64;
  GenericRateLimiter limiter(largest_rate, 1000, 10);
  ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
}
|
|
|
|
// Smoke test without assertions: a limiter is allocated on the heap and is
// destroyed again when the owning pointer leaves scope at the end of the test.
TEST_F(RateLimiterTest, StartStop) {
  std::unique_ptr<RateLimiter> limiter;
  limiter.reset(new GenericRateLimiter(100, 100, 10));
}
|
|
|
|
// Throughput test. For each scale step (1, 2, 4, 8, 16) that many writer
// threads hammer one shared limiter for about two seconds, first at the
// initial limit and then again after the limit has been doubled on the live
// limiter. After each pass the measured rate must lie within +/-10% of the
// limit that was in force.
TEST_F(RateLimiterTest, Rate) {
  auto* env = Env::Default();

  // Everything a writer thread needs; one instance is shared by all writers
  // of a scale step.
  struct Arg {
    Arg(int32_t rate, int max_burst)
        : limiter(new GenericRateLimiter(rate, 100 * 1000, 10)),
          request_size(rate / 10),
          burst(max_burst) {}

    std::unique_ptr<RateLimiter> limiter;
    // Requests are drawn from [1, request_size - 1] bytes.
    int32_t request_size;
    // Upper bound handed to Random::Skewed() when sizing a high-pri burst.
    int burst;
  };

  // Thread body: keeps issuing a burst of high-priority requests followed by
  // one low-priority request until the deadline passes. Captureless so that
  // it can be handed to Env::StartThread().
  auto writer = [](void* raw) {
    auto* clock = Env::Default();
    auto* ctx = static_cast<Arg*>(raw);

    // Test for 2 seconds
    const auto deadline = clock->NowMicros() + 2 * 1000000;
    const auto seed = static_cast<uint32_t>(
        clock->NowNanos() % std::numeric_limits<uint32_t>::max());
    Random rnd(seed);

    // Draws the size of the next request.
    auto next_request_bytes = [&rnd, ctx]() {
      return rnd.Uniform(ctx->request_size - 1) + 1;
    };

    while (clock->NowMicros() < deadline) {
      // The burst bound is re-drawn on every check of the loop condition.
      int issued = 0;
      while (issued < static_cast<int>(rnd.Skewed(ctx->burst) + 1)) {
        ctx->limiter->Request(next_request_bytes(), Env::IO_HIGH);
        ++issued;
      }
      ctx->limiter->Request(next_request_bytes(), Env::IO_LOW);
    }
  };

  for (int scale = 1; scale <= 16; scale <<= 1) {
    int32_t target = scale * 10 * 1024;
    Arg arg(target, scale / 4 + 1);
    int64_t bytes_before_pass = 0;

    for (int pass = 0; pass < 2; ++pass) {
      // second iteration changes the target dynamically
      const bool is_second_pass = (pass == 1);
      if (is_second_pass) {
        target *= 2;
        arg.limiter->SetBytesPerSecond(target);
      }

      const auto started_at = env->NowMicros();
      int launched = 0;
      while (launched < scale) {
        env->StartThread(writer, &arg);
        ++launched;
      }
      env->WaitForJoin();
      const auto elapsed = env->NowMicros() - started_at;

      // All writers have been joined, so the counter is stable here.
      const auto bytes_after_pass = arg.limiter->GetTotalBytesThrough();
      const double rate =
          (bytes_after_pass - bytes_before_pass) * 1000000.0 / elapsed;
      bytes_before_pass = bytes_after_pass;

      fprintf(stderr,
              "request size [1 - %" PRIi32 "], limit %" PRIi32
              " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
              arg.request_size - 1, target / 1024, rate / 1024,
              elapsed / 1000000.0);

      ASSERT_GE(rate / target, 0.9);
      ASSERT_LE(rate / target, 1.1);
    }
  }
}
|
|
|
|
// Starvation test for limit changes. For each scale step a single
// high-priority request of `target` bytes is issued on a fresh limiter whose
// refill period is one second. Sync points order the threads so that the
// limit is changed after the request has started and before the limiter
// refills. The new limit is 2x the original on the first pass and 0.5x on the
// second. The test has no assertions: it passes when the request thread
// finishes, i.e. when WaitForJoin() returns.
TEST_F(RateLimiterTest, LimitChangeTest) {
  // starvation test when limit changes to a smaller value
  int64_t refill_period = 1000 * 1000;
  auto* env = Env::Default();
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Parameters handed to the request thread.
  struct Arg {
    Arg(int32_t _request_size, Env::IOPriority _pri,
        std::shared_ptr<RateLimiter> _limiter)
        : request_size(_request_size), pri(_pri), limiter(_limiter) {}
    int32_t request_size;
    Env::IOPriority pri;
    std::shared_ptr<RateLimiter> limiter;
  };

  // Thread body: issues exactly one request and returns once it is granted.
  auto writer = [](void* p) {
    auto* arg = static_cast<Arg*>(p);
    arg->limiter->Request(arg->request_size, arg->pri);
  };

  for (uint32_t i = 1; i <= 16; i <<= 1) {
    int32_t target = i * 1024 * 10;
    // refill per second
    for (int iter = 0; iter < 2; iter++) {
      std::shared_ptr<RateLimiter> limiter =
          std::make_shared<GenericRateLimiter>(target, refill_period, 10);
      // Ordering enforced below: the request must have started before the
      // limit change begins, and the limit change must have ended before the
      // limiter refills.
      rocksdb::SyncPoint::GetInstance()->LoadDependency(
          {{"GenericRateLimiter::Request",
            "RateLimiterTest::LimitChangeTest:changeLimitStart"},
           {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
            "GenericRateLimiter::Refill"}});
      Arg arg(target, Env::IO_HIGH, limiter);
      // The idea behind is to start a request first, then before it refills,
      // update limit to a different value (2X/0.5X). No starvation should
      // be guaranteed under any situation
      // TODO(lightmark): more test cases are welcome.
      env->StartThread(writer, &arg);
      // iter == 0 -> target * 2, iter == 1 -> target / 2.
      int32_t new_limit = (target << 1) >> (iter << 1);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart");
      arg.limiter->SetBytesPerSecond(new_limit);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd");
      env->WaitForJoin();
      fprintf(stderr,
              "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32
              "KB/sec, refill period %" PRIi64 " ms\n",
              target / 1024, new_limit / 1024, refill_period / 1000);
    }
  }

  // The SyncPoint instance is shared through GetInstance(), so processing
  // enabled above would otherwise stay on for every test that runs after
  // this one in the same process (e.g. with shuffled or repeated test runs).
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|