diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 44c1bdf8a..ae3ab8f84 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -23,7 +23,7 @@ class RateLimiter { // Request for token to write bytes. If this request can not be satisfied, // the call is blocked. Caller is responsible to make sure - // bytes < GetSingleBurstBytes() + // bytes <= GetSingleBurstBytes() virtual void Request(const int64_t bytes, const Env::IOPriority pri) = 0; // Max bytes can be granted in a single burst diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 723fb8e36..2a54d4cba 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -31,7 +31,7 @@ class GenericRateLimiter : public RateLimiter { // Request for token to write bytes. If this request can not be satisfied, // the call is blocked. Caller is responsible to make sure - // bytes < GetSingleBurstBytes() + // bytes <= GetSingleBurstBytes() virtual void Request(const int64_t bytes, const Env::IOPriority pri) override; virtual int64_t GetSingleBurstBytes() const override { diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 55dd74b42..cdcc0a621 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -17,6 +17,7 @@ #include "util/file_reader_writer.h" #include "util/logging.h" #include "util/string_util.h" +#include "rocksdb/rate_limiter.h" #include "rocksdb/transaction_log.h" #include "port/port.h" @@ -43,51 +44,6 @@ namespace rocksdb { -class BackupRateLimiter { - public: - BackupRateLimiter(Env* env, uint64_t max_bytes_per_second, - uint64_t bytes_per_check) - : env_(env), - max_bytes_per_second_(max_bytes_per_second), - bytes_per_check_(bytes_per_check), - micros_start_time_(env->NowMicros()), - bytes_since_start_(0) {} - - // thread safe - void ReportAndWait(uint64_t bytes_since_last_call) { - std::unique_lock lk(lock_); - - bytes_since_start_ += bytes_since_last_call; - if (bytes_since_start_ < bytes_per_check_) { - // not enough bytes to be rate-limited - return; - } - - uint64_t now = env_->NowMicros(); - uint64_t interval = now - micros_start_time_; - uint64_t should_take_micros = - (bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_; - - if (should_take_micros > interval) { - env_->SleepForMicroseconds( - static_cast(should_take_micros - interval)); - now = env_->NowMicros(); - } - // reset interval - micros_start_time_ = now; - bytes_since_start_ = 0; - } - - private: - Env* env_; - std::mutex lock_; - uint64_t max_bytes_per_second_; - uint64_t bytes_per_check_; - uint64_t micros_start_time_; - uint64_t bytes_since_start_; - static const uint64_t kMicrosInSecond = 1000 * 1000LL; -}; - void BackupStatistics::IncrementNumberSuccessBackup() { number_success_backup++; } @@ -314,7 +270,7 @@ class BackupEngineImpl : public BackupEngine { Env* src_env, Env* dst_env, bool sync, - BackupRateLimiter* rate_limiter, + RateLimiter* rate_limiter, uint64_t* size = nullptr, uint32_t* checksum_value = nullptr, uint64_t size_limit = 0); @@ -335,7 +291,7 @@ class BackupEngineImpl : public BackupEngine { Env* src_env; Env* dst_env; bool sync; - BackupRateLimiter* rate_limiter; + RateLimiter* rate_limiter; uint64_t size_limit; std::promise result; @@ -362,7 +318,7 @@ class BackupEngineImpl : public BackupEngine { Env* _src_env, Env* _dst_env, bool _sync, - BackupRateLimiter* _rate_limiter, + RateLimiter* _rate_limiter, uint64_t _size_limit) : src_path(std::move(_src_path)), dst_path(std::move(_dst_path)), @@ -440,7 +396,7 @@ class BackupEngineImpl : public BackupEngine { bool shared, const std::string& src_dir, const std::string& src_fname, // starts with "/" - BackupRateLimiter* rate_limiter, + RateLimiter* rate_limiter, uint64_t size_limit = 0, bool shared_checksum = false); @@ -683,11 +639,10 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { s = backup_env_->CreateDir( GetAbsolutePath(GetPrivateFileRel(new_backup_id, true))); - unique_ptr rate_limiter; + unique_ptr rate_limiter; if (options_.backup_rate_limit > 0) { - copy_file_buffer_size_ = options_.backup_rate_limit / 10; - rate_limiter.reset(new BackupRateLimiter(db_env_, - options_.backup_rate_limit, copy_file_buffer_size_)); + rate_limiter.reset(NewGenericRateLimiter(options_.backup_rate_limit)); + copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes(); } // A set into which we will insert the dst_paths that are calculated for live @@ -982,11 +937,10 @@ Status BackupEngineImpl::RestoreDBFromBackup( DeleteChildren(db_dir); } - unique_ptr rate_limiter; + unique_ptr rate_limiter; if (options_.restore_rate_limit > 0) { - copy_file_buffer_size_ = options_.restore_rate_limit / 10; - rate_limiter.reset(new BackupRateLimiter(db_env_, - options_.restore_rate_limit, copy_file_buffer_size_)); + rate_limiter.reset(NewGenericRateLimiter(options_.restore_rate_limit)); + copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes(); } Status s; std::vector restore_items_to_finish; @@ -1094,7 +1048,7 @@ Status BackupEngineImpl::CopyFile( const std::string& src, const std::string& dst, Env* src_env, Env* dst_env, bool sync, - BackupRateLimiter* rate_limiter, uint64_t* size, + RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value, uint64_t size_limit) { Status s; @@ -1152,7 +1106,7 @@ Status BackupEngineImpl::CopyFile( } s = dest_writer->Append(data); if (rate_limiter != nullptr) { - rate_limiter->ReportAndWait(data.size()); + rate_limiter->Request(data.size(), Env::IO_LOW); } } while (s.ok() && data.size() > 0 && size_limit > 0); @@ -1171,7 +1125,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem( bool shared, const std::string& src_dir, const std::string& src_fname, - BackupRateLimiter* rate_limiter, + RateLimiter* rate_limiter, uint64_t size_limit, bool shared_checksum) { assert(src_fname.size() > 0 && src_fname[0] == '/');