diff --git a/Makefile b/Makefile
index ca419f08b..606a92c1e 100644
--- a/Makefile
+++ b/Makefile
@@ -112,7 +112,8 @@ TESTS = \
 	deletefile_test \
 	table_test \
 	thread_local_test \
-	geodb_test
+	geodb_test \
+	rate_limiter_test
 
 TOOLS = \
 	sst_dump \
@@ -356,6 +357,9 @@ dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
 env_test: util/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) util/env_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
 
+rate_limiter_test: util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
+
 filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
 
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 6a963510f..9698c66ae 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -194,8 +194,16 @@ class Env {
   // REQUIRES: lock has not already been unlocked.
   virtual Status UnlockFile(FileLock* lock) = 0;
 
+  // Priority for scheduling a job in the thread pool
   enum Priority { LOW, HIGH, TOTAL };
 
+  // Priority for requesting bytes from the rate limiter
+  enum IOPriority {
+    IO_LOW = 0,
+    IO_HIGH = 1,
+    IO_TOTAL = 2
+  };
+
   // Arrange to run "(*function)(arg)" once in a background thread, in
   // the thread pool specified by pri. By default, jobs go to the 'LOW'
   // priority thread pool.
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 928149332..f1f807c15 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -34,6 +34,7 @@ class Snapshot;
 class TableFactory;
 class MemTableRepFactory;
 class TablePropertiesCollectorFactory;
+class RateLimiter;
 class Slice;
 class SliceTransform;
 class Statistics;
@@ -998,6 +999,21 @@ struct FlushOptions {
   FlushOptions() : wait(true) {}
 };
 
+// Create a RateLimiter object, which can be shared among RocksDB instances to
+// control the write rate of flushes and compactions.
+// @rate_bytes_per_sec: desired total write rate in bytes per second.
+// @refill_period_us: token refill interval in microseconds.
+// @fairness: RateLimiter accepts high-pri requests and low-pri requests.
+//   A low-pri request is usually blocked in favor of a high-pri request. To
+//   keep a low-pri request from being blocked for too long, it is allowed to
+//   go first with a 1/fairness chance.
+extern RateLimiter* NewRateLimiter(
+    int64_t rate_bytes_per_sec,
+    int64_t refill_period_us = 100 * 1000,
+    int32_t fairness = 10);
+
+
 }  // namespace rocksdb
 
 #endif  // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_
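
For context on how the three parameters interact: every refill_period_us the limiter hands out rate_bytes_per_sec * refill_period_us / 10^6 bytes, and that per-period quota is also the upper bound on a single request. A minimal sketch, assuming an in-tree caller that can include the (internal) util/rate_limiter.h header to get the concrete class:

#include <cassert>
#include <memory>
#include "rocksdb/options.h"    // NewRateLimiter() declaration
#include "util/rate_limiter.h"  // concrete RateLimiter class (internal header)

int main() {
  // 10 MB/s total write rate, tokens refilled every 100 ms, default fairness.
  std::unique_ptr<rocksdb::RateLimiter> limiter(rocksdb::NewRateLimiter(
      10 * 1024 * 1024 /* rate_bytes_per_sec */,
      100 * 1000       /* refill_period_us */,
      10               /* fairness */));
  // Per-period quota: 10 MB/s * 0.1 s = 1 MB, so no single Request() may ask
  // for more than this.
  assert(limiter->GetSingleBurstBytes() == 1024 * 1024);
  return 0;
}
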
diff --git a/port/port_posix.cc b/port/port_posix.cc
index f353c475e..c5ea439eb 100644
--- a/port/port_posix.cc
+++ b/port/port_posix.cc
@@ -31,7 +31,7 @@ static int PthreadCall(const char* label, int result) {
 Mutex::Mutex(bool adaptive) {
 #ifdef OS_LINUX
   if (!adaptive) {
-    PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL));
+    PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr));
   } else {
     pthread_mutexattr_t mutex_attr;
     PthreadCall("init mutex attr", pthread_mutexattr_init(&mutex_attr));
@@ -43,7 +43,7 @@ Mutex::Mutex(bool adaptive) {
                 pthread_mutexattr_destroy(&mutex_attr));
   }
 #else  // ignore adaptive for non-linux platform
-  PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL));
+  PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr));
 #endif // OS_LINUX
 }
 
@@ -71,7 +71,7 @@ void Mutex::AssertHeld() {
 
 CondVar::CondVar(Mutex* mu)
     : mu_(mu) {
-    PthreadCall("init cv", pthread_cond_init(&cv_, NULL));
+    PthreadCall("init cv", pthread_cond_init(&cv_, nullptr));
 }
 
 CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }
@@ -115,7 +115,9 @@ void CondVar::SignalAll() {
   PthreadCall("broadcast", pthread_cond_broadcast(&cv_));
 }
 
-RWMutex::RWMutex() { PthreadCall("init mutex", pthread_rwlock_init(&mu_, NULL)); }
+RWMutex::RWMutex() {
+  PthreadCall("init mutex", pthread_rwlock_init(&mu_, nullptr));
+}
 
 RWMutex::~RWMutex() { PthreadCall("destroy mutex", pthread_rwlock_destroy(&mu_)); }
 
diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc
new file mode 100644
index 000000000..4f7a09e31
--- /dev/null
+++ b/util/rate_limiter.cc
@@ -0,0 +1,200 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "util/rate_limiter.h"
+#include "rocksdb/env.h"
+
+namespace rocksdb {
+
+// Pending request
+struct RateLimiter::Req {
+  explicit Req(int64_t bytes, port::Mutex* mu)
+      : bytes(bytes), cv(mu), granted(false) {}
+  int64_t bytes;
+  port::CondVar cv;
+  bool granted;
+};
+
+RateLimiter::RateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us,
+                         int32_t fairness)
+    : refill_period_us_(refill_period_us),
+      refill_bytes_per_period_(
+          rate_bytes_per_sec * refill_period_us / 1000000.0),
+      env_(Env::Default()),
+      stop_(false),
+      exit_cv_(&request_mutex_),
+      requests_to_wait_(0),
+      total_requests_{0, 0},
+      total_bytes_through_{0, 0},
+      available_bytes_(0),
+      next_refill_us_(env_->NowMicros()),
+      fairness_(fairness > 100 ? 100 : fairness),
+      rnd_((uint32_t)time(nullptr)),
+      leader_(nullptr) {
+  total_bytes_through_[0] = 0;
+  total_bytes_through_[1] = 0;
+}
+
+RateLimiter::~RateLimiter() {
+  MutexLock g(&request_mutex_);
+  stop_ = true;
+  requests_to_wait_ = queue_[Env::IO_LOW].size() + queue_[Env::IO_HIGH].size();
+  for (auto& r : queue_[Env::IO_HIGH]) {
+    r->cv.Signal();
+  }
+  for (auto& r : queue_[Env::IO_LOW]) {
+    r->cv.Signal();
+  }
+  while (requests_to_wait_ > 0) {
+    exit_cv_.Wait();
+  }
+}
+
+void RateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
+  assert(bytes < refill_bytes_per_period_);
+
+  MutexLock g(&request_mutex_);
+  if (stop_) {
+    return;
+  }
+
+  ++total_requests_[pri];
+
+  if (available_bytes_ >= bytes) {
+    // The refill thread assigns quota and notifies requests waiting on
+    // the queue under the mutex, so if we get here nobody is waiting.
+    available_bytes_ -= bytes;
+    total_bytes_through_[pri] += bytes;
+    return;
+  }
+
+  // Request cannot be satisfied at this moment, enqueue
+  Req r(bytes, &request_mutex_);
+  queue_[pri].push_back(&r);
+
+  do {
+    bool timedout = false;
+    // Leader election; candidates can be:
+    // (1) a new incoming request,
+    // (2) a previous leader, whose quota has not been assigned yet due
+    //     to lower priority,
+    // (3) a previous waiter at the front of the queue, who was notified by
+    //     the previous leader.
+    if (leader_ == nullptr &&
+        ((!queue_[Env::IO_HIGH].empty() &&
+          &r == queue_[Env::IO_HIGH].front()) ||
+         (!queue_[Env::IO_LOW].empty() &&
+          &r == queue_[Env::IO_LOW].front()))) {
+      leader_ = &r;
+      timedout = r.cv.TimedWait(next_refill_us_);
+    } else {
+      // Not at the front of a queue, or a leader has already been elected
+      r.cv.Wait();
+    }
+
+    // request_mutex_ is held from now on
+    if (stop_) {
+      --requests_to_wait_;
+      exit_cv_.Signal();
+      return;
+    }
+
+    // Make sure a woken-up request is always at the front of its queue
+    assert(r.granted ||
+           (!queue_[Env::IO_HIGH].empty() &&
+            &r == queue_[Env::IO_HIGH].front()) ||
+           (!queue_[Env::IO_LOW].empty() &&
+            &r == queue_[Env::IO_LOW].front()));
+    assert(leader_ == nullptr ||
+           (!queue_[Env::IO_HIGH].empty() &&
+            leader_ == queue_[Env::IO_HIGH].front()) ||
+           (!queue_[Env::IO_LOW].empty() &&
+            leader_ == queue_[Env::IO_LOW].front()));
+
+    if (leader_ == &r) {
+      // Woken up from TimedWait()
+      if (timedout) {
+        // Time to do the refill!
+        Refill();
+
+        // Re-elect a new leader regardless. This is to simplify the
+        // election handling.
+        leader_ = nullptr;
+
+        // Notify the front of the queue if the current leader is going away
+        if (r.granted) {
+          // The current leader has already been granted its quota. Notify the
+          // front of the waiting queue so it can participate in the next
+          // round of election.
+          assert((queue_[Env::IO_HIGH].empty() ||
+                  &r != queue_[Env::IO_HIGH].front()) &&
+                 (queue_[Env::IO_LOW].empty() ||
+                  &r != queue_[Env::IO_LOW].front()));
+          if (!queue_[Env::IO_HIGH].empty()) {
+            queue_[Env::IO_HIGH].front()->cv.Signal();
+          } else if (!queue_[Env::IO_LOW].empty()) {
+            queue_[Env::IO_LOW].front()->cv.Signal();
+          }
+          // Done
+          break;
+        }
+      } else {
+        // Spontaneous wake-up, need to continue to wait
+        assert(!r.granted);
+        leader_ = nullptr;
+      }
+    } else {
+      // Woken up by the previous leader:
+      // (1) if the requested quota was granted, we are done;
+      // (2) if it was not granted, this thread was picked as the new leader
+      //     candidate (the previous leader got its quota). It needs to take
+      //     part in leader election because a new request may come in before
+      //     this thread gets woken up, so it may actually need to Wait()
+      //     again.
+      assert(!timedout);
+    }
+  } while (!r.granted);
+}
+
+void RateLimiter::Refill() {
+  next_refill_us_ = env_->NowMicros() + refill_period_us_;
+  // Carry over the leftover quota from the last period
+  if (available_bytes_ < refill_bytes_per_period_) {
+    available_bytes_ += refill_bytes_per_period_;
+  }
+
+  int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
+  for (int q = 0; q < 2; ++q) {
+    auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
+    auto* queue = &queue_[use_pri];
+    while (!queue->empty()) {
+      auto* next_req = queue->front();
+      if (available_bytes_ < next_req->bytes) {
+        break;
+      }
+      available_bytes_ -= next_req->bytes;
+      total_bytes_through_[use_pri] += next_req->bytes;
+      queue->pop_front();
+
+      next_req->granted = true;
+      if (next_req != leader_) {
+        // Quota granted, signal the thread
+        next_req->cv.Signal();
+      }
+    }
+  }
+}
+
+RateLimiter* NewRateLimiter(
+    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) {
+  return new RateLimiter(rate_bytes_per_sec, refill_period_us, fairness);
+}
+
+}  // namespace rocksdb
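
The leader-election protocol above only decides which blocked thread sleeps until next_refill_us_ and runs Refill() on behalf of everyone; the accounting itself is a plain token bucket. A single-threaded model of just that accounting, purely for illustration (BucketModel is not part of the patch):

#include <cstdint>
#include <deque>
#include <utility>

// Models RateLimiter::Refill(): top up the bucket, capped at one extra
// period of carry-over, then drain the chosen queue front-to-back until the
// request at the front no longer fits.
struct BucketModel {
  int64_t refill_bytes_per_period;
  int64_t available_bytes = 0;
  std::deque<int64_t> high_pri, low_pri;  // pending request sizes

  void Refill(bool low_pri_first) {
    if (available_bytes < refill_bytes_per_period) {
      available_bytes += refill_bytes_per_period;  // carry over leftover quota
    }
    std::deque<int64_t>* order[2] = {&high_pri, &low_pri};
    if (low_pri_first) std::swap(order[0], order[1]);  // the 1/fairness case
    for (auto* q : order) {
      while (!q->empty() && q->front() <= available_bytes) {
        available_bytes -= q->front();  // grant the request at the front
        q->pop_front();
      }
    }
  }
};

In the real class, granted requests are additionally signalled on their condition variables, and a woken waiter that was not granted becomes the next leader candidate.
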
diff --git a/util/rate_limiter.h b/util/rate_limiter.h
new file mode 100644
index 000000000..28624c395
--- /dev/null
+++ b/util/rate_limiter.h
@@ -0,0 +1,88 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include <cstdint>
+#include <deque>
+
+#include "port/port_posix.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "rocksdb/env.h"
+
+namespace rocksdb {
+
+class RateLimiter {
+ public:
+  RateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us,
+              int32_t fairness);
+
+  ~RateLimiter();
+
+  // Request tokens to write `bytes` bytes. If the request cannot be satisfied
+  // right away, the call blocks. The caller is responsible for making sure
+  // that bytes < GetSingleBurstBytes().
+  void Request(const int64_t bytes, const Env::IOPriority pri);
+
+  int64_t GetTotalBytesThrough(
+      const Env::IOPriority pri = Env::IO_TOTAL) const {
+    MutexLock g(&request_mutex_);
+    if (pri == Env::IO_TOTAL) {
+      return total_bytes_through_[Env::IO_LOW] +
+             total_bytes_through_[Env::IO_HIGH];
+    }
+    return total_bytes_through_[pri];
+  }
+
+  int64_t GetTotalRequests(const Env::IOPriority pri = Env::IO_TOTAL) const {
+    MutexLock g(&request_mutex_);
+    if (pri == Env::IO_TOTAL) {
+      return total_requests_[Env::IO_LOW] + total_requests_[Env::IO_HIGH];
+    }
+    return total_requests_[pri];
+  }
+
+  int64_t GetSingleBurstBytes() const {
+    // const member, so no lock is needed
+    return refill_bytes_per_period_;
+  }
+
+  int64_t GetAvailableBytes() const {
+    MutexLock g(&request_mutex_);
+    return available_bytes_;
+  }
+
+ private:
+  void Refill();
+
+  // This mutex guards all internal state
+  mutable port::Mutex request_mutex_;
+
+  const int64_t refill_period_us_;
+  const int64_t refill_bytes_per_period_;
+  Env* const env_;
+
+  bool stop_;
+  port::CondVar exit_cv_;
+  int32_t requests_to_wait_;
+
+  int64_t total_requests_[Env::IO_TOTAL];
+  int64_t total_bytes_through_[Env::IO_TOTAL];
+  int64_t available_bytes_;
+  int64_t next_refill_us_;
+
+  int32_t fairness_;
+  Random rnd_;
+
+  struct Req;
+  Req* leader_;
+  std::deque<Req*> queue_[Env::IO_TOTAL];
+};
+
+}  // namespace rocksdb
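
Because Request() requires bytes < GetSingleBurstBytes(), a caller that wants to push a large write through the limiter has to chunk it. A minimal sketch of that pattern (ThrottledWrite is a hypothetical helper, not part of this patch):

#include <algorithm>
#include <cstdint>
#include "rocksdb/env.h"
#include "util/rate_limiter.h"

// Push `total_bytes` of (say) compaction output through the limiter, never
// asking for a full burst at once since Request() requires strictly less
// than GetSingleBurstBytes(). Assumes the burst size is larger than 1 byte.
void ThrottledWrite(rocksdb::RateLimiter* limiter, int64_t total_bytes) {
  const int64_t max_chunk = limiter->GetSingleBurstBytes() - 1;
  while (total_bytes > 0) {
    const int64_t chunk = std::min(total_bytes, max_chunk);
    limiter->Request(chunk, rocksdb::Env::IO_LOW);  // blocks until granted
    // ... write `chunk` bytes of actual file data here ...
    total_bytes -= chunk;
  }
}

With this pattern the caller's sustained throughput is capped at roughly rate_bytes_per_sec, with bursts bounded by one refill period.
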
diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc
new file mode 100644
index 000000000..348a8f628
--- /dev/null
+++ b/util/rate_limiter_test.cc
@@ -0,0 +1,80 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <limits>
+#include <memory>
+
+#include "util/testharness.h"
+#include "util/rate_limiter.h"
+#include "util/random.h"
+#include "rocksdb/env.h"
+
+namespace rocksdb {
+
+class RateLimiterTest {
+};
+
+TEST(RateLimiterTest, StartStop) {
+  std::unique_ptr<RateLimiter> limiter(new RateLimiter(100, 100, 10));
+}
+
+TEST(RateLimiterTest, Rate) {
+  auto* env = Env::Default();
+  struct Arg {
+    Arg(int64_t target_rate, int burst)
+        : limiter(new RateLimiter(target_rate, 100 * 1000, 10)),
+          request_size(target_rate / 10),
+          burst(burst) {}
+    std::unique_ptr<RateLimiter> limiter;
+    int64_t request_size;
+    int burst;
+  };
+
+  auto writer = [](void* p) {
+    auto* env = Env::Default();
+    auto* arg = static_cast<Arg*>(p);
+    // Test for 2 seconds
+    auto until = env->NowMicros() + 2 * 1000000;
+    Random r((uint32_t)(env->NowNanos() %
+                        std::numeric_limits<uint32_t>::max()));
+    while (env->NowMicros() < until) {
+      for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
+        arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
+                              Env::IO_HIGH);
+      }
+      arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
+                            Env::IO_LOW);
+    }
+  };
+
+  for (int i = 1; i <= 16; i *= 2) {
+    int64_t target = i * 1024 * 10;
+    Arg arg(target, i / 4 + 1);
+    auto start = env->NowMicros();
+    for (int t = 0; t < i; ++t) {
+      env->StartThread(writer, &arg);
+    }
+    env->WaitForJoin();
+
+    auto elapsed = env->NowMicros() - start;
+    double rate = arg.limiter->GetTotalBytesThrough()
+                  * 1000000.0 / elapsed;
+    fprintf(stderr, "request size [1 - %ld], limit %ld KB/sec, "
+            "actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
+            arg.request_size - 1, target / 1024, rate / 1024,
+            elapsed / 1000000.0);
+
+    ASSERT_GE(rate / target, 0.95);
+    ASSERT_LE(rate / target, 1.05);
+  }
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  return rocksdb::test::RunAllTests();
+}
diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index 3ac1d90a1..48fc2905a 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -29,9 +29,10 @@ namespace rocksdb {
 
 namespace {
 
-class RateLimiter {
+class BackupRateLimiter {
  public:
-  RateLimiter(Env* env, uint64_t max_bytes_per_second, uint64_t bytes_per_check)
+  BackupRateLimiter(Env* env, uint64_t max_bytes_per_second,
+                    uint64_t bytes_per_check)
       : env_(env),
         max_bytes_per_second_(max_bytes_per_second),
         bytes_per_check_(bytes_per_check),
@@ -240,7 +241,7 @@ class BackupEngineImpl : public BackupEngine {
                   Env* src_env,
                   Env* dst_env,
                   bool sync,
-                  RateLimiter* rate_limiter,
+                  BackupRateLimiter* rate_limiter,
                   uint64_t* size = nullptr,
                   uint32_t* checksum_value = nullptr,
                   uint64_t size_limit = 0);
@@ -250,7 +251,7 @@ class BackupEngineImpl : public BackupEngine {
                     bool shared,
                     const std::string& src_dir,
                     const std::string& src_fname,  // starts with "/"
-                    RateLimiter* rate_limiter,
+                    BackupRateLimiter* rate_limiter,
                     uint64_t size_limit = 0,
                     bool shared_checksum = false);
 
@@ -447,11 +448,11 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
   s = backup_env_->CreateDir(
       GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));
 
-  unique_ptr<RateLimiter> rate_limiter;
+  unique_ptr<BackupRateLimiter> rate_limiter;
   if (options_.backup_rate_limit > 0) {
     copy_file_buffer_size_ = options_.backup_rate_limit / 10;
-    rate_limiter.reset(new RateLimiter(db_env_, options_.backup_rate_limit,
-                                       copy_file_buffer_size_));
+    rate_limiter.reset(new BackupRateLimiter(db_env_,
+        options_.backup_rate_limit, copy_file_buffer_size_));
   }
 
   // copy live_files
@@ -636,11 +637,11 @@ Status BackupEngineImpl::RestoreDBFromBackup(
     DeleteChildren(db_dir);
   }
 
-  unique_ptr<RateLimiter> rate_limiter;
+  unique_ptr<BackupRateLimiter> rate_limiter;
   if (options_.restore_rate_limit > 0) {
     copy_file_buffer_size_ = options_.restore_rate_limit / 10;
-    rate_limiter.reset(new RateLimiter(db_env_, options_.restore_rate_limit,
-                                       copy_file_buffer_size_));
+    rate_limiter.reset(new BackupRateLimiter(db_env_,
+        options_.restore_rate_limit, copy_file_buffer_size_));
   }
   Status s;
   for (auto& file : backup.GetFiles()) {
@@ -752,12 +753,13 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
   return s;
 }
 
-Status BackupEngineImpl::CopyFile(const std::string& src,
-                                  const std::string& dst, Env* src_env,
-                                  Env* dst_env, bool sync,
-                                  RateLimiter* rate_limiter, uint64_t* size,
-                                  uint32_t* checksum_value,
-                                  uint64_t size_limit) {
+Status BackupEngineImpl::CopyFile(
+    const std::string& src,
+    const std::string& dst, Env* src_env,
+    Env* dst_env, bool sync,
+    BackupRateLimiter* rate_limiter, uint64_t* size,
+    uint32_t* checksum_value,
+    uint64_t size_limit) {
   Status s;
   unique_ptr<WritableFile> dst_file;
   unique_ptr<SequentialFile> src_file;
@@ -824,7 +826,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
 Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
                                     bool shared, const std::string& src_dir,
                                     const std::string& src_fname,
-                                    RateLimiter* rate_limiter,
+                                    BackupRateLimiter* rate_limiter,
                                     uint64_t size_limit,
                                     bool shared_checksum) {