From 12542488ef587f405a019250ee512ee2936807f6 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Fri, 10 Sep 2021 08:35:59 -0700 Subject: [PATCH] Add public API RateLimiter::GetTotalPendingRequests() (#8890) Summary: Context/Summary: As users requested, a public API RateLimiter::GetTotalPendingRequests() is added to expose the total number of pending requests for bytes in the rate limiter, which is the size of the request queue of that priority (or of all priorities, if IO_TOTAL is interested) at the time when this API is called. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8890 Test Plan: - Passing added new unit tests - Passing existing unit tests Reviewed By: ajkr Differential Revision: D30815500 Pulled By: hx235 fbshipit-source-id: 2dfa990f651c1c47378b6215c751ad76a5824300 --- HISTORY.md | 1 + include/rocksdb/rate_limiter.h | 4 ++++ util/rate_limiter.cc | 3 ++- util/rate_limiter.h | 13 +++++++++++ util/rate_limiter_test.cc | 42 ++++++++++++++++++++++++++++++++++ 5 files changed, 62 insertions(+), 1 deletion(-) diff --git a/HISTORY.md b/HISTORY.md index 821e58b24..ec65434d3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,7 @@ ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API +* Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter. ## 6.24.0 (2021-08-20) ### Bug Fixes diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 0ee89f5c8..518db7aa6 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -89,6 +89,10 @@ class RateLimiter { virtual int64_t GetTotalRequests( const Env::IOPriority pri = Env::IO_TOTAL) const = 0; + // Total # of requests that are pending for bytes in rate limiter + virtual int64_t GetTotalPendingRequests( + const Env::IOPriority pri = Env::IO_TOTAL) const = 0; + virtual int64_t GetBytesPerSecond() const = 0; virtual bool IsRateLimited(OpType op_type) { diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 7d7936e5f..2faafdbbb 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -143,7 +143,8 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, // Request cannot be satisfied at this moment, enqueue Req r(bytes, &request_mutex_); queue_[pri].push_back(&r); - + TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest", + &request_mutex_); // A thread representing a queued request coordinates with other such threads. // There are two main duties. // diff --git a/util/rate_limiter.h b/util/rate_limiter.h index f3f593c36..7596374b0 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -72,6 +72,19 @@ class GenericRateLimiter : public RateLimiter { return total_requests_[pri]; } + virtual int64_t GetTotalPendingRequests( + const Env::IOPriority pri = Env::IO_TOTAL) const override { + MutexLock g(&request_mutex_); + if (pri == Env::IO_TOTAL) { + int64_t total_pending_requests_sum = 0; + for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { + total_pending_requests_sum += static_cast(queue_[i].size()); + } + return total_pending_requests_sum; + } + return static_cast(queue_[pri].size()); + } + virtual int64_t GetBytesPerSecond() const override { return rate_bytes_per_sec_; } diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 27bbc2d31..cae82da34 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -15,6 +15,7 @@ #include #include "db/db_test_util.h" +#include "port/port.h" #include "rocksdb/system_clock.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" @@ -89,6 +90,47 @@ TEST_F(RateLimiterTest, GetTotalRequests) { "Env::IO_TOTAL"; } +TEST_F(RateLimiterTest, GetTotalPendingRequests) { + std::unique_ptr limiter( + NewGenericRateLimiter(20 /* rate_bytes_per_sec */)); + for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { + ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast(i)), + 0); + } + // This is a variable for making sure the following callback is called + // and the assertions in it are indeed excuted + bool nonzero_pending_requests_verified_ = false; + SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) { + port::Mutex* request_mutex = (port::Mutex*)arg; + // We temporarily unlock the mutex so that the following + // GetTotalPendingRequests() can acquire it + request_mutex->Unlock(); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1); + // We lock the mutex again so that the request thread can resume running + // with the mutex locked + request_mutex->Lock(); + nonzero_pending_requests_verified_ = true; + }); + + SyncPoint::GetInstance()->EnableProcessing(); + limiter->Request(20, Env::IO_USER, nullptr /* stats */, + RateLimiter::OpType::kWrite); + ASSERT_EQ(nonzero_pending_requests_verified_, true); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest"); +} + TEST_F(RateLimiterTest, Modes) { for (auto mode : {RateLimiter::Mode::kWritesOnly, RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {