22d4dc5066
Summary: EndWriteStall has a data race: `queue_.empty()` is checked outside of the mutex, so once we enter the critical section another thread may already have cleared the list, and accessing the `front()` is undefined behavior (and causes interesting crashes under high concurrency). This PR fixes the bug, and also rewrites the logic to make it easier to reason about it. It also fixes another subtle bug: if some writers are stalled and `SetBufferSize(0)` is called, which disables the WBM, the writer are not unblocked because of an early `enabled()` check in `EndWriteStall()`. It doesn't significantly change the locking behavior, as before writers won't lock unless entering a stall condition, and `FreeMem` almost always locks if stalling is allowed, but that is inevitable with the current design. Liveness is guaranteed by the fact that if some writes are blocked, eventually all writes will be blocked due to `stall_active_`, and eventually all memory is freed. While at it, do a couple of optimizations: - In `WBMStallInterface::Signal()` signal the CV only after releasing the lock. Signaling under the lock is a common pitfall, as it causes the woken-up thread to immediately go back to sleep because the mutex is still locked by the awaker. - Move all allocations and deallocations outside of the lock. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9009 Test Plan: ``` USE_CLANG=1 make -j64 all check ``` Reviewed By: akankshamahajan15 Differential Revision: D31550668 Pulled By: ot fbshipit-source-id: 5125387c3dc7ecaaa2b8bbc736e58c4156698580
204 lines
6.2 KiB
C++
204 lines
6.2 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "rocksdb/write_buffer_manager.h"
|
|
|
|
#include "cache/cache_entry_roles.h"
|
|
#include "cache/cache_reservation_manager.h"
|
|
#include "db/db_impl/db_impl.h"
|
|
#include "rocksdb/status.h"
|
|
#include "util/coding.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Creates a manager with a total budget of `_buffer_size` bytes; the mutable
// limit is initialized to 7/8 of that budget. Both usage counters start at
// zero and no stall is active.
//
// `cache`: when non-null (and not a ROCKSDB_LITE build), a
//          CacheReservationManager is created on top of it.
// `allow_stall`: stored as-is; consulted by the write-stall entry points.
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache,
                                       bool allow_stall)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
      memory_used_(0),
      memory_active_(0),
      cache_rev_mng_(nullptr),
      allow_stall_(allow_stall),
      stall_active_(false) {
#ifdef ROCKSDB_LITE
  // Cache reservation is compiled out in LITE builds.
  (void)cache;
#else
  if (cache != nullptr) {
    // Memtable memory usage fluctuates frequently, so the reservation manager
    // is asked to delay decreases: this saves re-inserting dummy entries when
    // usage grows again right after it shrank.
    const bool delayed_decrease = true;
    cache_rev_mng_.reset(new CacheReservationManager(cache, delayed_decrease));
  }
#endif  // ROCKSDB_LITE
}
|
|
|
|
// Debug builds only: checks, under mu_, that no stalled writer is still
// registered in the queue when the manager is destroyed.
WriteBufferManager::~WriteBufferManager() {
#ifndef NDEBUG
  std::lock_guard<std::mutex> guard(mu_);
  assert(queue_.empty());
#endif
}
|
|
|
|
std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
|
|
if (cache_rev_mng_ != nullptr) {
|
|
return cache_rev_mng_->GetTotalReservedCacheSize();
|
|
} else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
void WriteBufferManager::ReserveMem(size_t mem) {
|
|
if (cache_rev_mng_ != nullptr) {
|
|
ReserveMemWithCache(mem);
|
|
} else if (enabled()) {
|
|
memory_used_.fetch_add(mem, std::memory_order_relaxed);
|
|
}
|
|
if (enabled()) {
|
|
memory_active_.fetch_add(mem, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
// Should only be called from write thread
|
|
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
|
|
#ifndef ROCKSDB_LITE
|
|
assert(cache_rev_mng_ != nullptr);
|
|
// Use a mutex to protect various data structures. Can be optimized to a
|
|
// lock-free solution if it ends up with a performance bottleneck.
|
|
std::lock_guard<std::mutex> lock(cache_rev_mng_mu_);
|
|
|
|
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
|
|
memory_used_.store(new_mem_used, std::memory_order_relaxed);
|
|
Status s =
|
|
cache_rev_mng_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
|
|
new_mem_used);
|
|
|
|
// We absorb the error since WriteBufferManager is not able to handle
|
|
// this failure properly. Ideallly we should prevent this allocation
|
|
// from happening if this cache reservation fails.
|
|
// [TODO] We'll need to improve it in the future and figure out what to do on
|
|
// error
|
|
s.PermitUncheckedError();
|
|
#else
|
|
(void)mem;
|
|
#endif // ROCKSDB_LITE
|
|
}
|
|
|
|
void WriteBufferManager::ScheduleFreeMem(size_t mem) {
|
|
if (enabled()) {
|
|
memory_active_.fetch_sub(mem, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
void WriteBufferManager::FreeMem(size_t mem) {
|
|
if (cache_rev_mng_ != nullptr) {
|
|
FreeMemWithCache(mem);
|
|
} else if (enabled()) {
|
|
memory_used_.fetch_sub(mem, std::memory_order_relaxed);
|
|
}
|
|
// Check if stall is active and can be ended.
|
|
MaybeEndWriteStall();
|
|
}
|
|
|
|
void WriteBufferManager::FreeMemWithCache(size_t mem) {
|
|
#ifndef ROCKSDB_LITE
|
|
assert(cache_rev_mng_ != nullptr);
|
|
// Use a mutex to protect various data structures. Can be optimized to a
|
|
// lock-free solution if it ends up with a performance bottleneck.
|
|
std::lock_guard<std::mutex> lock(cache_rev_mng_mu_);
|
|
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
|
|
memory_used_.store(new_mem_used, std::memory_order_relaxed);
|
|
Status s =
|
|
cache_rev_mng_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
|
|
new_mem_used);
|
|
|
|
// We absorb the error since WriteBufferManager is not able to handle
|
|
// this failure properly.
|
|
// [TODO] We'll need to improve it in the future and figure out what to do on
|
|
// error
|
|
s.PermitUncheckedError();
|
|
#else
|
|
(void)mem;
|
|
#endif // ROCKSDB_LITE
|
|
}
|
|
|
|
// Registers `wbm_stall` as a stalled writer if the stall condition still
// holds once mu_ is acquired; otherwise signals it right away.
//
// Requires `wbm_stall` to be non-null and stalling to be allowed.
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
  assert(wbm_stall != nullptr);
  assert(allow_stall_);

  // Build the list node up front so the allocation happens outside the lock;
  // under the lock it is only linked into the queue.
  std::list<StallInterface*> pending(1, wbm_stall);

  bool enqueued = false;
  {
    std::lock_guard<std::mutex> guard(mu_);
    // Re-check under the lock: the stall condition may already have resolved.
    enqueued = ShouldStall();
    if (enqueued) {
      stall_active_.store(true, std::memory_order_relaxed);
      queue_.splice(queue_.end(), std::move(pending));
    }
  }

  // Not queued means the stall has already ended, so release the caller now.
  // This happens after mu_ has been dropped.
  if (!enqueued) {
    wbm_stall->Signal();
  }
}
|
|
|
|
// Called when memory is freed in FreeMem or the buffer size has changed.
|
|
void WriteBufferManager::MaybeEndWriteStall() {
|
|
// Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
|
|
// the writers.
|
|
if (!allow_stall_) {
|
|
return;
|
|
}
|
|
|
|
if (IsStallThresholdExceeded()) {
|
|
return; // Stall conditions have not resolved.
|
|
}
|
|
|
|
// Perform all deallocations outside of the lock.
|
|
std::list<StallInterface*> cleanup;
|
|
|
|
std::unique_lock<std::mutex> lock(mu_);
|
|
if (!stall_active_.load(std::memory_order_relaxed)) {
|
|
return; // Nothing to do.
|
|
}
|
|
|
|
// Unblock new writers.
|
|
stall_active_.store(false, std::memory_order_relaxed);
|
|
|
|
// Unblock the writers in the queue.
|
|
for (StallInterface* wbm_stall : queue_) {
|
|
wbm_stall->Signal();
|
|
}
|
|
cleanup = std::move(queue_);
|
|
}
|
|
|
|
// Removes every queue entry equal to `wbm_stall` (when stalling is allowed)
// and then signals `wbm_stall` unconditionally.
//
// Requires `wbm_stall` to be non-null.
void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
  assert(wbm_stall != nullptr);

  // Declared before the lock so that the removed nodes are deallocated after
  // mu_ has been released.
  std::list<StallInterface*> cleanup;

  // Deliberately not gated on enabled(): an entry queued while the manager
  // was enabled must still be removed if the manager is disabled afterwards
  // (e.g. SetBufferSize(0)); otherwise a stale pointer could stay in queue_
  // and be signaled later by MaybeEndWriteStall().
  if (allow_stall_) {
    std::unique_lock<std::mutex> lock(mu_);
    for (auto it = queue_.begin(); it != queue_.end();) {
      // Remember the successor first: splicing `it` out moves it into
      // `cleanup`, so advancing it afterwards would walk the wrong list.
      auto next = std::next(it);
      if (*it == wbm_stall) {
        cleanup.splice(cleanup.end(), queue_, it);
      }
      it = next;
    }
  }
  wbm_stall->Signal();
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|