MultiGet batching in memtable (#5818)

Summary:
RocksDB has a MultiGet() API that implements batched key lookup for higher performance (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L468). Currently, batching is implemented in BlockBasedTableReader::MultiGet() for SST file lookups. One of the ways it improves performance is by pipelining bloom filter lookups (by prefetching required cachelines for all the keys in the batch, and then doing the probe) and thus hiding the cache miss latency. The same concept can be extended to the memtable as well. This PR involves implementing a pipelined bloom filter lookup in DynamicBloom, and implementing MemTable::MultiGet() that can leverage it.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5818

Test Plan:
Existing tests

Performance Test:
Ran the below command which fills up the memtable and makes sure there are no flushes and then call multiget. Ran it on master and on the new change and see atleast 1% performance improvement across all the test runs I did. Sometimes the improvement was upto 5%.

TEST_TMPDIR=/data/users/$USER/benchmarks/feature/ numactl -C 10 ./db_bench -benchmarks="fillseq,multireadrandom" -num=600000 -compression_type="none" -level_compaction_dynamic_level_bytes -write_buffer_size=200000000 -target_file_size_base=200000000 -max_bytes_for_level_base=16777216 -reads=90000 -threads=1 -compression_type=none -cache_size=4194304000 -batch_size=32 -disable_auto_compactions=true -bloom_bits=10 -cache_index_and_filter_blocks=true -pin_l0_filter_and_index_blocks_in_cache=true -multiread_batched=true -multiread_stride=4 -statistics -memtable_whole_key_filtering=true -memtable_bloom_size_ratio=10

Differential Revision: D17578869

Pulled By: vjnadimpalli

fbshipit-source-id: 23dc651d9bf49db11d22375bf435708875a1f192
This commit is contained in:
Vijay Nadimpalli 2019-10-10 09:37:38 -07:00 committed by Facebook Github Bot
parent 80ad996b35
commit 4c49e38f15
8 changed files with 199 additions and 59 deletions

View File

@ -1362,6 +1362,7 @@ class MultiGetPrefixExtractorTest : public DBBasicTest,
TEST_P(MultiGetPrefixExtractorTest, Batched) {
Options options = CurrentOptions();
options.prefix_extractor.reset(NewFixedPrefixTransform(2));
options.memtable_prefix_bloom_size_ratio = 10;
BlockBasedTableOptions bbto;
if (GetParam()) {
bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
@ -1373,17 +1374,30 @@ TEST_P(MultiGetPrefixExtractorTest, Batched) {
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
SetPerfLevel(kEnableCount);
get_perf_context()->Reset();
// First key is not in the prefix_extractor domain
ASSERT_OK(Put("k", "v0"));
ASSERT_OK(Put("kk1", "v1"));
ASSERT_OK(Put("kk2", "v2"));
ASSERT_OK(Put("kk3", "v3"));
ASSERT_OK(Put("kk4", "v4"));
std::vector<std::string> mem_keys(
{"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
std::vector<std::string> inmem_values;
inmem_values = MultiGet(mem_keys, nullptr);
ASSERT_EQ(inmem_values[0], "v0");
ASSERT_EQ(inmem_values[1], "v1");
ASSERT_EQ(inmem_values[2], "v2");
ASSERT_EQ(inmem_values[3], "v3");
ASSERT_EQ(inmem_values[4], "v4");
ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
ASSERT_OK(Flush());
std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
std::vector<std::string> values;
SetPerfLevel(kEnableCount);
get_perf_context()->Reset();
values = MultiGet(keys, nullptr);
ASSERT_EQ(values[0], "v0");

View File

@ -1961,41 +1961,26 @@ void DBImpl::MultiGetImpl(
keys_left -= batch_size;
for (auto mget_iter = range.begin(); mget_iter != range.end();
++mget_iter) {
MergeContext& merge_context = mget_iter->merge_context;
merge_context.Clear();
Status& s = *mget_iter->s;
PinnableSlice* value = mget_iter->value;
s = Status::OK();
bool skip_memtable =
(read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (super_version->mem->Get(*(mget_iter->lkey), value->GetSelf(), &s,
&merge_context,
&mget_iter->max_covering_tombstone_seq,
read_options, callback, is_blob_index)) {
done = true;
value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if (super_version->imm->Get(
*(mget_iter->lkey), value->GetSelf(), &s, &merge_context,
&mget_iter->max_covering_tombstone_seq, read_options,
callback, is_blob_index)) {
done = true;
value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (done) {
range.MarkKeyDone(mget_iter);
} else {
RecordTick(stats_, MEMTABLE_MISS);
lookup_current = true;
}
mget_iter->merge_context.Clear();
*mget_iter->s = Status::OK();
}
bool skip_memtable =
(read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
if (!skip_memtable) {
super_version->mem->MultiGet(read_options, &range, callback,
is_blob_index);
if (!range.empty()) {
super_version->imm->MultiGet(read_options, &range, callback,
is_blob_index);
}
if (!range.empty()) {
lookup_current = true;
uint64_t left = range.KeysLeft();
RecordTick(stats_, MEMTABLE_MISS, left);
}
}
if (lookup_current) {
PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->MultiGet(read_options, &range, callback,

View File

@ -10,9 +10,9 @@
#include "db/memtable.h"
#include <algorithm>
#include <array>
#include <limits>
#include <memory>
#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
@ -804,6 +804,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
}
}
if (bloom_filter_ && !may_contain) {
// iter is null if prefix bloom says the key does not exist
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
@ -812,26 +813,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
if (bloom_filter_) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.max_covering_tombstone_seq = *max_covering_tombstone_seq;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = moptions_.statistics;
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
saver.do_merge = do_merge;
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
is_blob_index, value, s, merge_context, seq,
&found_final_value, &merge_in_progress);
}
// No change to value, since we have not yet found a Put/Delete
@ -842,6 +826,103 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
return found_final_value;
}
void MemTable::GetFromTable(const LookupKey& key,
SequenceNumber max_covering_tombstone_seq,
bool do_merge, ReadCallback* callback,
bool* is_blob_index, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq,
bool* found_final_value, bool* merge_in_progress) {
Saver saver;
saver.status = s;
saver.found_final_value = found_final_value;
saver.merge_in_progress = merge_in_progress;
saver.key = &key;
saver.value = value;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = moptions_.statistics;
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
saver.do_merge = do_merge;
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
}
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
return;
}
PERF_TIMER_GUARD(get_from_memtable_time);
MultiGetRange temp_range(*range, range->begin(), range->end());
if (bloom_filter_) {
std::array<Slice*, MultiGetContext::MAX_BATCH_SIZE> keys;
std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match = {{true}};
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> prefixes;
int num_keys = 0;
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
if (!prefix_extractor_) {
keys[num_keys++] = &iter->ukey;
} else if (prefix_extractor_->InDomain(iter->ukey)) {
prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey));
keys[num_keys++] = &prefixes.back();
}
}
bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]);
int idx = 0;
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
continue;
}
if (!may_match[idx]) {
temp_range.SkipKey(iter);
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
} else {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
idx++;
}
}
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
SequenceNumber seq = kMaxSequenceNumber;
bool found_final_value{false};
bool merge_in_progress = iter->s->IsMergeInProgress();
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
NewRangeTombstoneIterator(
read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
if (range_del_iter != nullptr) {
iter->max_covering_tombstone_seq = std::max(
iter->max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
}
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
callback, is_blob, iter->value->GetSelf(), iter->s,
&(iter->merge_context), &seq, &found_final_value,
&merge_in_progress);
if (!found_final_value && merge_in_progress) {
*(iter->s) = Status::MergeInProgress();
}
if (found_final_value) {
iter->value->PinSelf();
range->MarkKeyDone(iter);
RecordTick(moptions_.statistics, MEMTABLE_HIT);
}
}
PERF_COUNTER_ADD(get_from_memtable_count, 1);
}
void MemTable::Update(SequenceNumber seq,
const Slice& key,
const Slice& value) {

View File

@ -26,6 +26,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "table/multiget_context.h"
#include "util/dynamic_bloom.h"
#include "util/hash.h"
@ -63,6 +64,7 @@ struct MemTablePostProcessInfo {
uint64_t num_deletes = 0;
};
using MultiGetRange = MultiGetContext::Range;
// Note: Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
@ -221,6 +223,9 @@ class MemTable {
read_opts, callback, is_blob_index, do_merge);
}
void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob);
// Attempts to update the new_value inplace, else does normal Add
// Pseudocode
// if key exists in current memtable && prev_value is of type kTypeValue
@ -507,6 +512,13 @@ class MemTable {
void UpdateFlushState();
void UpdateOldestKeyTime();
void GetFromTable(const LookupKey& key,
SequenceNumber max_covering_tombstone_seq, bool do_merge,
ReadCallback* callback, bool* is_blob_index,
std::string* value, Status* s, MergeContext* merge_context,
SequenceNumber* seq, bool* found_final_value,
bool* merge_in_progress);
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);

