Support lowering CPU priority of background threads
Summary: Background activities like compaction can negatively affect latency of higher-priority tasks like request processing. To avoid this, rocksdb already lowers the IO priority of background threads on Linux systems. While this takes care of typical IO-bound systems, it does not help much when CPU (temporarily) becomes the bottleneck. This is especially likely when using more expensive compression settings. This patch adds an API to allow for lowering the CPU priority of background threads, modeled on the IO priority API. Benchmarks (see below) show significant latency and throughput improvements when CPU bound. As a result, workloads with some CPU usage bursts should benefit from lower latencies at a given utilization, or should be able to push utilization higher at a given request latency target. A useful side effect is that compaction CPU usage is now easily visible in common tools, allowing for an easier estimation of the contribution of compaction vs. request processing threads. As with IO priority, the implementation is limited to Linux, degrading to a no-op on other systems. Closes https://github.com/facebook/rocksdb/pull/3763 Differential Revision: D7740096 Pulled By: gwicke fbshipit-source-id: e5d32373e8dc403a7b0c2227023f9ce4f22b413c
This commit is contained in:
parent
affe01b0d5
commit
090c78a0d7
@ -10,6 +10,7 @@
|
||||
* Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data.
|
||||
* TransactionDBOptions::write_policy can be configured to enable WritePrepared 2PC transactions. Read more about them in the wiki.
|
||||
* Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage.
|
||||
* Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks.
|
||||
|
||||
### Bug Fixes
|
||||
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
|
||||
|
9
env/env_posix.cc
vendored
9
env/env_posix.cc
vendored
@ -815,6 +815,15 @@ class PosixEnv : public Env {
|
||||
#endif
|
||||
}
|
||||
|
||||
virtual void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
|
||||
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
|
||||
#ifdef OS_LINUX
|
||||
thread_pools_[pool].LowerCPUPriority();
|
||||
#else
|
||||
(void)pool;
|
||||
#endif
|
||||
}
|
||||
|
||||
virtual std::string TimeToString(uint64_t secondsSince1970) override {
|
||||
const time_t seconds = (time_t)secondsSince1970;
|
||||
struct tm t;
|
||||
|
@ -397,6 +397,9 @@ class Env {
|
||||
// Lower IO priority for threads from the specified pool.
|
||||
virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {}
|
||||
|
||||
// Lower CPU priority for threads from the specified pool.
|
||||
virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {}
|
||||
|
||||
// Converts seconds-since-Jan-01-1970 to a printable string
|
||||
virtual std::string TimeToString(uint64_t time) = 0;
|
||||
|
||||
@ -1092,6 +1095,10 @@ class EnvWrapper : public Env {
|
||||
target_->LowerThreadPoolIOPriority(pool);
|
||||
}
|
||||
|
||||
void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
|
||||
target_->LowerThreadPoolCPUPriority(pool);
|
||||
}
|
||||
|
||||
std::string TimeToString(uint64_t time) override {
|
||||
return target_->TimeToString(time);
|
||||
}
|
||||
|
@ -992,6 +992,8 @@ DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
|
||||
"memtable insert with hint with the given prefix size.");
|
||||
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
|
||||
"threads' IO priority");
|
||||
DEFINE_bool(enable_cpu_prio, false, "Lower the background flush/compaction "
|
||||
"threads' CPU priority");
|
||||
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
|
||||
"table becomes an identity function. This is only valid when key "
|
||||
"is 8 bytes");
|
||||
@ -3321,6 +3323,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
|
||||
FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
|
||||
}
|
||||
if (FLAGS_enable_cpu_prio) {
|
||||
FLAGS_env->LowerThreadPoolCPUPriority(Env::LOW);
|
||||
FLAGS_env->LowerThreadPoolCPUPriority(Env::HIGH);
|
||||
}
|
||||
options.env = FLAGS_env;
|
||||
|
||||
if (FLAGS_rate_limiter_bytes_per_sec > 0) {
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
#ifdef OS_LINUX
|
||||
# include <sys/syscall.h>
|
||||
# include <sys/resource.h>
|
||||
#endif
|
||||
|
||||
#include <stdlib.h>
|
||||
@ -54,6 +55,8 @@ struct ThreadPoolImpl::Impl {
|
||||
|
||||
void LowerIOPriority();
|
||||
|
||||
void LowerCPUPriority();
|
||||
|
||||
void WakeUpAllThreads() {
|
||||
bgsignal_.notify_all();
|
||||
}
|
||||
@ -98,6 +101,7 @@ private:
|
||||
static void* BGThreadWrapper(void* arg);
|
||||
|
||||
bool low_io_priority_;
|
||||
bool low_cpu_priority_;
|
||||
Env::Priority priority_;
|
||||
Env* env_;
|
||||
|
||||
@ -126,6 +130,7 @@ inline
|
||||
ThreadPoolImpl::Impl::Impl()
|
||||
:
|
||||
low_io_priority_(false),
|
||||
low_cpu_priority_(false),
|
||||
priority_(Env::LOW),
|
||||
env_(nullptr),
|
||||
total_threads_limit_(0),
|
||||
@ -172,9 +177,16 @@ void ThreadPoolImpl::Impl::LowerIOPriority() {
|
||||
low_io_priority_ = true;
|
||||
}
|
||||
|
||||
inline
|
||||
void ThreadPoolImpl::Impl::LowerCPUPriority() {
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
low_cpu_priority_ = true;
|
||||
}
|
||||
|
||||
void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
|
||||
bool low_io_priority = false;
|
||||
bool low_cpu_priority = false;
|
||||
|
||||
while (true) {
|
||||
// Wait until there is an item that is ready to run
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
@ -214,9 +226,20 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
|
||||
std::memory_order_relaxed);
|
||||
|
||||
bool decrease_io_priority = (low_io_priority != low_io_priority_);
|
||||
bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
|
||||
lock.unlock();
|
||||
|
||||
#ifdef OS_LINUX
|
||||
if (decrease_cpu_priority) {
|
||||
setpriority(
|
||||
PRIO_PROCESS,
|
||||
// Current thread.
|
||||
0,
|
||||
// Lowest priority possible.
|
||||
19);
|
||||
low_cpu_priority = true;
|
||||
}
|
||||
|
||||
if (decrease_io_priority) {
|
||||
#define IOPRIO_CLASS_SHIFT (13)
|
||||
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
|
||||
@ -237,6 +260,7 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
|
||||
}
|
||||
#else
|
||||
(void)decrease_io_priority; // avoid 'unused variable' error
|
||||
(void)decrease_cpu_priority;
|
||||
#endif
|
||||
func();
|
||||
}
|
||||
@ -425,6 +449,10 @@ void ThreadPoolImpl::LowerIOPriority() {
|
||||
impl_->LowerIOPriority();
|
||||
}
|
||||
|
||||
void ThreadPoolImpl::LowerCPUPriority() {
|
||||
impl_->LowerCPUPriority();
|
||||
}
|
||||
|
||||
void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
|
||||
impl_->SetBackgroundThreadsInternal(num, false);
|
||||
}
|
||||
|
@ -46,10 +46,14 @@ class ThreadPoolImpl : public ThreadPool {
|
||||
// start yet
|
||||
void WaitForJobsAndJoinAllThreads() override;
|
||||
|
||||
// Make threads to run at a lower kernel priority
|
||||
// Make threads to run at a lower kernel IO priority
|
||||
// Currently only has effect on Linux
|
||||
void LowerIOPriority();
|
||||
|
||||
// Make threads to run at a lower kernel CPU priority
|
||||
// Currently only has effect on Linux
|
||||
void LowerCPUPriority();
|
||||
|
||||
// Ensure there is at aleast num threads in the pool
|
||||
// but do not kill threads if there are more
|
||||
void IncBackgroundThreadsIfNeeded(int num);
|
||||
|
Loading…
x
Reference in New Issue
Block a user