dynamic inplace_update options
Summary: Make inplace_update_support and inplace_update_num_locks dynamic. inplace_callback becomes immutable We are almost free of references to cfd->options() in db_impl Test Plan: unit test Reviewers: igor, yhchiang, rven, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D25293
This commit is contained in:
parent
bc3bc4bc2f
commit
f1841985e4
@ -388,12 +388,13 @@ const EnvOptions* ColumnFamilyData::soptions() const {
|
||||
|
||||
void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; }
|
||||
|
||||
void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) {
|
||||
void ColumnFamilyData::CreateNewMemtable(
|
||||
const MutableCFOptions& mutable_cf_options) {
|
||||
assert(current_ != nullptr);
|
||||
if (mem_ != nullptr) {
|
||||
delete mem_->Unref();
|
||||
}
|
||||
mem_ = new MemTable(internal_comparator_, ioptions_, moptions);
|
||||
mem_ = new MemTable(internal_comparator_, ioptions_, mutable_cf_options);
|
||||
mem_->Ref();
|
||||
}
|
||||
|
||||
|
@ -198,7 +198,7 @@ class ColumnFamilyData {
|
||||
Version* dummy_versions() { return dummy_versions_; }
|
||||
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
|
||||
void SetCurrent(Version* current);
|
||||
void CreateNewMemtable(const MemTableOptions& moptions);
|
||||
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options);
|
||||
|
||||
TableCache* table_cache() const { return table_cache_.get(); }
|
||||
|
||||
|
@ -1228,8 +1228,7 @@ Status DBImpl::Recover(
|
||||
if (!s.ok()) {
|
||||
// Clear memtables if recovery failed
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
cfd->CreateNewMemtable(MemTableOptions(
|
||||
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
|
||||
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1360,8 +1359,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
||||
// file-systems cause the DB::Open() to fail.
|
||||
return status;
|
||||
}
|
||||
cfd->CreateNewMemtable(MemTableOptions(
|
||||
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
|
||||
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1398,8 +1396,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
||||
// Recovery failed
|
||||
break;
|
||||
}
|
||||
cfd->CreateNewMemtable(MemTableOptions(
|
||||
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
|
||||
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
|
||||
}
|
||||
|
||||
// write MANIFEST with update
|
||||
@ -2749,7 +2746,7 @@ Status DBImpl::ProcessKeyValueCompaction(
|
||||
ColumnFamilyData* cfd = compact->compaction->column_family_data();
|
||||
MergeHelper merge(
|
||||
cfd->user_comparator(), cfd->ioptions()->merge_operator,
|
||||
db_options_.info_log.get(), cfd->options()->min_partial_merge_operands,
|
||||
db_options_.info_log.get(), cfd->ioptions()->min_partial_merge_operands,
|
||||
false /* internal key corruption is expected */);
|
||||
auto compaction_filter = cfd->ioptions()->compaction_filter;
|
||||
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
|
||||
@ -4281,9 +4278,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
new_mem = new MemTable(cfd->internal_comparator(),
|
||||
*cfd->ioptions(), MemTableOptions(mutable_cf_options,
|
||||
*cfd->options()));
|
||||
new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(),
|
||||
mutable_cf_options);
|
||||
new_superversion = new SuperVersion();
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,8 @@
|
||||
namespace rocksdb {
|
||||
|
||||
MemTableOptions::MemTableOptions(
|
||||
const MutableCFOptions& mutable_cf_options, const Options& options)
|
||||
const ImmutableCFOptions& ioptions,
|
||||
const MutableCFOptions& mutable_cf_options)
|
||||
: write_buffer_size(mutable_cf_options.write_buffer_size),
|
||||
arena_block_size(mutable_cf_options.arena_block_size),
|
||||
memtable_prefix_bloom_bits(mutable_cf_options.memtable_prefix_bloom_bits),
|
||||
@ -40,21 +41,23 @@ MemTableOptions::MemTableOptions(
|
||||
mutable_cf_options.memtable_prefix_bloom_probes),
|
||||
memtable_prefix_bloom_huge_page_tlb_size(
|
||||
mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size),
|
||||
inplace_update_support(options.inplace_update_support),
|
||||
inplace_update_num_locks(options.inplace_update_num_locks),
|
||||
inplace_callback(options.inplace_callback),
|
||||
inplace_update_support(ioptions.inplace_update_support),
|
||||
inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
|
||||
inplace_callback(ioptions.inplace_callback),
|
||||
max_successive_merges(mutable_cf_options.max_successive_merges),
|
||||
filter_deletes(mutable_cf_options.filter_deletes) {}
|
||||
filter_deletes(mutable_cf_options.filter_deletes),
|
||||
statistics(ioptions.statistics),
|
||||
merge_operator(ioptions.merge_operator),
|
||||
info_log(ioptions.info_log) {}
|
||||
|
||||
MemTable::MemTable(const InternalKeyComparator& cmp,
|
||||
const ImmutableCFOptions& ioptions,
|
||||
const MemTableOptions& moptions)
|
||||
const MutableCFOptions& mutable_cf_options)
|
||||
: comparator_(cmp),
|
||||
ioptions_(ioptions),
|
||||
moptions_(moptions),
|
||||
moptions_(ioptions, mutable_cf_options),
|
||||
refs_(0),
|
||||
kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)),
|
||||
arena_(moptions.arena_block_size),
|
||||
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
|
||||
arena_(moptions_.arena_block_size),
|
||||
table_(ioptions.memtable_factory->CreateMemTableRep(
|
||||
comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log)),
|
||||
num_entries_(0),
|
||||
@ -63,20 +66,20 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
|
||||
file_number_(0),
|
||||
first_seqno_(0),
|
||||
mem_next_logfile_number_(0),
|
||||
locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks
|
||||
: 0),
|
||||
locks_(moptions_.inplace_update_support ?
|
||||
moptions_.inplace_update_num_locks : 0),
|
||||
prefix_extractor_(ioptions.prefix_extractor),
|
||||
should_flush_(ShouldFlushNow()),
|
||||
flush_scheduled_(false) {
|
||||
// if should_flush_ == true without an entry inserted, something must have
|
||||
// gone wrong already.
|
||||
assert(!should_flush_);
|
||||
if (prefix_extractor_ && moptions.memtable_prefix_bloom_bits > 0) {
|
||||
if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
|
||||
prefix_bloom_.reset(new DynamicBloom(
|
||||
&arena_,
|
||||
moptions.memtable_prefix_bloom_bits, ioptions.bloom_locality,
|
||||
moptions.memtable_prefix_bloom_probes, nullptr,
|
||||
moptions.memtable_prefix_bloom_huge_page_tlb_size,
|
||||
moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
|
||||
moptions_.memtable_prefix_bloom_probes, nullptr,
|
||||
moptions_.memtable_prefix_bloom_huge_page_tlb_size,
|
||||
ioptions.info_log));
|
||||
}
|
||||
}
|
||||
@ -454,10 +457,10 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
saver.status = s;
|
||||
saver.mem = this;
|
||||
saver.merge_context = merge_context;
|
||||
saver.merge_operator = ioptions_.merge_operator;
|
||||
saver.logger = ioptions_.info_log;
|
||||
saver.merge_operator = moptions_.merge_operator;
|
||||
saver.logger = moptions_.info_log;
|
||||
saver.inplace_update_support = moptions_.inplace_update_support;
|
||||
saver.statistics = ioptions_.statistics;
|
||||
saver.statistics = moptions_.statistics;
|
||||
table_->Get(key, &saver, SaveValue);
|
||||
}
|
||||
|
||||
@ -578,12 +581,12 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
|
||||
memcpy(p, prev_buffer, new_prev_size);
|
||||
}
|
||||
}
|
||||
RecordTick(ioptions_.statistics, NUMBER_KEYS_UPDATED);
|
||||
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
|
||||
should_flush_ = ShouldFlushNow();
|
||||
return true;
|
||||
} else if (status == UpdateStatus::UPDATED) {
|
||||
Add(seq, kTypeValue, key, Slice(str_value));
|
||||
RecordTick(ioptions_.statistics, NUMBER_KEYS_WRITTEN);
|
||||
RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
|
||||
should_flush_ = ShouldFlushNow();
|
||||
return true;
|
||||
} else if (status == UpdateStatus::UPDATE_FAILED) {
|
||||
|
@ -32,8 +32,8 @@ class MergeContext;
|
||||
|
||||
struct MemTableOptions {
|
||||
explicit MemTableOptions(
|
||||
const MutableCFOptions& mutable_cf_options,
|
||||
const Options& options);
|
||||
const ImmutableCFOptions& ioptions,
|
||||
const MutableCFOptions& mutable_cf_options);
|
||||
size_t write_buffer_size;
|
||||
size_t arena_block_size;
|
||||
uint32_t memtable_prefix_bloom_bits;
|
||||
@ -47,6 +47,9 @@ struct MemTableOptions {
|
||||
std::string* merged_value);
|
||||
size_t max_successive_merges;
|
||||
bool filter_deletes;
|
||||
Statistics* statistics;
|
||||
MergeOperator* merge_operator;
|
||||
Logger* info_log;
|
||||
};
|
||||
|
||||
class MemTable {
|
||||
@ -64,7 +67,7 @@ class MemTable {
|
||||
// is zero and the caller must call Ref() at least once.
|
||||
explicit MemTable(const InternalKeyComparator& comparator,
|
||||
const ImmutableCFOptions& ioptions,
|
||||
const MemTableOptions& moptions);
|
||||
const MutableCFOptions& mutable_cf_options);
|
||||
|
||||
~MemTable();
|
||||
|
||||
@ -199,7 +202,6 @@ class MemTable {
|
||||
|
||||
const Arena& TEST_GetArena() const { return arena_; }
|
||||
|
||||
const ImmutableCFOptions* GetImmutableOptions() const { return &ioptions_; }
|
||||
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
|
||||
|
||||
private:
|
||||
@ -211,7 +213,6 @@ class MemTable {
|
||||
friend class MemTableList;
|
||||
|
||||
KeyComparator comparator_;
|
||||
const ImmutableCFOptions& ioptions_;
|
||||
const MemTableOptions moptions_;
|
||||
int refs_;
|
||||
const size_t kArenaBlockSize;
|
||||
|
@ -220,7 +220,7 @@ class Repairer {
|
||||
Slice record;
|
||||
WriteBatch batch;
|
||||
MemTable* mem = new MemTable(icmp_, ioptions_,
|
||||
MemTableOptions(MutableCFOptions(options_, ioptions_), options_));
|
||||
MutableCFOptions(options_, ioptions_));
|
||||
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
|
||||
mem->Ref();
|
||||
int counter = 0;
|
||||
|
@ -2926,8 +2926,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
|
||||
AppendVersion(new_cfd, v);
|
||||
// GetLatestMutableCFOptions() is safe here without mutex since the
|
||||
// cfd is not available to client
|
||||
new_cfd->CreateNewMemtable(MemTableOptions(
|
||||
*new_cfd->GetLatestMutableCFOptions(), *new_cfd->options()));
|
||||
new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions());
|
||||
new_cfd->SetLogNumber(edit->log_number_);
|
||||
return new_cfd;
|
||||
}
|
||||
|
@ -349,13 +349,12 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
return seek_status;
|
||||
}
|
||||
MemTable* mem = cf_mems_->GetMemTable();
|
||||
auto* ioptions = mem->GetImmutableOptions();
|
||||
auto* moptions = mem->GetMemTableOptions();
|
||||
if (!moptions->inplace_update_support) {
|
||||
mem->Add(sequence_, kTypeValue, key, value);
|
||||
} else if (moptions->inplace_callback == nullptr) {
|
||||
mem->Update(sequence_, key, value);
|
||||
RecordTick(ioptions->statistics, NUMBER_KEYS_UPDATED);
|
||||
RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
|
||||
} else {
|
||||
if (mem->UpdateCallback(sequence_, key, value)) {
|
||||
} else {
|
||||
@ -382,11 +381,11 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
if (status == UpdateStatus::UPDATED_INPLACE) {
|
||||
// prev_value is updated in-place with final value.
|
||||
mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
|
||||
RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
|
||||
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
|
||||
} else if (status == UpdateStatus::UPDATED) {
|
||||
// merged_value contains the final value.
|
||||
mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
|
||||
RecordTick(ioptions->statistics, NUMBER_KEYS_WRITTEN);
|
||||
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -406,7 +405,6 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
return seek_status;
|
||||
}
|
||||
MemTable* mem = cf_mems_->GetMemTable();
|
||||
auto* ioptions = mem->GetImmutableOptions();
|
||||
auto* moptions = mem->GetMemTableOptions();
|
||||
bool perform_merge = false;
|
||||
|
||||
@ -441,16 +439,16 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
Slice get_value_slice = Slice(get_value);
|
||||
|
||||
// 2) Apply this merge
|
||||
auto merge_operator = ioptions->merge_operator;
|
||||
auto merge_operator = moptions->merge_operator;
|
||||
assert(merge_operator);
|
||||
|
||||
std::deque<std::string> operands;
|
||||
operands.push_front(value.ToString());
|
||||
std::string new_value;
|
||||
if (!merge_operator->FullMerge(key, &get_value_slice, operands,
|
||||
&new_value, ioptions->info_log)) {
|
||||
&new_value, moptions->info_log)) {
|
||||
// Failed to merge!
|
||||
RecordTick(ioptions->statistics, NUMBER_MERGE_FAILURES);
|
||||
RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
|
||||
|
||||
// Store the delta in memtable
|
||||
perform_merge = false;
|
||||
@ -477,7 +475,6 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
return seek_status;
|
||||
}
|
||||
MemTable* mem = cf_mems_->GetMemTable();
|
||||
auto* ioptions = mem->GetImmutableOptions();
|
||||
auto* moptions = mem->GetMemTableOptions();
|
||||
if (!dont_filter_deletes_ && moptions->filter_deletes) {
|
||||
SnapshotImpl read_from_snapshot;
|
||||
@ -490,7 +487,7 @@ class MemTableInserter : public WriteBatch::Handler {
|
||||
cf_handle = db_->DefaultColumnFamily();
|
||||
}
|
||||
if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
|
||||
RecordTick(ioptions->statistics, NUMBER_FILTERED_DELETES);
|
||||
RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ static std::string PrintContents(WriteBatch* b) {
|
||||
options.memtable_factory = factory;
|
||||
ImmutableCFOptions ioptions(options);
|
||||
MemTable* mem = new MemTable(cmp, ioptions,
|
||||
MemTableOptions(MutableCFOptions(options, ioptions), options));
|
||||
MutableCFOptions(options, ioptions));
|
||||
mem->Ref();
|
||||
std::string state;
|
||||
ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "rocksdb/options.h"
|
||||
|
||||
@ -36,6 +37,13 @@ struct ImmutableCFOptions {
|
||||
|
||||
CompactionFilterFactoryV2* compaction_filter_factory_v2;
|
||||
|
||||
bool inplace_update_support;
|
||||
|
||||
UpdateStatus (*inplace_callback)(char* existing_value,
|
||||
uint32_t* existing_value_size,
|
||||
Slice delta_value,
|
||||
std::string* merged_value);
|
||||
|
||||
Logger* info_log;
|
||||
|
||||
Statistics* statistics;
|
||||
|
@ -439,7 +439,7 @@ class MemTableConstructor: public Constructor {
|
||||
options.memtable_factory = table_factory_;
|
||||
ImmutableCFOptions ioptions(options);
|
||||
memtable_ = new MemTable(internal_comparator_, ioptions,
|
||||
MemTableOptions(MutableCFOptions(options, ioptions), options));
|
||||
MutableCFOptions(options, ioptions));
|
||||
memtable_->Ref();
|
||||
}
|
||||
~MemTableConstructor() {
|
||||
@ -455,7 +455,7 @@ class MemTableConstructor: public Constructor {
|
||||
options.memtable_factory = table_factory_;
|
||||
ImmutableCFOptions mem_ioptions(options);
|
||||
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
|
||||
MemTableOptions(MutableCFOptions(options, mem_ioptions), options));
|
||||
MutableCFOptions(options, mem_ioptions));
|
||||
memtable_->Ref();
|
||||
int seq = 1;
|
||||
for (KVMap::const_iterator it = data.begin();
|
||||
@ -1879,7 +1879,7 @@ TEST(MemTableTest, Simple) {
|
||||
options.memtable_factory = table_factory;
|
||||
ImmutableCFOptions ioptions(options);
|
||||
MemTable* memtable = new MemTable(cmp, ioptions,
|
||||
MemTableOptions(MutableCFOptions(options, ioptions), options));
|
||||
MutableCFOptions(options, ioptions));
|
||||
memtable->Ref();
|
||||
WriteBatch batch;
|
||||
WriteBatchInternal::SetSequence(&batch, 100);
|
||||
|
@ -22,6 +22,7 @@ struct MutableCFOptions {
|
||||
options.memtable_prefix_bloom_huge_page_tlb_size),
|
||||
max_successive_merges(options.max_successive_merges),
|
||||
filter_deletes(options.filter_deletes),
|
||||
inplace_update_num_locks(options.inplace_update_num_locks),
|
||||
disable_auto_compactions(options.disable_auto_compactions),
|
||||
soft_rate_limit(options.soft_rate_limit),
|
||||
hard_rate_limit(options.hard_rate_limit),
|
||||
@ -53,6 +54,7 @@ struct MutableCFOptions {
|
||||
memtable_prefix_bloom_huge_page_tlb_size(0),
|
||||
max_successive_merges(0),
|
||||
filter_deletes(false),
|
||||
inplace_update_num_locks(0),
|
||||
disable_auto_compactions(false),
|
||||
soft_rate_limit(0),
|
||||
hard_rate_limit(0),
|
||||
@ -94,6 +96,7 @@ struct MutableCFOptions {
|
||||
size_t memtable_prefix_bloom_huge_page_tlb_size;
|
||||
size_t max_successive_merges;
|
||||
bool filter_deletes;
|
||||
size_t inplace_update_num_locks;
|
||||
|
||||
// Compaction related options
|
||||
bool disable_auto_compactions;
|
||||
|
@ -42,6 +42,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
|
||||
compaction_filter(options.compaction_filter),
|
||||
compaction_filter_factory(options.compaction_filter_factory.get()),
|
||||
compaction_filter_factory_v2(options.compaction_filter_factory_v2.get()),
|
||||
inplace_update_support(options.inplace_update_support),
|
||||
inplace_callback(options.inplace_callback),
|
||||
info_log(options.info_log.get()),
|
||||
statistics(options.statistics.get()),
|
||||
env(options.env),
|
||||
|
@ -94,6 +94,8 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value,
|
||||
new_options->filter_deletes = ParseBoolean(name, value);
|
||||
} else if (name == "max_write_buffer_number") {
|
||||
new_options->max_write_buffer_number = ParseInt(value);
|
||||
} else if (name == "inplace_update_num_locks") {
|
||||
new_options->inplace_update_num_locks = ParseInt64(value);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
@ -299,14 +301,12 @@ bool GetColumnFamilyOptionsFromMap(
|
||||
} else if (o.first == "compaction_options_fifo") {
|
||||
new_options->compaction_options_fifo.max_table_files_size
|
||||
= ParseUint64(o.second);
|
||||
} else if (o.first == "inplace_update_support") {
|
||||
new_options->inplace_update_support = ParseBoolean(o.first, o.second);
|
||||
} else if (o.first == "inplace_update_num_locks") {
|
||||
new_options->inplace_update_num_locks = ParseInt64(o.second);
|
||||
} else if (o.first == "bloom_locality") {
|
||||
new_options->bloom_locality = ParseUint32(o.second);
|
||||
} else if (o.first == "min_partial_merge_operands") {
|
||||
new_options->min_partial_merge_operands = ParseUint32(o.second);
|
||||
} else if (o.first == "inplace_update_support") {
|
||||
new_options->inplace_update_support = ParseBoolean(o.first, o.second);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user