From 4c49e38f15ad2d575d5768742e961e6eb85a44c0 Mon Sep 17 00:00:00 2001 From: Vijay Nadimpalli Date: Thu, 10 Oct 2019 09:37:38 -0700 Subject: [PATCH] MultiGet batching in memtable (#5818) Summary: RocksDB has a MultiGet() API that implements batched key lookup for higher performance (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L468). Currently, batching is implemented in BlockBasedTableReader::MultiGet() for SST file lookups. One of the ways it improves performance is by pipelining bloom filter lookups (by prefetching required cachelines for all the keys in the batch, and then doing the probe) and thus hiding the cache miss latency. The same concept can be extended to the memtable as well. This PR involves implementing a pipelined bloom filter lookup in DynamicBloom, and implementing MemTable::MultiGet() that can leverage it. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5818 Test Plan: Existing tests Performance Test: Ran the below command which fills up the memtable and makes sure there are no flushes and then call multiget. Ran it on master and on the new change and see atleast 1% performance improvement across all the test runs I did. Sometimes the improvement was upto 5%. TEST_TMPDIR=/data/users/$USER/benchmarks/feature/ numactl -C 10 ./db_bench -benchmarks="fillseq,multireadrandom" -num=600000 -compression_type="none" -level_compaction_dynamic_level_bytes -write_buffer_size=200000000 -target_file_size_base=200000000 -max_bytes_for_level_base=16777216 -reads=90000 -threads=1 -compression_type=none -cache_size=4194304000 -batch_size=32 -disable_auto_compactions=true -bloom_bits=10 -cache_index_and_filter_blocks=true -pin_l0_filter_and_index_blocks_in_cache=true -multiread_batched=true -multiread_stride=4 -statistics -memtable_whole_key_filtering=true -memtable_bloom_size_ratio=10 Differential Revision: D17578869 Pulled By: vjnadimpalli fbshipit-source-id: 23dc651d9bf49db11d22375bf435708875a1f192 --- db/db_basic_test.cc | 16 ++++- db/db_impl/db_impl.cc | 51 ++++++---------- db/memtable.cc | 123 ++++++++++++++++++++++++++++++++------- db/memtable.h | 12 ++++ db/memtable_list.cc | 11 ++++ db/memtable_list.h | 3 + table/multiget_context.h | 10 ++++ util/dynamic_bloom.h | 32 ++++++++-- 8 files changed, 199 insertions(+), 59 deletions(-) diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index b3bd681b2..07d27a41a 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1362,6 +1362,7 @@ class MultiGetPrefixExtractorTest : public DBBasicTest, TEST_P(MultiGetPrefixExtractorTest, Batched) { Options options = CurrentOptions(); options.prefix_extractor.reset(NewFixedPrefixTransform(2)); + options.memtable_prefix_bloom_size_ratio = 10; BlockBasedTableOptions bbto; if (GetParam()) { bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; @@ -1373,17 +1374,30 @@ TEST_P(MultiGetPrefixExtractorTest, Batched) { options.table_factory.reset(NewBlockBasedTableFactory(bbto)); Reopen(options); + SetPerfLevel(kEnableCount); + get_perf_context()->Reset(); + // First key is not in the prefix_extractor domain ASSERT_OK(Put("k", "v0")); ASSERT_OK(Put("kk1", "v1")); ASSERT_OK(Put("kk2", "v2")); ASSERT_OK(Put("kk3", "v3")); ASSERT_OK(Put("kk4", "v4")); + std::vector mem_keys( + {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"}); + std::vector inmem_values; + inmem_values = MultiGet(mem_keys, nullptr); + ASSERT_EQ(inmem_values[0], "v0"); + ASSERT_EQ(inmem_values[1], "v1"); + ASSERT_EQ(inmem_values[2], "v2"); + ASSERT_EQ(inmem_values[3], "v3"); + ASSERT_EQ(inmem_values[4], "v4"); + ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2); + ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5); ASSERT_OK(Flush()); std::vector keys({"k", "kk1", "kk2", "kk3", "kk4"}); std::vector values; - SetPerfLevel(kEnableCount); get_perf_context()->Reset(); values = MultiGet(keys, nullptr); ASSERT_EQ(values[0], "v0"); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d3b001d25..72cfb0be9 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1961,41 +1961,26 @@ void DBImpl::MultiGetImpl( keys_left -= batch_size; for (auto mget_iter = range.begin(); mget_iter != range.end(); ++mget_iter) { - MergeContext& merge_context = mget_iter->merge_context; - merge_context.Clear(); - Status& s = *mget_iter->s; - PinnableSlice* value = mget_iter->value; - s = Status::OK(); - - bool skip_memtable = - (read_options.read_tier == kPersistedTier && - has_unpersisted_data_.load(std::memory_order_relaxed)); - bool done = false; - if (!skip_memtable) { - if (super_version->mem->Get(*(mget_iter->lkey), value->GetSelf(), &s, - &merge_context, - &mget_iter->max_covering_tombstone_seq, - read_options, callback, is_blob_index)) { - done = true; - value->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); - } else if (super_version->imm->Get( - *(mget_iter->lkey), value->GetSelf(), &s, &merge_context, - &mget_iter->max_covering_tombstone_seq, read_options, - callback, is_blob_index)) { - done = true; - value->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); - } - } - if (done) { - range.MarkKeyDone(mget_iter); - } else { - RecordTick(stats_, MEMTABLE_MISS); - lookup_current = true; - } + mget_iter->merge_context.Clear(); + *mget_iter->s = Status::OK(); } + bool skip_memtable = + (read_options.read_tier == kPersistedTier && + has_unpersisted_data_.load(std::memory_order_relaxed)); + if (!skip_memtable) { + super_version->mem->MultiGet(read_options, &range, callback, + is_blob_index); + if (!range.empty()) { + super_version->imm->MultiGet(read_options, &range, callback, + is_blob_index); + } + if (!range.empty()) { + lookup_current = true; + uint64_t left = range.KeysLeft(); + RecordTick(stats_, MEMTABLE_MISS, left); + } + } if (lookup_current) { PERF_TIMER_GUARD(get_from_output_files_time); super_version->current->MultiGet(read_options, &range, callback, diff --git a/db/memtable.cc b/db/memtable.cc index dd6604514..8b2ddf0e1 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -10,9 +10,9 @@ #include "db/memtable.h" #include +#include #include #include - #include "db/dbformat.h" #include "db/merge_context.h" #include "db/merge_helper.h" @@ -804,6 +804,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, bloom_filter_->MayContain(prefix_extractor_->Transform(user_key)); } } + if (bloom_filter_ && !may_contain) { // iter is null if prefix bloom says the key does not exist PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); @@ -812,26 +813,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, if (bloom_filter_) { PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); } - Saver saver; - saver.status = s; - saver.found_final_value = &found_final_value; - saver.merge_in_progress = &merge_in_progress; - saver.key = &key; - saver.value = value; - saver.seq = kMaxSequenceNumber; - saver.mem = this; - saver.merge_context = merge_context; - saver.max_covering_tombstone_seq = *max_covering_tombstone_seq; - saver.merge_operator = moptions_.merge_operator; - saver.logger = moptions_.info_log; - saver.inplace_update_support = moptions_.inplace_update_support; - saver.statistics = moptions_.statistics; - saver.env_ = env_; - saver.callback_ = callback; - saver.is_blob_index = is_blob_index; - saver.do_merge = do_merge; - table_->Get(key, &saver, SaveValue); - *seq = saver.seq; + GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback, + is_blob_index, value, s, merge_context, seq, + &found_final_value, &merge_in_progress); } // No change to value, since we have not yet found a Put/Delete @@ -842,6 +826,103 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, return found_final_value; } +void MemTable::GetFromTable(const LookupKey& key, + SequenceNumber max_covering_tombstone_seq, + bool do_merge, ReadCallback* callback, + bool* is_blob_index, std::string* value, Status* s, + MergeContext* merge_context, SequenceNumber* seq, + bool* found_final_value, bool* merge_in_progress) { + Saver saver; + saver.status = s; + saver.found_final_value = found_final_value; + saver.merge_in_progress = merge_in_progress; + saver.key = &key; + saver.value = value; + saver.seq = kMaxSequenceNumber; + saver.mem = this; + saver.merge_context = merge_context; + saver.max_covering_tombstone_seq = max_covering_tombstone_seq; + saver.merge_operator = moptions_.merge_operator; + saver.logger = moptions_.info_log; + saver.inplace_update_support = moptions_.inplace_update_support; + saver.statistics = moptions_.statistics; + saver.env_ = env_; + saver.callback_ = callback; + saver.is_blob_index = is_blob_index; + saver.do_merge = do_merge; + table_->Get(key, &saver, SaveValue); + *seq = saver.seq; +} + +void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool* is_blob) { + // The sequence number is updated synchronously in version_set.h + if (IsEmpty()) { + // Avoiding recording stats for speed. + return; + } + PERF_TIMER_GUARD(get_from_memtable_time); + + MultiGetRange temp_range(*range, range->begin(), range->end()); + if (bloom_filter_) { + std::array keys; + std::array may_match = {{true}}; + autovector prefixes; + int num_keys = 0; + for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { + if (!prefix_extractor_) { + keys[num_keys++] = &iter->ukey; + } else if (prefix_extractor_->InDomain(iter->ukey)) { + prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey)); + keys[num_keys++] = &prefixes.back(); + } + } + bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]); + int idx = 0; + for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { + if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + continue; + } + if (!may_match[idx]) { + temp_range.SkipKey(iter); + PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); + } else { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + } + idx++; + } + } + for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { + SequenceNumber seq = kMaxSequenceNumber; + bool found_final_value{false}; + bool merge_in_progress = iter->s->IsMergeInProgress(); + std::unique_ptr range_del_iter( + NewRangeTombstoneIterator( + read_options, GetInternalKeySeqno(iter->lkey->internal_key()))); + if (range_del_iter != nullptr) { + iter->max_covering_tombstone_seq = std::max( + iter->max_covering_tombstone_seq, + range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); + } + GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, + callback, is_blob, iter->value->GetSelf(), iter->s, + &(iter->merge_context), &seq, &found_final_value, + &merge_in_progress); + + if (!found_final_value && merge_in_progress) { + *(iter->s) = Status::MergeInProgress(); + } + + if (found_final_value) { + iter->value->PinSelf(); + range->MarkKeyDone(iter); + RecordTick(moptions_.statistics, MEMTABLE_HIT); + } + } + PERF_COUNTER_ADD(get_from_memtable_count, 1); +} + void MemTable::Update(SequenceNumber seq, const Slice& key, const Slice& value) { diff --git a/db/memtable.h b/db/memtable.h index f316ab8e2..482e23716 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -26,6 +26,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" +#include "table/multiget_context.h" #include "util/dynamic_bloom.h" #include "util/hash.h" @@ -63,6 +64,7 @@ struct MemTablePostProcessInfo { uint64_t num_deletes = 0; }; +using MultiGetRange = MultiGetContext::Range; // Note: Many of the methods in this class have comments indicating that // external synchronization is required as these methods are not thread-safe. // It is up to higher layers of code to decide how to prevent concurrent @@ -221,6 +223,9 @@ class MemTable { read_opts, callback, is_blob_index, do_merge); } + void MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool* is_blob); + // Attempts to update the new_value inplace, else does normal Add // Pseudocode // if key exists in current memtable && prev_value is of type kTypeValue @@ -507,6 +512,13 @@ class MemTable { void UpdateFlushState(); void UpdateOldestKeyTime(); + + void GetFromTable(const LookupKey& key, + SequenceNumber max_covering_tombstone_seq, bool do_merge, + ReadCallback* callback, bool* is_blob_index, + std::string* value, Status* s, MergeContext* merge_context, + SequenceNumber* seq, bool* found_final_value, + bool* merge_in_progress); }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index e3f0732de..f0a0bc2a4 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -113,6 +113,17 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, is_blob_index); } +void MemTableListVersion::MultiGet(const ReadOptions& read_options, + MultiGetRange* range, ReadCallback* callback, + bool* is_blob) { + for (auto memtable : memlist_) { + memtable->MultiGet(read_options, range, callback, is_blob); + if (range->empty()) { + return; + } + } +} + bool MemTableListVersion::GetMergeOperands( const LookupKey& key, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) { diff --git a/db/memtable_list.h b/db/memtable_list.h index 75cc1a524..0884e1ec0 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -72,6 +72,9 @@ class MemTableListVersion { read_opts, callback, is_blob_index); } + void MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool* is_blob); + // Returns all the merge operands corresponding to the key by searching all // memtables starting from the most recent one. bool GetMergeOperands(const LookupKey& key, Status* s, diff --git a/table/multiget_context.h b/table/multiget_context.h index 88ec4dcc4..fe6bbc3bf 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -229,6 +229,16 @@ class MultiGetContext { return ctx_->value_mask_ & (1ull << iter.index_); } + uint64_t KeysLeft() { + uint64_t new_val = skip_mask_ | ctx_->value_mask_; + uint64_t count = 0; + while (new_val) { + new_val = new_val & (new_val - 1); + count++; + } + return end_ - count; + } + private: friend MultiGetContext; MultiGetContext* ctx_; diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index 312d2805b..eb19c369d 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -5,11 +5,11 @@ #pragma once +#include #include - -#include "rocksdb/slice.h" - #include "port/port.h" +#include "rocksdb/slice.h" +#include "table/multiget_context.h" #include "util/hash.h" #include @@ -65,6 +65,8 @@ class DynamicBloom { // Multithreaded access to this function is OK bool MayContain(const Slice& key) const; + void MayContain(int num_keys, Slice** keys, bool* may_match) const; + // Multithreaded access to this function is OK bool MayContainHash(uint32_t hash) const; @@ -84,6 +86,8 @@ class DynamicBloom { // concurrency safety, working with bytes. template void AddHash(uint32_t hash, const OrFunc& or_func); + + bool DoubleProbe(uint32_t h32, size_t a) const; }; inline void DynamicBloom::Add(const Slice& key) { AddHash(BloomHash(key)); } @@ -116,6 +120,22 @@ inline bool DynamicBloom::MayContain(const Slice& key) const { return (MayContainHash(BloomHash(key))); } +inline void DynamicBloom::MayContain(int num_keys, Slice** keys, + bool* may_match) const { + std::array hashes; + std::array byte_offsets; + for (int i = 0; i < num_keys; ++i) { + hashes[i] = BloomHash(*keys[i]); + size_t a = fastrange32(kLen, hashes[i]); + PREFETCH(data_ + a, 0, 3); + byte_offsets[i] = a; + } + + for (int i = 0; i < num_keys; i++) { + may_match[i] = DoubleProbe(hashes[i], byte_offsets[i]); + } +} + #if defined(_MSC_VER) #pragma warning(push) // local variable is initialized but not referenced @@ -153,13 +173,17 @@ inline void DynamicBloom::Prefetch(uint32_t h32) { inline bool DynamicBloom::MayContainHash(uint32_t h32) const { size_t a = fastrange32(kLen, h32); PREFETCH(data_ + a, 0, 3); + return DoubleProbe(h32, a); +} + +inline bool DynamicBloom::DoubleProbe(uint32_t h32, size_t byte_offset) const { // Expand/remix with 64-bit golden ratio uint64_t h = 0x9e3779b97f4a7c13ULL * h32; for (unsigned i = 0;; ++i) { // Two bit probes per uint64_t probe uint64_t mask = ((uint64_t)1 << (h & 63)) | ((uint64_t)1 << ((h >> 6) & 63)); - uint64_t val = data_[a ^ i].load(std::memory_order_relaxed); + uint64_t val = data_[byte_offset ^ i].load(std::memory_order_relaxed); if (i + 1 >= kNumDoubleProbes) { return (val & mask) == mask; } else if ((val & mask) != mask) {