Add public API RateLimiter::GetTotalPendingRequests() (#8890)

Summary:
Context/Summary:
As users requested, a public API RateLimiter::GetTotalPendingRequests() is added to expose the total number of pending requests for bytes in the rate limiter, which is the size of the request queue of that priority (or of all priorities, if IO_TOTAL is interested) at the time when this API is called.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8890

Test Plan:
- Passing added new unit tests
- Passing existing unit tests

Reviewed By: ajkr

Differential Revision: D30815500

Pulled By: hx235

fbshipit-source-id: 2dfa990f651c1c47378b6215c751ad76a5824300
This commit is contained in:
Hui Xiao 2021-09-10 08:35:59 -07:00 committed by Facebook GitHub Bot
parent 0fb938c448
commit 12542488ef
5 changed files with 62 additions and 1 deletions

View File

@ -19,6 +19,7 @@
### Public API change ### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API * Remove obsolete implementation details FullKey and ParseFullKey from public API
* Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter.
## 6.24.0 (2021-08-20) ## 6.24.0 (2021-08-20)
### Bug Fixes ### Bug Fixes

View File

@ -89,6 +89,10 @@ class RateLimiter {
virtual int64_t GetTotalRequests( virtual int64_t GetTotalRequests(
const Env::IOPriority pri = Env::IO_TOTAL) const = 0; const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
// Total # of requests that are pending for bytes in rate limiter
virtual int64_t GetTotalPendingRequests(
const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
virtual int64_t GetBytesPerSecond() const = 0; virtual int64_t GetBytesPerSecond() const = 0;
virtual bool IsRateLimited(OpType op_type) { virtual bool IsRateLimited(OpType op_type) {

View File

@ -143,7 +143,8 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
// Request cannot be satisfied at this moment, enqueue // Request cannot be satisfied at this moment, enqueue
Req r(bytes, &request_mutex_); Req r(bytes, &request_mutex_);
queue_[pri].push_back(&r); queue_[pri].push_back(&r);
TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest",
&request_mutex_);
// A thread representing a queued request coordinates with other such threads. // A thread representing a queued request coordinates with other such threads.
// There are two main duties. // There are two main duties.
// //

View File

@ -72,6 +72,19 @@ class GenericRateLimiter : public RateLimiter {
return total_requests_[pri]; return total_requests_[pri];
} }
virtual int64_t GetTotalPendingRequests(
const Env::IOPriority pri = Env::IO_TOTAL) const override {
MutexLock g(&request_mutex_);
if (pri == Env::IO_TOTAL) {
int64_t total_pending_requests_sum = 0;
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
}
return total_pending_requests_sum;
}
return static_cast<int64_t>(queue_[pri].size());
}
virtual int64_t GetBytesPerSecond() const override { virtual int64_t GetBytesPerSecond() const override {
return rate_bytes_per_sec_; return rate_bytes_per_sec_;
} }

View File

@ -15,6 +15,7 @@
#include <limits> #include <limits>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
@ -89,6 +90,47 @@ TEST_F(RateLimiterTest, GetTotalRequests) {
"Env::IO_TOTAL"; "Env::IO_TOTAL";
} }
TEST_F(RateLimiterTest, GetTotalPendingRequests) {
std::unique_ptr<RateLimiter> limiter(
NewGenericRateLimiter(20 /* rate_bytes_per_sec */));
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)),
0);
}
// This is a variable for making sure the following callback is called
// and the assertions in it are indeed excuted
bool nonzero_pending_requests_verified_ = false;
SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) {
port::Mutex* request_mutex = (port::Mutex*)arg;
// We temporarily unlock the mutex so that the following
// GetTotalPendingRequests() can acquire it
request_mutex->Unlock();
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1);
// We lock the mutex again so that the request thread can resume running
// with the mutex locked
request_mutex->Lock();
nonzero_pending_requests_verified_ = true;
});
SyncPoint::GetInstance()->EnableProcessing();
limiter->Request(20, Env::IO_USER, nullptr /* stats */,
RateLimiter::OpType::kWrite);
ASSERT_EQ(nonzero_pending_requests_verified_, true);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::Request:PostEnqueueRequest");
}
TEST_F(RateLimiterTest, Modes) { TEST_F(RateLimiterTest, Modes) {
for (auto mode : {RateLimiter::Mode::kWritesOnly, for (auto mode : {RateLimiter::Mode::kWritesOnly,
RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) { RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {