dynamically change current memtable size
Summary: Previously setting `write_buffer_size` with `SetOptions` would only apply to new memtables. An internal user wanted it to take effect immediately, instead of at an arbitrary future point, to prevent OOM. This PR makes the memtable's size mutable, and makes `SetOptions()` mutate it. There is one case when we preserve the old behavior, which is when memtable prefix bloom filter is enabled and the user is increasing the memtable's capacity. That's because the prefix bloom filter's size is fixed and wouldn't work as well on a larger memtable. Closes https://github.com/facebook/rocksdb/pull/3119 Differential Revision: D6228304 Pulled By: ajkr fbshipit-source-id: e44bd9d10a5f8c9d8c464bf7436070bb3eafdfc9
This commit is contained in:
parent
7f1815c379
commit
9019e91254
@ -930,6 +930,13 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
|
|||||||
super_version_ = new_superversion;
|
super_version_ = new_superversion;
|
||||||
++super_version_number_;
|
++super_version_number_;
|
||||||
super_version_->version_number = super_version_number_;
|
super_version_->version_number = super_version_number_;
|
||||||
|
if (old_superversion != nullptr) {
|
||||||
|
if (old_superversion->mutable_cf_options.write_buffer_size !=
|
||||||
|
mutable_cf_options.write_buffer_size) {
|
||||||
|
mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Reset SuperVersions cached in thread local storage
|
// Reset SuperVersions cached in thread local storage
|
||||||
ResetThreadLocalSuperVersions();
|
ResetThreadLocalSuperVersions();
|
||||||
|
|
||||||
|
@ -3347,11 +3347,23 @@ TEST_F(DBTest, DynamicMemtableOptions) {
|
|||||||
{"write_buffer_size", "131072"},
|
{"write_buffer_size", "131072"},
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// The existing memtable is still 64KB in size, after it becomes immutable,
|
// The existing memtable inflated 64KB->128KB when we invoked SetOptions().
|
||||||
// the next memtable will be 128KB in size. Write 256KB total, we should
|
// Write 192KB, we should have a 128KB L0 file and a memtable with 64KB data.
|
||||||
// have a 64KB L0 file, a 128KB L0 file, and a memtable with 64KB data
|
gen_l0_kb(192);
|
||||||
gen_l0_kb(256);
|
ASSERT_EQ(NumTableFilesAtLevel(0), 1); // (A)
|
||||||
ASSERT_EQ(NumTableFilesAtLevel(0), 2); // (A)
|
ASSERT_LT(SizeAtLevel(0), k128KB + 2 * k5KB);
|
||||||
|
ASSERT_GT(SizeAtLevel(0), k128KB - 4 * k5KB);
|
||||||
|
|
||||||
|
// Decrease buffer size below current usage
|
||||||
|
ASSERT_OK(dbfull()->SetOptions({
|
||||||
|
{"write_buffer_size", "65536"},
|
||||||
|
}));
|
||||||
|
// The existing memtable became eligible for flush when we reduced its
|
||||||
|
// capacity to 64KB. Two keys need to be added to trigger flush: first causes
|
||||||
|
// memtable to be marked full, second schedules the flush. Then we should have
|
||||||
|
// a 128KB L0 file, a 64KB L0 file, and a memtable with just one key.
|
||||||
|
gen_l0_kb(2);
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
|
||||||
ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
|
ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
|
||||||
ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 4 * k5KB);
|
ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 4 * k5KB);
|
||||||
|
|
||||||
|
@ -38,10 +38,10 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
MemTableOptions::MemTableOptions(const ImmutableCFOptions& ioptions,
|
ImmutableMemTableOptions::ImmutableMemTableOptions(
|
||||||
const MutableCFOptions& mutable_cf_options)
|
const ImmutableCFOptions& ioptions,
|
||||||
: write_buffer_size(mutable_cf_options.write_buffer_size),
|
const MutableCFOptions& mutable_cf_options)
|
||||||
arena_block_size(mutable_cf_options.arena_block_size),
|
: arena_block_size(mutable_cf_options.arena_block_size),
|
||||||
memtable_prefix_bloom_bits(
|
memtable_prefix_bloom_bits(
|
||||||
static_cast<uint32_t>(
|
static_cast<uint32_t>(
|
||||||
static_cast<double>(mutable_cf_options.write_buffer_size) *
|
static_cast<double>(mutable_cf_options.write_buffer_size) *
|
||||||
@ -82,6 +82,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
|
|||||||
data_size_(0),
|
data_size_(0),
|
||||||
num_entries_(0),
|
num_entries_(0),
|
||||||
num_deletes_(0),
|
num_deletes_(0),
|
||||||
|
write_buffer_size_(mutable_cf_options.write_buffer_size),
|
||||||
flush_in_progress_(false),
|
flush_in_progress_(false),
|
||||||
flush_completed_(false),
|
flush_completed_(false),
|
||||||
file_number_(0),
|
file_number_(0),
|
||||||
@ -135,6 +136,7 @@ size_t MemTable::ApproximateMemoryUsage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool MemTable::ShouldFlushNow() const {
|
bool MemTable::ShouldFlushNow() const {
|
||||||
|
size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
|
||||||
// In a lot of times, we cannot allocate arena blocks that exactly matches the
|
// In a lot of times, we cannot allocate arena blocks that exactly matches the
|
||||||
// buffer size. Thus we have to decide if we should over-allocate or
|
// buffer size. Thus we have to decide if we should over-allocate or
|
||||||
// under-allocate.
|
// under-allocate.
|
||||||
@ -152,16 +154,14 @@ bool MemTable::ShouldFlushNow() const {
|
|||||||
// if we can still allocate one more block without exceeding the
|
// if we can still allocate one more block without exceeding the
|
||||||
// over-allocation ratio, then we should not flush.
|
// over-allocation ratio, then we should not flush.
|
||||||
if (allocated_memory + kArenaBlockSize <
|
if (allocated_memory + kArenaBlockSize <
|
||||||
moptions_.write_buffer_size +
|
write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
|
||||||
kArenaBlockSize * kAllowOverAllocationRatio) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if user keeps adding entries that exceeds moptions.write_buffer_size,
|
// if user keeps adding entries that exceeds write_buffer_size, we need to
|
||||||
// we need to flush earlier even though we still have much available
|
// flush earlier even though we still have much available memory left.
|
||||||
// memory left.
|
if (allocated_memory >
|
||||||
if (allocated_memory > moptions_.write_buffer_size +
|
write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
|
||||||
kArenaBlockSize * kAllowOverAllocationRatio) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,7 +264,8 @@ class MemTableIterator : public InternalIterator {
|
|||||||
comparator_(mem.comparator_),
|
comparator_(mem.comparator_),
|
||||||
valid_(false),
|
valid_(false),
|
||||||
arena_mode_(arena != nullptr),
|
arena_mode_(arena != nullptr),
|
||||||
value_pinned_(!mem.GetMemTableOptions()->inplace_update_support) {
|
value_pinned_(
|
||||||
|
!mem.GetImmutableMemTableOptions()->inplace_update_support) {
|
||||||
if (use_range_del_table) {
|
if (use_range_del_table) {
|
||||||
iter_ = mem.range_del_table_->GetIterator(arena);
|
iter_ = mem.range_del_table_->GetIterator(arena);
|
||||||
} else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
|
} else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
|
||||||
|
@ -35,11 +35,9 @@ class MemTableIterator;
|
|||||||
class MergeContext;
|
class MergeContext;
|
||||||
class InternalIterator;
|
class InternalIterator;
|
||||||
|
|
||||||
struct MemTableOptions {
|
struct ImmutableMemTableOptions {
|
||||||
explicit MemTableOptions(
|
explicit ImmutableMemTableOptions(const ImmutableCFOptions& ioptions,
|
||||||
const ImmutableCFOptions& ioptions,
|
const MutableCFOptions& mutable_cf_options);
|
||||||
const MutableCFOptions& mutable_cf_options);
|
|
||||||
size_t write_buffer_size;
|
|
||||||
size_t arena_block_size;
|
size_t arena_block_size;
|
||||||
uint32_t memtable_prefix_bloom_bits;
|
uint32_t memtable_prefix_bloom_bits;
|
||||||
size_t memtable_huge_page_size;
|
size_t memtable_huge_page_size;
|
||||||
@ -260,6 +258,18 @@ class MemTable {
|
|||||||
return num_deletes_.load(std::memory_order_relaxed);
|
return num_deletes_.load(std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dynamically change the memtable's capacity. If set below the current usage,
|
||||||
|
// the next key added will trigger a flush. Can only increase size when
|
||||||
|
// memtable prefix bloom is disabled, since we can't easily allocate more
|
||||||
|
// space.
|
||||||
|
void UpdateWriteBufferSize(size_t new_write_buffer_size) {
|
||||||
|
if (prefix_bloom_ == nullptr ||
|
||||||
|
new_write_buffer_size < write_buffer_size_) {
|
||||||
|
write_buffer_size_.store(new_write_buffer_size,
|
||||||
|
std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Returns the edits area that is needed for flushing the memtable
|
// Returns the edits area that is needed for flushing the memtable
|
||||||
VersionEdit* GetEdits() { return &edit_; }
|
VersionEdit* GetEdits() { return &edit_; }
|
||||||
|
|
||||||
@ -348,7 +358,9 @@ class MemTable {
|
|||||||
return comparator_.comparator;
|
return comparator_.comparator;
|
||||||
}
|
}
|
||||||
|
|
||||||
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
|
const ImmutableMemTableOptions* GetImmutableMemTableOptions() const {
|
||||||
|
return &moptions_;
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t ApproximateOldestKeyTime() const {
|
uint64_t ApproximateOldestKeyTime() const {
|
||||||
return oldest_key_time_.load(std::memory_order_relaxed);
|
return oldest_key_time_.load(std::memory_order_relaxed);
|
||||||
@ -362,7 +374,7 @@ class MemTable {
|
|||||||
friend class MemTableList;
|
friend class MemTableList;
|
||||||
|
|
||||||
KeyComparator comparator_;
|
KeyComparator comparator_;
|
||||||
const MemTableOptions moptions_;
|
const ImmutableMemTableOptions moptions_;
|
||||||
int refs_;
|
int refs_;
|
||||||
const size_t kArenaBlockSize;
|
const size_t kArenaBlockSize;
|
||||||
AllocTracker mem_tracker_;
|
AllocTracker mem_tracker_;
|
||||||
@ -376,6 +388,9 @@ class MemTable {
|
|||||||
std::atomic<uint64_t> num_entries_;
|
std::atomic<uint64_t> num_entries_;
|
||||||
std::atomic<uint64_t> num_deletes_;
|
std::atomic<uint64_t> num_deletes_;
|
||||||
|
|
||||||
|
// Dynamically changeable memtable option
|
||||||
|
std::atomic<size_t> write_buffer_size_;
|
||||||
|
|
||||||
// These are used to manage memtable flushes to storage
|
// These are used to manage memtable flushes to storage
|
||||||
bool flush_in_progress_; // started the flush
|
bool flush_in_progress_; // started the flush
|
||||||
bool flush_completed_; // finished the flush
|
bool flush_completed_; // finished the flush
|
||||||
|
@ -992,7 +992,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
MemTable* mem = cf_mems_->GetMemTable();
|
MemTable* mem = cf_mems_->GetMemTable();
|
||||||
auto* moptions = mem->GetMemTableOptions();
|
auto* moptions = mem->GetImmutableMemTableOptions();
|
||||||
if (!moptions->inplace_update_support) {
|
if (!moptions->inplace_update_support) {
|
||||||
mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_,
|
mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_,
|
||||||
get_post_process_info(mem));
|
get_post_process_info(mem));
|
||||||
@ -1139,7 +1139,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
MemTable* mem = cf_mems_->GetMemTable();
|
MemTable* mem = cf_mems_->GetMemTable();
|
||||||
auto* moptions = mem->GetMemTableOptions();
|
auto* moptions = mem->GetImmutableMemTableOptions();
|
||||||
bool perform_merge = false;
|
bool perform_merge = false;
|
||||||
|
|
||||||
// If we pass DB through and options.max_successive_merges is hit
|
// If we pass DB through and options.max_successive_merges is hit
|
||||||
|
Loading…
x
Reference in New Issue
Block a user