Return Status::NotSupported() in RateLimiter::GetTotalPendingRequests default impl (#8950)
Summary: Context: After more discussion, a fix in https://github.com/facebook/rocksdb/issues/8938 might turn out to be too restrictive for the case where `GetTotalPendingRequests` might be invoked on RateLimiter classes that does not support the recently added API `RateLimiter::GetTotalPendingRequests` (https://github.com/facebook/rocksdb/issues/8890) due to the `assert(false)` in https://github.com/facebook/rocksdb/issues/8938. Furthermore, sentinel value like `-1` proposed in https://github.com/facebook/rocksdb/issues/8938 is easy to be ignored and unchecked. Therefore we decided to adopt `Status::NotSupported()`, which is also a convention of adding new API to public header in RocksDB. - Changed return value type of `RateLimiter::GetTotalPendingRequests` in related declaration/definition - Passed in pointer argument to hold the output instead of returning it as before - Adapted to the changes above in calling `RateLimiter::GetTotalPendingRequests` in test - Minor improvement to `TEST_F(RateLimiterTest, GetTotalPendingRequests)`: added failure message for assertion and replaced repetitive statements with a loop Pull Request resolved: https://github.com/facebook/rocksdb/pull/8950 Reviewed By: ajkr, pdillinger Differential Revision: D31128450 Pulled By: hx235 fbshipit-source-id: 282ac9c4f3dacaa0aec6d0a993161f77ad47a040
This commit is contained in:
parent
0fb79b0aca
commit
14e3f8cd9b
@ -27,7 +27,7 @@
|
||||
* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
|
||||
* Batch blob read requests for `DB::MultiGet` using `MultiRead`.
|
||||
* Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.
|
||||
* Add built-in rate limiter's implementation for `RateLimiter::GetTotalPendingRequests()` for the total number of requests that are pending for bytes in the rate limiter.
|
||||
* Add built-in rate limiter's implementation of `RateLimiter::GetTotalPendingRequest(int64_t* total_pending_requests, const Env::IOPriority pri)` for the total number of requests that are pending for bytes in the rate limiter.
|
||||
|
||||
### Public API change
|
||||
* Remove obsolete implementation details FullKey and ParseFullKey from public API
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/statistics.h"
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
@ -90,14 +91,17 @@ class RateLimiter {
|
||||
const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
|
||||
|
||||
// Total # of requests that are pending for bytes in rate limiter
|
||||
// For convenience, this function is implemented by the RateLimiter returned
|
||||
// by NewGenericRateLimiter but is not required by RocksDB. The default
|
||||
// implementation indicates "not supported".
|
||||
virtual int64_t GetTotalPendingRequests(
|
||||
// For convenience, this function is supported by the RateLimiter returned
|
||||
// by NewGenericRateLimiter but is not required by RocksDB.
|
||||
//
|
||||
// REQUIRED: total_pending_request != nullptr
|
||||
virtual Status GetTotalPendingRequests(
|
||||
int64_t* total_pending_requests,
|
||||
const Env::IOPriority pri = Env::IO_TOTAL) const {
|
||||
assert(false);
|
||||
assert(total_pending_requests != nullptr);
|
||||
(void)total_pending_requests;
|
||||
(void)pri;
|
||||
return -1;
|
||||
return Status::NotSupported();
|
||||
}
|
||||
|
||||
virtual int64_t GetBytesPerSecond() const = 0;
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/rate_limiter.h"
|
||||
#include "rocksdb/status.h"
|
||||
#include "rocksdb/system_clock.h"
|
||||
#include "util/mutexlock.h"
|
||||
#include "util/random.h"
|
||||
@ -72,17 +73,21 @@ class GenericRateLimiter : public RateLimiter {
|
||||
return total_requests_[pri];
|
||||
}
|
||||
|
||||
virtual int64_t GetTotalPendingRequests(
|
||||
virtual Status GetTotalPendingRequests(
|
||||
int64_t* total_pending_requests,
|
||||
const Env::IOPriority pri = Env::IO_TOTAL) const override {
|
||||
assert(total_pending_requests != nullptr);
|
||||
MutexLock g(&request_mutex_);
|
||||
if (pri == Env::IO_TOTAL) {
|
||||
int64_t total_pending_requests_sum = 0;
|
||||
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
|
||||
total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
|
||||
}
|
||||
return total_pending_requests_sum;
|
||||
*total_pending_requests = total_pending_requests_sum;
|
||||
} else {
|
||||
*total_pending_requests = static_cast<int64_t>(queue_[pri].size());
|
||||
}
|
||||
return static_cast<int64_t>(queue_[pri].size());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual int64_t GetBytesPerSecond() const override {
|
||||
|
@ -100,9 +100,11 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
|
||||
std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
|
||||
200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
|
||||
10 /* fairness */));
|
||||
int64_t total_pending_requests = 0;
|
||||
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
|
||||
ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)),
|
||||
0);
|
||||
ASSERT_OK(limiter->GetTotalPendingRequests(
|
||||
&total_pending_requests, static_cast<Env::IOPriority>(i)));
|
||||
ASSERT_EQ(total_pending_requests, 0);
|
||||
}
|
||||
// This is a variable for making sure the following callback is called
|
||||
// and the assertions in it are indeed excuted
|
||||
@ -113,11 +115,23 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
|
||||
// We temporarily unlock the mutex so that the following
|
||||
// GetTotalPendingRequests() can acquire it
|
||||
request_mutex->Unlock();
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1);
|
||||
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
|
||||
EXPECT_OK(limiter->GetTotalPendingRequests(
|
||||
&total_pending_requests, static_cast<Env::IOPriority>(i)))
|
||||
<< "Failed to return total pending requests for priority level = "
|
||||
<< static_cast<Env::IOPriority>(i);
|
||||
if (i == Env::IO_USER || i == Env::IO_TOTAL) {
|
||||
EXPECT_EQ(total_pending_requests, 1)
|
||||
<< "Failed to correctly return total pending requests for "
|
||||
"priority level = "
|
||||
<< static_cast<Env::IOPriority>(i);
|
||||
} else {
|
||||
EXPECT_EQ(total_pending_requests, 0)
|
||||
<< "Failed to correctly return total pending requests for "
|
||||
"priority level = "
|
||||
<< static_cast<Env::IOPriority>(i);
|
||||
}
|
||||
}
|
||||
// We lock the mutex again so that the request thread can resume running
|
||||
// with the mutex locked
|
||||
request_mutex->Lock();
|
||||
@ -128,11 +142,16 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
|
||||
limiter->Request(200, Env::IO_USER, nullptr /* stats */,
|
||||
RateLimiter::OpType::kWrite);
|
||||
ASSERT_EQ(nonzero_pending_requests_verified, true);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
|
||||
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0);
|
||||
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
|
||||
EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
|
||||
static_cast<Env::IOPriority>(i)))
|
||||
<< "Failed to return total pending requests for priority level = "
|
||||
<< static_cast<Env::IOPriority>(i);
|
||||
EXPECT_EQ(total_pending_requests, 0)
|
||||
<< "Failed to correctly return total pending requests for priority "
|
||||
"level = "
|
||||
<< static_cast<Env::IOPriority>(i);
|
||||
}
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearCallBack(
|
||||
"GenericRateLimiter::Request:PostEnqueueRequest");
|
||||
|
Loading…
Reference in New Issue
Block a user