Fix race in WriteBufferManager (#9009)
Summary: EndWriteStall has a data race: `queue_.empty()` is checked outside of the mutex, so once we enter the critical section another thread may already have cleared the list, and accessing `front()` is undefined behavior (and causes interesting crashes under high concurrency). This PR fixes the bug, and also rewrites the logic to make it easier to reason about. It also fixes another subtle bug: if some writers are stalled and `SetBufferSize(0)` is called, which disables the WBM, the writers are not unblocked because of an early `enabled()` check in `EndWriteStall()`. It doesn't significantly change the locking behavior: as before, writers won't lock unless entering a stall condition, and `FreeMem` almost always locks if stalling is allowed, but that is inevitable with the current design. Liveness is guaranteed by the fact that if some writes are blocked, eventually all writes will be blocked due to `stall_active_`, and eventually all memory is freed. While at it, do a couple of optimizations: - In `WBMStallInterface::Signal()` signal the CV only after releasing the lock. Signaling under the lock is a common pitfall, as it causes the woken-up thread to immediately go back to sleep because the mutex is still locked by the waker. - Move all allocations and deallocations outside of the lock. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9009 Test Plan: ``` USE_CLANG=1 make -j64 all check ``` Reviewed By: akankshamahajan15 Differential Revision: D31550668 Pulled By: ot fbshipit-source-id: 5125387c3dc7ecaaa2b8bbc736e58c4156698580
This commit is contained in:
parent
e1139167ae
commit
22d4dc5066
@ -1098,8 +1098,10 @@ class DBImpl : public DB {
|
|||||||
// Called from WriteBufferManager. This function changes the state_
|
// Called from WriteBufferManager. This function changes the state_
|
||||||
// to State::RUNNING indicating the stall is cleared and DB can proceed.
|
// to State::RUNNING indicating the stall is cleared and DB can proceed.
|
||||||
void Signal() override {
|
void Signal() override {
|
||||||
MutexLock lock(&state_mutex_);
|
{
|
||||||
state_ = State::RUNNING;
|
MutexLock lock(&state_mutex_);
|
||||||
|
state_ = State::RUNNING;
|
||||||
|
}
|
||||||
state_cv_.Signal();
|
state_cv_.Signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,9 +85,7 @@ class WriteBufferManager {
|
|||||||
buffer_size_.store(new_size, std::memory_order_relaxed);
|
buffer_size_.store(new_size, std::memory_order_relaxed);
|
||||||
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
|
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
|
||||||
// Check if stall is active and can be ended.
|
// Check if stall is active and can be ended.
|
||||||
if (allow_stall_) {
|
MaybeEndWriteStall();
|
||||||
EndWriteStall();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Below functions should be called by RocksDB internally.
|
// Below functions should be called by RocksDB internally.
|
||||||
@ -118,17 +116,12 @@ class WriteBufferManager {
|
|||||||
// pass allow_stall = true during WriteBufferManager instance creation.
|
// pass allow_stall = true during WriteBufferManager instance creation.
|
||||||
//
|
//
|
||||||
// Should only be called by RocksDB internally .
|
// Should only be called by RocksDB internally .
|
||||||
bool ShouldStall() {
|
bool ShouldStall() const {
|
||||||
if (allow_stall_ && enabled()) {
|
if (!allow_stall_ || !enabled()) {
|
||||||
if (IsStallActive()) {
|
return false;
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (IsStallThresholdExceeded()) {
|
|
||||||
stall_active_.store(true, std::memory_order_relaxed);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return false;
|
|
||||||
|
return IsStallActive() || IsStallThresholdExceeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if stall is active.
|
// Returns true if stall is active.
|
||||||
@ -137,7 +130,9 @@ class WriteBufferManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if stalling condition is met.
|
// Returns true if stalling condition is met.
|
||||||
bool IsStallThresholdExceeded() { return memory_usage() >= buffer_size_; }
|
bool IsStallThresholdExceeded() const {
|
||||||
|
return memory_usage() >= buffer_size_;
|
||||||
|
}
|
||||||
|
|
||||||
void ReserveMem(size_t mem);
|
void ReserveMem(size_t mem);
|
||||||
|
|
||||||
@ -151,8 +146,9 @@ class WriteBufferManager {
|
|||||||
// Should only be called by RocksDB internally.
|
// Should only be called by RocksDB internally.
|
||||||
void BeginWriteStall(StallInterface* wbm_stall);
|
void BeginWriteStall(StallInterface* wbm_stall);
|
||||||
|
|
||||||
// Remove DB instances from queue and signal them to continue.
|
// If stall conditions have resolved, remove DB instances from queue and
|
||||||
void EndWriteStall();
|
// signal them to continue.
|
||||||
|
void MaybeEndWriteStall();
|
||||||
|
|
||||||
void RemoveDBFromQueue(StallInterface* wbm_stall);
|
void RemoveDBFromQueue(StallInterface* wbm_stall);
|
||||||
|
|
||||||
@ -167,9 +163,11 @@ class WriteBufferManager {
|
|||||||
std::mutex cache_rev_mng_mu_;
|
std::mutex cache_rev_mng_mu_;
|
||||||
|
|
||||||
std::list<StallInterface*> queue_;
|
std::list<StallInterface*> queue_;
|
||||||
// Protects the queue_
|
// Protects the queue_ and stall_active_.
|
||||||
std::mutex mu_;
|
std::mutex mu_;
|
||||||
bool allow_stall_;
|
bool allow_stall_;
|
||||||
|
// Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
|
||||||
|
// while holding mu_, but it can be read without a lock.
|
||||||
std::atomic<bool> stall_active_;
|
std::atomic<bool> stall_active_;
|
||||||
|
|
||||||
void ReserveMemWithCache(size_t mem);
|
void ReserveMemWithCache(size_t mem);
|
||||||
|
@ -39,7 +39,12 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
|
|||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
}
|
}
|
||||||
|
|
||||||
WriteBufferManager::~WriteBufferManager() = default;
|
WriteBufferManager::~WriteBufferManager() {
|
||||||
|
#ifndef NDEBUG
|
||||||
|
std::unique_lock<std::mutex> lock(mu_);
|
||||||
|
assert(queue_.empty());
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
|
std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
|
||||||
if (cache_rev_mng_ != nullptr) {
|
if (cache_rev_mng_ != nullptr) {
|
||||||
@ -98,9 +103,7 @@ void WriteBufferManager::FreeMem(size_t mem) {
|
|||||||
memory_used_.fetch_sub(mem, std::memory_order_relaxed);
|
memory_used_.fetch_sub(mem, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
// Check if stall is active and can be ended.
|
// Check if stall is active and can be ended.
|
||||||
if (allow_stall_) {
|
MaybeEndWriteStall();
|
||||||
EndWriteStall();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBufferManager::FreeMemWithCache(size_t mem) {
|
void WriteBufferManager::FreeMemWithCache(size_t mem) {
|
||||||
@ -127,47 +130,74 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
|
|||||||
|
|
||||||
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
|
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
|
||||||
assert(wbm_stall != nullptr);
|
assert(wbm_stall != nullptr);
|
||||||
if (wbm_stall) {
|
assert(allow_stall_);
|
||||||
|
|
||||||
|
// Allocate outside of the lock.
|
||||||
|
std::list<StallInterface*> new_node = {wbm_stall};
|
||||||
|
|
||||||
|
{
|
||||||
std::unique_lock<std::mutex> lock(mu_);
|
std::unique_lock<std::mutex> lock(mu_);
|
||||||
queue_.push_back(wbm_stall);
|
// Verify if the stall conditions are still active.
|
||||||
|
if (ShouldStall()) {
|
||||||
|
stall_active_.store(true, std::memory_order_relaxed);
|
||||||
|
queue_.splice(queue_.end(), std::move(new_node));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// In case thread enqueue itself and memory got freed in parallel, end the
|
|
||||||
// stall.
|
// If the node was not consumed, the stall has ended already and we can signal
|
||||||
if (!ShouldStall()) {
|
// the caller.
|
||||||
EndWriteStall();
|
if (!new_node.empty()) {
|
||||||
|
new_node.front()->Signal();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called when memory is freed in FreeMem.
|
// Called when memory is freed in FreeMem or the buffer size has changed.
|
||||||
void WriteBufferManager::EndWriteStall() {
|
void WriteBufferManager::MaybeEndWriteStall() {
|
||||||
if (enabled() && !IsStallThresholdExceeded()) {
|
// Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
|
||||||
{
|
// the writers.
|
||||||
std::unique_lock<std::mutex> lock(mu_);
|
if (!allow_stall_) {
|
||||||
stall_active_.store(false, std::memory_order_relaxed);
|
return;
|
||||||
if (queue_.empty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the instances from the list and call WBMStallInterface::Signal to
|
|
||||||
// change the state to running and unblock the DB instances.
|
|
||||||
// Check ShouldStall() incase stall got active by other DBs.
|
|
||||||
while (!ShouldStall() && !queue_.empty()) {
|
|
||||||
std::unique_lock<std::mutex> lock(mu_);
|
|
||||||
StallInterface* wbm_stall = queue_.front();
|
|
||||||
queue_.pop_front();
|
|
||||||
wbm_stall->Signal();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (IsStallThresholdExceeded()) {
|
||||||
|
return; // Stall conditions have not resolved.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform all deallocations outside of the lock.
|
||||||
|
std::list<StallInterface*> cleanup;
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lock(mu_);
|
||||||
|
if (!stall_active_.load(std::memory_order_relaxed)) {
|
||||||
|
return; // Nothing to do.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unblock new writers.
|
||||||
|
stall_active_.store(false, std::memory_order_relaxed);
|
||||||
|
|
||||||
|
// Unblock the writers in the queue.
|
||||||
|
for (StallInterface* wbm_stall : queue_) {
|
||||||
|
wbm_stall->Signal();
|
||||||
|
}
|
||||||
|
cleanup = std::move(queue_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
|
void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
|
||||||
assert(wbm_stall != nullptr);
|
assert(wbm_stall != nullptr);
|
||||||
|
|
||||||
|
// Deallocate the removed nodes outside of the lock.
|
||||||
|
std::list<StallInterface*> cleanup;
|
||||||
|
|
||||||
if (enabled() && allow_stall_) {
|
if (enabled() && allow_stall_) {
|
||||||
std::unique_lock<std::mutex> lock(mu_);
|
std::unique_lock<std::mutex> lock(mu_);
|
||||||
queue_.remove(wbm_stall);
|
for (auto it = queue_.begin(); it != queue_.end();) {
|
||||||
wbm_stall->Signal();
|
auto next = std::next(it);
|
||||||
|
if (*it == wbm_stall) {
|
||||||
|
cleanup.splice(cleanup.end(), queue_, std::move(it));
|
||||||
|
}
|
||||||
|
it = next;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
wbm_stall->Signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
Loading…
Reference in New Issue
Block a user