Replace BackupRateLimiter with GenericRateLimiter

Summary: BackupRateLimiter removed and uses replaced with the existing GenericRateLimiter

Test Plan:
make all check

make clean
USE_CLANG=1 make all

make clean
OPT=-DROCKSDB_LITE make release

Reviewers: leveldb, igor

Reviewed By: igor

Subscribers: igor, dhruba

Differential Revision: https://reviews.facebook.net/D46095
This commit is contained in:
Paul Marinescu 2015-09-03 17:00:09 -07:00
parent 20ef64cae0
commit 50dc5f0c5a
3 changed files with 16 additions and 62 deletions

View File

@ -23,7 +23,7 @@ class RateLimiter {
// Request for token to write bytes. If this request can not be satisfied, // Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure // the call is blocked. Caller is responsible to make sure
// bytes < GetSingleBurstBytes() // bytes <= GetSingleBurstBytes()
virtual void Request(const int64_t bytes, const Env::IOPriority pri) = 0; virtual void Request(const int64_t bytes, const Env::IOPriority pri) = 0;
// Max bytes can be granted in a single burst // Max bytes can be granted in a single burst

View File

@ -31,7 +31,7 @@ class GenericRateLimiter : public RateLimiter {
// Request for token to write bytes. If this request can not be satisfied, // Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. Caller is responsible to make sure // the call is blocked. Caller is responsible to make sure
// bytes < GetSingleBurstBytes() // bytes <= GetSingleBurstBytes()
virtual void Request(const int64_t bytes, const Env::IOPriority pri) override; virtual void Request(const int64_t bytes, const Env::IOPriority pri) override;
virtual int64_t GetSingleBurstBytes() const override { virtual int64_t GetSingleBurstBytes() const override {

View File

@ -17,6 +17,7 @@
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "port/port.h" #include "port/port.h"
@ -43,51 +44,6 @@
namespace rocksdb { namespace rocksdb {
class BackupRateLimiter {
public:
BackupRateLimiter(Env* env, uint64_t max_bytes_per_second,
uint64_t bytes_per_check)
: env_(env),
max_bytes_per_second_(max_bytes_per_second),
bytes_per_check_(bytes_per_check),
micros_start_time_(env->NowMicros()),
bytes_since_start_(0) {}
// thread safe
void ReportAndWait(uint64_t bytes_since_last_call) {
std::unique_lock<std::mutex> lk(lock_);
bytes_since_start_ += bytes_since_last_call;
if (bytes_since_start_ < bytes_per_check_) {
// not enough bytes to be rate-limited
return;
}
uint64_t now = env_->NowMicros();
uint64_t interval = now - micros_start_time_;
uint64_t should_take_micros =
(bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_;
if (should_take_micros > interval) {
env_->SleepForMicroseconds(
static_cast<int>(should_take_micros - interval));
now = env_->NowMicros();
}
// reset interval
micros_start_time_ = now;
bytes_since_start_ = 0;
}
private:
Env* env_;
std::mutex lock_;
uint64_t max_bytes_per_second_;
uint64_t bytes_per_check_;
uint64_t micros_start_time_;
uint64_t bytes_since_start_;
static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};
void BackupStatistics::IncrementNumberSuccessBackup() { void BackupStatistics::IncrementNumberSuccessBackup() {
number_success_backup++; number_success_backup++;
} }
@ -314,7 +270,7 @@ class BackupEngineImpl : public BackupEngine {
Env* src_env, Env* src_env,
Env* dst_env, Env* dst_env,
bool sync, bool sync,
BackupRateLimiter* rate_limiter, RateLimiter* rate_limiter,
uint64_t* size = nullptr, uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr, uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0); uint64_t size_limit = 0);
@ -335,7 +291,7 @@ class BackupEngineImpl : public BackupEngine {
Env* src_env; Env* src_env;
Env* dst_env; Env* dst_env;
bool sync; bool sync;
BackupRateLimiter* rate_limiter; RateLimiter* rate_limiter;
uint64_t size_limit; uint64_t size_limit;
std::promise<CopyResult> result; std::promise<CopyResult> result;
@ -362,7 +318,7 @@ class BackupEngineImpl : public BackupEngine {
Env* _src_env, Env* _src_env,
Env* _dst_env, Env* _dst_env,
bool _sync, bool _sync,
BackupRateLimiter* _rate_limiter, RateLimiter* _rate_limiter,
uint64_t _size_limit) uint64_t _size_limit)
: src_path(std::move(_src_path)), : src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)), dst_path(std::move(_dst_path)),
@ -440,7 +396,7 @@ class BackupEngineImpl : public BackupEngine {
bool shared, bool shared,
const std::string& src_dir, const std::string& src_dir,
const std::string& src_fname, // starts with "/" const std::string& src_fname, // starts with "/"
BackupRateLimiter* rate_limiter, RateLimiter* rate_limiter,
uint64_t size_limit = 0, uint64_t size_limit = 0,
bool shared_checksum = false); bool shared_checksum = false);
@ -683,11 +639,10 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
s = backup_env_->CreateDir( s = backup_env_->CreateDir(
GetAbsolutePath(GetPrivateFileRel(new_backup_id, true))); GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));
unique_ptr<BackupRateLimiter> rate_limiter; unique_ptr<RateLimiter> rate_limiter;
if (options_.backup_rate_limit > 0) { if (options_.backup_rate_limit > 0) {
copy_file_buffer_size_ = options_.backup_rate_limit / 10; rate_limiter.reset(NewGenericRateLimiter(options_.backup_rate_limit));
rate_limiter.reset(new BackupRateLimiter(db_env_, copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes();
options_.backup_rate_limit, copy_file_buffer_size_));
} }
// A set into which we will insert the dst_paths that are calculated for live // A set into which we will insert the dst_paths that are calculated for live
@ -982,11 +937,10 @@ Status BackupEngineImpl::RestoreDBFromBackup(
DeleteChildren(db_dir); DeleteChildren(db_dir);
} }
unique_ptr<BackupRateLimiter> rate_limiter; unique_ptr<RateLimiter> rate_limiter;
if (options_.restore_rate_limit > 0) { if (options_.restore_rate_limit > 0) {
copy_file_buffer_size_ = options_.restore_rate_limit / 10; rate_limiter.reset(NewGenericRateLimiter(options_.restore_rate_limit));
rate_limiter.reset(new BackupRateLimiter(db_env_, copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes();
options_.restore_rate_limit, copy_file_buffer_size_));
} }
Status s; Status s;
std::vector<RestoreAfterCopyWorkItem> restore_items_to_finish; std::vector<RestoreAfterCopyWorkItem> restore_items_to_finish;
@ -1094,7 +1048,7 @@ Status BackupEngineImpl::CopyFile(
const std::string& src, const std::string& src,
const std::string& dst, Env* src_env, const std::string& dst, Env* src_env,
Env* dst_env, bool sync, Env* dst_env, bool sync,
BackupRateLimiter* rate_limiter, uint64_t* size, RateLimiter* rate_limiter, uint64_t* size,
uint32_t* checksum_value, uint32_t* checksum_value,
uint64_t size_limit) { uint64_t size_limit) {
Status s; Status s;
@ -1152,7 +1106,7 @@ Status BackupEngineImpl::CopyFile(
} }
s = dest_writer->Append(data); s = dest_writer->Append(data);
if (rate_limiter != nullptr) { if (rate_limiter != nullptr) {
rate_limiter->ReportAndWait(data.size()); rate_limiter->Request(data.size(), Env::IO_LOW);
} }
} while (s.ok() && data.size() > 0 && size_limit > 0); } while (s.ok() && data.size() > 0 && size_limit > 0);
@ -1171,7 +1125,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
bool shared, bool shared,
const std::string& src_dir, const std::string& src_dir,
const std::string& src_fname, const std::string& src_fname,
BackupRateLimiter* rate_limiter, RateLimiter* rate_limiter,
uint64_t size_limit, uint64_t size_limit,
bool shared_checksum) { bool shared_checksum) {
assert(src_fname.size() > 0 && src_fname[0] == '/'); assert(src_fname.size() > 0 && src_fname[0] == '/');