View File

@ -113,6 +113,17 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
is_blob_index);
}
void MemTableListVersion::MultiGet(const ReadOptions& read_options,
MultiGetRange* range, ReadCallback* callback,
bool* is_blob) {
for (auto memtable : memlist_) {
memtable->MultiGet(read_options, range, callback, is_blob);
if (range->empty()) {
return;
}
}
}
bool MemTableListVersion::GetMergeOperands(
const LookupKey& key, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {

View File

@ -72,6 +72,9 @@ class MemTableListVersion {
read_opts, callback, is_blob_index);
}
void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob);
// Returns all the merge operands corresponding to the key by searching all
// memtables starting from the most recent one.
bool GetMergeOperands(const LookupKey& key, Status* s,

View File

@ -229,6 +229,16 @@ class MultiGetContext {
return ctx_->value_mask_ & (1ull << iter.index_);
}
uint64_t KeysLeft() {
uint64_t new_val = skip_mask_ | ctx_->value_mask_;
uint64_t count = 0;
while (new_val) {
new_val = new_val & (new_val - 1);
count++;
}
return end_ - count;
}
private:
friend MultiGetContext;
MultiGetContext* ctx_;

View File

@ -5,11 +5,11 @@
#pragma once
#include <array>
#include <string>
#include "rocksdb/slice.h"
#include "port/port.h"
#include "rocksdb/slice.h"
#include "table/multiget_context.h"
#include "util/hash.h"
#include <atomic>
@ -65,6 +65,8 @@ class DynamicBloom {
// Multithreaded access to this function is OK
bool MayContain(const Slice& key) const;
void MayContain(int num_keys, Slice** keys, bool* may_match) const;
// Multithreaded access to this function is OK
bool MayContainHash(uint32_t hash) const;
@ -84,6 +86,8 @@ class DynamicBloom {
// concurrency safety, working with bytes.
template <typename OrFunc>
void AddHash(uint32_t hash, const OrFunc& or_func);
bool DoubleProbe(uint32_t h32, size_t a) const;
};
inline void DynamicBloom::Add(const Slice& key) { AddHash(BloomHash(key)); }
@ -116,6 +120,22 @@ inline bool DynamicBloom::MayContain(const Slice& key) const {
return (MayContainHash(BloomHash(key)));
}
inline void DynamicBloom::MayContain(int num_keys, Slice** keys,
bool* may_match) const {
std::array<uint32_t, MultiGetContext::MAX_BATCH_SIZE> hashes;
std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> byte_offsets;
for (int i = 0; i < num_keys; ++i) {
hashes[i] = BloomHash(*keys[i]);
size_t a = fastrange32(kLen, hashes[i]);
PREFETCH(data_ + a, 0, 3);
byte_offsets[i] = a;
}
for (int i = 0; i < num_keys; i++) {
may_match[i] = DoubleProbe(hashes[i], byte_offsets[i]);
}
}
#if defined(_MSC_VER)
#pragma warning(push)
// local variable is initialized but not referenced
@ -153,13 +173,17 @@ inline void DynamicBloom::Prefetch(uint32_t h32) {
inline bool DynamicBloom::MayContainHash(uint32_t h32) const {
size_t a = fastrange32(kLen, h32);
PREFETCH(data_ + a, 0, 3);
return DoubleProbe(h32, a);
}
inline bool DynamicBloom::DoubleProbe(uint32_t h32, size_t byte_offset) const {
// Expand/remix with 64-bit golden ratio
uint64_t h = 0x9e3779b97f4a7c13ULL * h32;
for (unsigned i = 0;; ++i) {
// Two bit probes per uint64_t probe
uint64_t mask =
((uint64_t)1 << (h & 63)) | ((uint64_t)1 << ((h >> 6) & 63));
uint64_t val = data_[a ^ i].load(std::memory_order_relaxed);
uint64_t val = data_[byte_offset ^ i].load(std::memory_order_relaxed);
if (i + 1 >= kNumDoubleProbes) {
return (val & mask) == mask;
} else if ((val & mask) != mask) {