rocksdb/util/concurrent_task_limiter_impl.cc

67 lines
2.1 KiB
C++
Raw Normal View History

Concurrent task limiter for compaction thread control (#4332) Summary: The PR is targeting to resolve the issue of: https://github.com/facebook/rocksdb/issues/3972#issue-330771918 We have a rocksdb created with leveled-compaction with multiple column families (CFs), some of CFs are using HDD to store big and less frequently accessed data and others are using SSD. When there are continuously write traffics going on to all CFs, the compaction thread pool is mostly occupied by those slow HDD compactions, which blocks fully utilize SSD bandwidth. Since atomic write and transaction is needed across CFs, so splitting it to multiple rocksdb instance is not an option for us. With the compaction thread control, we got 30%+ HDD write throughput gain, and also a lot smooth SSD write since less write stall happening. ConcurrentTaskLimiter can be shared with multi-CFs across rocksdb instances, so the feature does not only work for multi-CFs scenarios, but also for multi-rocksdbs scenarios, who need disk IO resource control per tenant. The usage is straight forward: e.g.: // // Enable compaction thread limiter thru ColumnFamilyOptions // std::shared_ptr<ConcurrentTaskLimiter> ctl(NewConcurrentTaskLimiter("foo_limiter", 4)); Options options; ColumnFamilyOptions cf_opt(options); cf_opt.compaction_thread_limiter = ctl; ... // // Compaction thread limiter can be tuned or disabled on-the-fly // ctl->SetMaxOutstandingTask(12); // enlarge to 12 tasks ... ctl->ResetMaxOutstandingTask(); // disable (bypass) thread limiter ctl->SetMaxOutstandingTask(-1); // Same as above ... ctl->SetMaxOutstandingTask(0); // full throttle (0 task) // // Sharing compaction thread limiter among CFs (to resolve multiple storage perf issue) // std::shared_ptr<ConcurrentTaskLimiter> ctl_ssd(NewConcurrentTaskLimiter("ssd_limiter", 8)); std::shared_ptr<ConcurrentTaskLimiter> ctl_hdd(NewConcurrentTaskLimiter("hdd_limiter", 4)); Options options; ColumnFamilyOptions cf_opt_ssd1(options); ColumnFamilyOptions cf_opt_ssd2(options); ColumnFamilyOptions cf_opt_hdd1(options); ColumnFamilyOptions cf_opt_hdd2(options); ColumnFamilyOptions cf_opt_hdd3(options); // SSD CFs cf_opt_ssd1.compaction_thread_limiter = ctl_ssd; cf_opt_ssd2.compaction_thread_limiter = ctl_ssd; // HDD CFs cf_opt_hdd1.compaction_thread_limiter = ctl_hdd; cf_opt_hdd2.compaction_thread_limiter = ctl_hdd; cf_opt_hdd3.compaction_thread_limiter = ctl_hdd; ... // // The limiter is disabled by default (or set to nullptr explicitly) // Options options; ColumnFamilyOptions cf_opt(options); cf_opt.compaction_thread_limiter = nullptr; Pull Request resolved: https://github.com/facebook/rocksdb/pull/4332 Differential Revision: D13226590 Pulled By: siying fbshipit-source-id: 14307aec55b8bd59c8223d04aa6db3c03d1b0c1d
2018-12-13 22:16:04 +01:00
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/concurrent_task_limiter_impl.h"
#include "rocksdb/concurrent_task_limiter.h"
namespace rocksdb {
ConcurrentTaskLimiterImpl::ConcurrentTaskLimiterImpl(
const std::string& name, int32_t max_outstanding_task)
: name_(name),
max_outstanding_tasks_{max_outstanding_task},
outstanding_tasks_{0} {
}
ConcurrentTaskLimiterImpl::~ConcurrentTaskLimiterImpl() {
}
const std::string& ConcurrentTaskLimiterImpl::GetName() const {
return name_;
}
void ConcurrentTaskLimiterImpl::SetMaxOutstandingTask(int32_t limit) {
max_outstanding_tasks_.store(limit, std::memory_order_relaxed);
}
void ConcurrentTaskLimiterImpl::ResetMaxOutstandingTask() {
max_outstanding_tasks_.store(-1, std::memory_order_relaxed);
}
int32_t ConcurrentTaskLimiterImpl::GetOutstandingTask() const {
return outstanding_tasks_.load(std::memory_order_relaxed);
}
std::unique_ptr<TaskLimiterToken> ConcurrentTaskLimiterImpl::GetToken(
bool force) {
int32_t limit = max_outstanding_tasks_.load(std::memory_order_relaxed);
int32_t tasks = outstanding_tasks_.load(std::memory_order_relaxed);
// force = true, bypass the throttle.
// limit < 0 means unlimited tasks.
while (force || limit < 0 || tasks < limit) {
if (outstanding_tasks_.compare_exchange_weak(tasks, tasks + 1)) {
return std::unique_ptr<TaskLimiterToken>(new TaskLimiterToken(this));
}
}
return nullptr;
}
ConcurrentTaskLimiter* NewConcurrentTaskLimiter(
const std::string& name, int32_t limit) {
return new ConcurrentTaskLimiterImpl(name, limit);
}
TaskLimiterToken::~TaskLimiterToken() {
--limiter_->outstanding_tasks_;
assert(limiter_->outstanding_tasks_ >= 0);
}
} // namespace rocksdb