diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index b3bd681b2..07d27a41a 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1362,6 +1362,7 @@ class MultiGetPrefixExtractorTest : public DBBasicTest, TEST_P(MultiGetPrefixExtractorTest, Batched) { Options options = CurrentOptions(); options.prefix_extractor.reset(NewFixedPrefixTransform(2)); + options.memtable_prefix_bloom_size_ratio = 10; BlockBasedTableOptions bbto; if (GetParam()) { bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; @@ -1373,17 +1374,30 @@ TEST_P(MultiGetPrefixExtractorTest, Batched) { options.table_factory.reset(NewBlockBasedTableFactory(bbto)); Reopen(options); + SetPerfLevel(kEnableCount); + get_perf_context()->Reset(); + // First key is not in the prefix_extractor domain ASSERT_OK(Put("k", "v0")); ASSERT_OK(Put("kk1", "v1")); ASSERT_OK(Put("kk2", "v2")); ASSERT_OK(Put("kk3", "v3")); ASSERT_OK(Put("kk4", "v4")); + std::vector mem_keys( + {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"}); + std::vector inmem_values; + inmem_values = MultiGet(mem_keys, nullptr); + ASSERT_EQ(inmem_values[0], "v0"); + ASSERT_EQ(inmem_values[1], "v1"); + ASSERT_EQ(inmem_values[2], "v2"); + ASSERT_EQ(inmem_values[3], "v3"); + ASSERT_EQ(inmem_values[4], "v4"); + ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2); + ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5); ASSERT_OK(Flush()); std::vector keys({"k", "kk1", "kk2", "kk3", "kk4"}); std::vector values; - SetPerfLevel(kEnableCount); get_perf_context()->Reset(); values = MultiGet(keys, nullptr); ASSERT_EQ(values[0], "v0"); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d3b001d25..72cfb0be9 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1961,41 +1961,26 @@ void DBImpl::MultiGetImpl( keys_left -= batch_size; for (auto mget_iter = range.begin(); mget_iter != range.end(); ++mget_iter) { - MergeContext& merge_context = mget_iter->merge_context; - merge_context.Clear(); - Status& s = *mget_iter->s; - PinnableSlice* value = mget_iter->value; - s = Status::OK(); - - bool skip_memtable = - (read_options.read_tier == kPersistedTier && - has_unpersisted_data_.load(std::memory_order_relaxed)); - bool done = false; - if (!skip_memtable) { - if (super_version->mem->Get(*(mget_iter->lkey), value->GetSelf(), &s, - &merge_context, - &mget_iter->max_covering_tombstone_seq, - read_options, callback, is_blob_index)) { - done = true; - value->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); - } else if (super_version->imm->Get( - *(mget_iter->lkey), value->GetSelf(), &s, &merge_context, - &mget_iter->max_covering_tombstone_seq, read_options, - callback, is_blob_index)) { - done = true; - value->PinSelf(); - RecordTick(stats_, MEMTABLE_HIT); - } - } - if (done) { - range.MarkKeyDone(mget_iter); - } else { - RecordTick(stats_, MEMTABLE_MISS); - lookup_current = true; - } + mget_iter->merge_context.Clear(); + *mget_iter->s = Status::OK(); } + bool skip_memtable = + (read_options.read_tier == kPersistedTier && + has_unpersisted_data_.load(std::memory_order_relaxed)); + if (!skip_memtable) { + super_version->mem->MultiGet(read_options, &range, callback, + is_blob_index); + if (!range.empty()) { + super_version->imm->MultiGet(read_options, &range, callback, + is_blob_index); + } + if (!range.empty()) { + lookup_current = true; + uint64_t left = range.KeysLeft(); + RecordTick(stats_, MEMTABLE_MISS, left); + } + } if (lookup_current) { PERF_TIMER_GUARD(get_from_output_files_time); super_version->current->MultiGet(read_options, &range, callback, diff --git a/db/memtable.cc b/db/memtable.cc index dd6604514..8b2ddf0e1 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -10,9 +10,9 @@ #include "db/memtable.h" #include +#include #include #include - #include "db/dbformat.h" #include "db/merge_context.h" #include "db/merge_helper.h" @@ -804,6 +804,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, bloom_filter_->MayContain(prefix_extractor_->Transform(user_key)); } } + if (bloom_filter_ && !may_contain) { // iter is null if prefix bloom says the key does not exist PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); @@ -812,26 +813,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, if (bloom_filter_) { PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); } - Saver saver; - saver.status = s; - saver.found_final_value = &found_final_value; - saver.merge_in_progress = &merge_in_progress; - saver.key = &key; - saver.value = value; - saver.seq = kMaxSequenceNumber; - saver.mem = this; - saver.merge_context = merge_context; - saver.max_covering_tombstone_seq = *max_covering_tombstone_seq; - saver.merge_operator = moptions_.merge_operator; - saver.logger = moptions_.info_log; - saver.inplace_update_support = moptions_.inplace_update_support; - saver.statistics = moptions_.statistics; - saver.env_ = env_; - saver.callback_ = callback; - saver.is_blob_index = is_blob_index; - saver.do_merge = do_merge; - table_->Get(key, &saver, SaveValue); - *seq = saver.seq; + GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback, + is_blob_index, value, s, merge_context, seq, + &found_final_value, &merge_in_progress); } // No change to value, since we have not yet found a Put/Delete @@ -842,6 +826,103 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, return found_final_value; } +void MemTable::GetFromTable(const LookupKey& key, + SequenceNumber max_covering_tombstone_seq, + bool do_merge, ReadCallback* callback, + bool* is_blob_index, std::string* value, Status* s, + MergeContext* merge_context, SequenceNumber* seq, + bool* found_final_value, bool* merge_in_progress) { + Saver saver; + saver.status = s; + saver.found_final_value = found_final_value; + saver.merge_in_progress = merge_in_progress; + saver.key = &key; + saver.value = value; + saver.seq = kMaxSequenceNumber; + saver.mem = this; + saver.merge_context = merge_context; + saver.max_covering_tombstone_seq = max_covering_tombstone_seq; + saver.merge_operator = moptions_.merge_operator; + saver.logger = moptions_.info_log; + saver.inplace_update_support = moptions_.inplace_update_support; + saver.statistics = moptions_.statistics; + saver.env_ = env_; + saver.callback_ = callback; + saver.is_blob_index = is_blob_index; + saver.do_merge = do_merge; + table_->Get(key, &saver, SaveValue); + *seq = saver.seq; +} + +void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool* is_blob) { + // The sequence number is updated synchronously in version_set.h + if (IsEmpty()) { + // Avoiding recording stats for speed. + return; + } + PERF_TIMER_GUARD(get_from_memtable_time); + + MultiGetRange temp_range(*range, range->begin(), range->end()); + if (bloom_filter_) { + std::array keys; + std::array may_match = {{true}}; + autovector prefixes; + int num_keys = 0; + for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { + if (!prefix_extractor_) { + keys[num_keys++] = &iter->ukey; + } else if (prefix_extractor_->InDomain(iter->ukey)) { + prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey)); + keys[num_keys++] = &prefixes.back(); + } + } + bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]); + int idx = 0; + for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { + if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + continue; + } + if (!may_match[idx]) { + temp_range.SkipKey(iter); + PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); + } else { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + } + idx++; + } + } + for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { + SequenceNumber seq = kMaxSequenceNumber; + bool found_final_value{false}; + bool merge_in_progress = iter->s->IsMergeInProgress(); + std::unique_ptr range_del_iter( + NewRangeTombstoneIterator( + read_options, GetInternalKeySeqno(iter->lkey->internal_key()))); + if (range_del_iter != nullptr) { + iter->max_covering_tombstone_seq = std::max( + iter->max_covering_tombstone_seq, + range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); + } + GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, + callback, is_blob, iter->value->GetSelf(), iter->s, + &(iter->merge_context), &seq, &found_final_value, + &merge_in_progress); + + if (!found_final_value && merge_in_progress) { + *(iter->s) = Status::MergeInProgress(); + } + + if (found_final_value) { + iter->value->PinSelf(); + range->MarkKeyDone(iter); + RecordTick(moptions_.statistics, MEMTABLE_HIT); + } + } + PERF_COUNTER_ADD(get_from_memtable_count, 1); +} + void MemTable::Update(SequenceNumber seq, const Slice& key, const Slice& value) { diff --git a/db/memtable.h b/db/memtable.h index f316ab8e2..482e23716 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -26,6 +26,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" +#include "table/multiget_context.h" #include "util/dynamic_bloom.h" #include "util/hash.h" @@ -63,6 +64,7 @@ struct MemTablePostProcessInfo { uint64_t num_deletes = 0; }; +using MultiGetRange = MultiGetContext::Range; // Note: Many of the methods in this class have comments indicating that // external synchronization is required as these methods are not thread-safe. // It is up to higher layers of code to decide how to prevent concurrent @@ -221,6 +223,9 @@ class MemTable { read_opts, callback, is_blob_index, do_merge); } + void MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool* is_blob); + // Attempts to update the new_value inplace, else does normal Add // Pseudocode // if key exists in current memtable && prev_value is of type kTypeValue @@ -507,6 +512,13 @@ class MemTable { void UpdateFlushState(); void UpdateOldestKeyTime(); + + void GetFromTable(const LookupKey& key, + SequenceNumber max_covering_tombstone_seq, bool do_merge, + ReadCallback* callback, bool* is_blob_index, + std::string* value, Status* s, MergeContext* merge_context, + SequenceNumber* seq, bool* found_final_value, + bool* merge_in_progress); }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index e3f0732de..f0a0bc2a4 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -113,6 +113,17 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, is_blob_index); } +void MemTableListVersion::MultiGet(const ReadOptions& read_options, + MultiGetRange* range, ReadCallback* callback, + bool* is_blob) { + for (auto memtable : memlist_) { + memtable->MultiGet(read_options, range, callback, is_blob); + if (range->empty()) { + return; + } + } +} + bool MemTableListVersion::GetMergeOperands( const LookupKey& key, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) { diff --git a/db/memtable_list.h b/db/memtable_list.h index 75cc1a524..0884e1ec0 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -72,6 +72,9 @@ class MemTableListVersion { read_opts, callback, is_blob_index); } + void MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool* is_blob); + // Returns all the merge operands corresponding to the key by searching all // memtables starting from the most recent one. bool GetMergeOperands(const LookupKey& key, Status* s, diff --git a/table/multiget_context.h b/table/multiget_context.h index 88ec4dcc4..fe6bbc3bf 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -229,6 +229,16 @@ class MultiGetContext { return ctx_->value_mask_ & (1ull << iter.index_); } + uint64_t KeysLeft() { + uint64_t new_val = skip_mask_ | ctx_->value_mask_; + uint64_t count = 0; + while (new_val) { + new_val = new_val & (new_val - 1); + count++; + } + return end_ - count; + } + private: friend MultiGetContext; MultiGetContext* ctx_; diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index 312d2805b..eb19c369d 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -5,11 +5,11 @@ #pragma once +#include #include - -#include "rocksdb/slice.h" - #include "port/port.h" +#include "rocksdb/slice.h" +#include "table/multiget_context.h" #include "util/hash.h" #include @@ -65,6 +65,8 @@ class DynamicBloom { // Multithreaded access to this function is OK bool MayContain(const Slice& key) const; + void MayContain(int num_keys, Slice** keys, bool* may_match) const; + // Multithreaded access to this function is OK bool MayContainHash(uint32_t hash) const; @@ -84,6 +86,8 @@ class DynamicBloom { // concurrency safety, working with bytes. template void AddHash(uint32_t hash, const OrFunc& or_func); + + bool DoubleProbe(uint32_t h32, size_t a) const; }; inline void DynamicBloom::Add(const Slice& key) { AddHash(BloomHash(key)); } @@ -116,6 +120,22 @@ inline bool DynamicBloom::MayContain(const Slice& key) const { return (MayContainHash(BloomHash(key))); } +inline void DynamicBloom::MayContain(int num_keys, Slice** keys, + bool* may_match) const { + std::array hashes; + std::array byte_offsets; + for (int i = 0; i < num_keys; ++i) { + hashes[i] = BloomHash(*keys[i]); + size_t a = fastrange32(kLen, hashes[i]); + PREFETCH(data_ + a, 0, 3); + byte_offsets[i] = a; + } + + for (int i = 0; i < num_keys; i++) { + may_match[i] = DoubleProbe(hashes[i], byte_offsets[i]); + } +} + #if defined(_MSC_VER) #pragma warning(push) // local variable is initialized but not referenced @@ -153,13 +173,17 @@ inline void DynamicBloom::Prefetch(uint32_t h32) { inline bool DynamicBloom::MayContainHash(uint32_t h32) const { size_t a = fastrange32(kLen, h32); PREFETCH(data_ + a, 0, 3); + return DoubleProbe(h32, a); +} + +inline bool DynamicBloom::DoubleProbe(uint32_t h32, size_t byte_offset) const { // Expand/remix with 64-bit golden ratio uint64_t h = 0x9e3779b97f4a7c13ULL * h32; for (unsigned i = 0;; ++i) { // Two bit probes per uint64_t probe uint64_t mask = ((uint64_t)1 << (h & 63)) | ((uint64_t)1 << ((h >> 6) & 63)); - uint64_t val = data_[a ^ i].load(std::memory_order_relaxed); + uint64_t val = data_[byte_offset ^ i].load(std::memory_order_relaxed); if (i + 1 >= kNumDoubleProbes) { return (val & mask) == mask; } else if ((val & mask) != mask) {