Summary: When a WriteBufferManager is shared across DBs and column families to keep memory usage under a limit, OOMs have been observed when flushes cannot keep up but writes continuously insert into memtables. To avoid such OOMs, when memory usage goes beyond buffer_size_ and a DB tries to write, this change stalls incoming writers until flush completes and memory usage drops.

Design:

Stall condition: When total memory usage exceeds WriteBufferManager::buffer_size_ (memory_usage() >= buffer_size_), WriteBufferManager::ShouldStall() returns true. DBImpl first blocks incoming/future writers by calling write_thread_.BeginWriteStall() (which adds a dummy stall object to the writer's queue). The DB is then blocked in state State::Blocked (the current write doesn't go through), and the WBMStallInterface object maintained by every DB instance is added to the WriteBufferManager's queue. If other DBs try to write during this stall, they are also blocked once their check of WriteBufferManager::ShouldStall() returns true.

End stall condition: When flush finishes and memory usage goes down, the stall ends only if the memory waiting to be flushed is less than buffer_size/2. This lower limit gives flushes time to complete and avoids continuous stalling if memory usage stays close to buffer_size. WriteBufferManager::EndWriteStall() is called, which removes all instances from its queue and signals them to continue; their state changes to State::Running and they are unblocked. Each DBImpl then signals its own incoming writers to continue by calling write_thread_.EndWriteStall() (which removes the dummy stall object from the queue).

Each DB instance creates a WBMStallInterface, the interface used to block and signal DBs during a stall. When a DB needs to be blocked or signalled by the WriteBufferManager, its state_for_wbm_ state is changed accordingly (RUNNING or BLOCKED).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7898

Test Plan: Added a new test db/db_write_buffer_manager_test.cc

Reviewed By: anand1976

Differential Revision: D26093227

Pulled By: akankshamahajan15

fbshipit-source-id: 2bbd982a3fb7033f6de6153aa92a221249861aae
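A minimal sketch of opting into this behavior from user code (the `db1_options`/`db2_options` names are illustrative; `Options::write_buffer_manager` and the constructor below are the existing public API):

    // One manager, with stalling enabled, shared by two DBs.
    auto wbm = std::make_shared<rocksdb::WriteBufferManager>(
        512 << 20 /* buffer_size = 512MB */, nullptr /* no cache charging */,
        true /* allow_stall */);
    rocksdb::Options db1_options, db2_options;
    db1_options.write_buffer_manager = wbm;
    db2_options.write_buffer_manager = wbm;
    // If combined memtable memory reaches 512MB, writers on both DBs stall;
    // per the design above, the stall ends once memory waiting to be flushed
    // drops below 256MB (buffer_size / 2).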
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBufferManager is for managing memory allocation for one or more
// MemTables.

#pragma once

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <list>
#include <mutex>

#include "rocksdb/cache.h"

namespace ROCKSDB_NAMESPACE {

// Interface to block and signal DB instances.
// Each DB instance holds a pointer to a StallInterface.
class StallInterface {
 public:
  virtual ~StallInterface() {}

  virtual void Block() = 0;

  virtual void Signal() = 0;
};
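
// A minimal sketch of a StallInterface implementation (illustrative only;
// the actual implementation is the WBMStallInterface owned by each DBImpl).
// Block() parks the calling thread until Signal() wakes it:
//
//   class ExampleStall : public StallInterface {
//    public:
//     void Block() override {
//       std::unique_lock<std::mutex> lk(mu_);
//       blocked_ = true;
//       cv_.wait(lk, [this] { return !blocked_; });
//     }
//     void Signal() override {
//       {
//         std::lock_guard<std::mutex> lk(mu_);
//         blocked_ = false;
//       }
//       cv_.notify_one();
//     }
//    private:
//     std::mutex mu_;
//     std::condition_variable cv_;
//     bool blocked_ = false;
//   };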

class WriteBufferManager {
 public:
  // Parameters:
  // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be
  // capped. memory_usage() won't be valid and ShouldFlush() will always
  // return false.
  //
  // cache: if `cache` is provided, we'll put dummy entries in the cache and
  // cost the memory allocated to the cache. It can be used even if
  // _buffer_size = 0.
  //
  // allow_stall: if set true, it will enable stalling of writes when
  // memory_usage() exceeds buffer_size. It will wait for flush to complete
  // and memory usage to drop down.
  explicit WriteBufferManager(size_t _buffer_size,
                              std::shared_ptr<Cache> cache = {},
                              bool allow_stall = false);
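
  // Example (a sketch; `options` is illustrative): charge memtable memory to
  // a shared block cache without imposing a hard memtable limit:
  //
  //   std::shared_ptr<Cache> cache = NewLRUCache(1 << 30 /* 1GB */);
  //   auto wbm = std::make_shared<WriteBufferManager>(
  //       0 /* no limit */, cache);
  //   options.write_buffer_manager = wbm;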

  // No copying allowed
  WriteBufferManager(const WriteBufferManager&) = delete;
  WriteBufferManager& operator=(const WriteBufferManager&) = delete;

  ~WriteBufferManager();

  // Returns true if a non-zero buffer_size was passed in to limit the total
  // memory usage.
  bool enabled() const { return buffer_size() > 0; }

  // Returns true if a pointer to a cache was passed in.
  bool cost_to_cache() const { return cache_rep_ != nullptr; }

  // Returns the total memory used by memtables.
  // Only valid if enabled().
  size_t memory_usage() const {
    return memory_used_.load(std::memory_order_relaxed);
  }

  // Returns the total memory used by active memtables.
  size_t mutable_memtable_memory_usage() const {
    return memory_active_.load(std::memory_order_relaxed);
  }

  // Returns the memory charged to the cache via dummy entries.
  size_t dummy_entries_in_cache_usage() const {
    return dummy_size_.load(std::memory_order_relaxed);
  }

  // Returns the buffer_size.
  size_t buffer_size() const {
    return buffer_size_.load(std::memory_order_relaxed);
  }

  void SetBufferSize(size_t new_size) {
    buffer_size_.store(new_size, std::memory_order_relaxed);
    mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
    // Check if stall is active and can be ended.
    if (allow_stall_) {
      EndWriteStall();
    }
  }
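
  // For example, SetBufferSize(64 << 20) caps total memtable memory at 64MB
  // and moves the aggressive-flush threshold (mutable_limit_) to 56MB, i.e.
  // 7/8 of the new buffer size.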

  // Below functions should be called by RocksDB internally.

  // Should only be called from the write thread.
  bool ShouldFlush() const {
    if (enabled()) {
      if (mutable_memtable_memory_usage() >
          mutable_limit_.load(std::memory_order_relaxed)) {
        return true;
      }
      size_t local_size = buffer_size();
      if (memory_usage() >= local_size &&
          mutable_memtable_memory_usage() >= local_size / 2) {
        // If the memory exceeds the buffer size, we trigger more aggressive
        // flush. But if already more than half memory is being flushed,
        // triggering more flush may not help. We will hold it instead.
        return true;
      }
    }
    return false;
  }
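
  // A worked example of the thresholds above: with buffer_size() == 128MB,
  // mutable_limit_ is 112MB (7/8 of the buffer size). ShouldFlush() returns
  // true once active memtables exceed 112MB, or once total memtable memory
  // reaches 128MB while at least 64MB (half the buffer) is still mutable.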

  // Returns true if total memory usage exceeds buffer_size.
  // We stall writes until memory_usage drops below buffer_size. When the
  // function returns true, all writer threads (including the one checking
  // this condition) across all DBs will be stalled. Stalling is allowed only
  // if the user passes allow_stall = true when creating the
  // WriteBufferManager instance.
  //
  // Should only be called by RocksDB internally.
  bool ShouldStall() {
    if (allow_stall_ && enabled()) {
      if (IsStallActive()) {
        return true;
      }
      if (IsStallThresholdExceeded()) {
        stall_active_.store(true, std::memory_order_relaxed);
        return true;
      }
    }
    return false;
  }

  // Returns true if stall is active.
  bool IsStallActive() const {
    return stall_active_.load(std::memory_order_relaxed);
  }

  // Returns true if the stalling condition is met, i.e. total memory usage
  // has reached buffer_size.
  bool IsStallThresholdExceeded() { return memory_usage() >= buffer_size_; }

  void ReserveMem(size_t mem);

  // We are in the process of freeing `mem` bytes, so it is not considered
  // when checking the soft limit.
  void ScheduleFreeMem(size_t mem);

  void FreeMem(size_t mem);

  // Add the DB instance to the queue and block the DB.
  // Should only be called by RocksDB internally.
  void BeginWriteStall(StallInterface* wbm_stall);

  // Remove DB instances from the queue and signal them to continue.
  void EndWriteStall();

  // Remove the DB instance from the queue, e.g. when the DB is closed
  // before the stall ends.
  void RemoveDBFromQueue(StallInterface* wbm_stall);
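
  // A sketch of the stall lifecycle described in the change above
  // (illustrative; the actual call sites live in DBImpl and WriteThread):
  //
  //   if (write_buffer_manager_->ShouldStall()) {
  //     write_thread_.BeginWriteStall();             // block this DB's writers
  //     write_buffer_manager_->BeginWriteStall(...); // enqueue + Block() this DB
  //   }
  //   // ...flushes complete and memory usage drops below buffer_size / 2;
  //   // EndWriteStall() Signal()s every queued DB, and each DB then calls
  //   // write_thread_.EndWriteStall() to let its own writers resume.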

 private:
  std::atomic<size_t> buffer_size_;
  std::atomic<size_t> mutable_limit_;
  std::atomic<size_t> memory_used_;
  // Memory that hasn't been scheduled to free.
  std::atomic<size_t> memory_active_;
  std::atomic<size_t> dummy_size_;
  struct CacheRep;
  std::unique_ptr<CacheRep> cache_rep_;
  std::list<StallInterface*> queue_;
  // Protects the queue_.
  std::mutex mu_;
  bool allow_stall_;
  std::atomic<bool> stall_active_;

  void ReserveMemWithCache(size_t mem);
  void FreeMemWithCache(size_t mem);
};

}  // namespace ROCKSDB_NAMESPACE