diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index dc77fb91a..9b820f1a3 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -16,6 +16,9 @@ #if !defined(ROCKSDB_LITE) #include "test_util/sync_point.h" #endif +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/string_append/stringappend2.h" namespace rocksdb { @@ -1336,6 +1339,106 @@ TEST_F(DBBasicTest, GetAllKeyVersions) { } #endif // !ROCKSDB_LITE +TEST_F(DBBasicTest, GetMergeOperands) { + class LimitedStringAppendMergeOp : public StringAppendTESTOperator { + public: + LimitedStringAppendMergeOp(int limit, char delim) + : StringAppendTESTOperator(delim), limit_(limit) {} + + const char* Name() const override { + return "DBMergeOperatorTest::LimitedStringAppendMergeOp"; + } + + bool ShouldMerge(const std::vector& operands) const override { + if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) { + return true; + } + return false; + } + + private: + size_t limit_ = 0; + }; + std::vector rest; + int* a = new int(); + *a = 5; + rest.push_back(*a); + Options options; + options.create_if_missing = true; + // Use only the latest two merge operands. + options.merge_operator = + std::make_shared(2, ','); + options.env = env_; + Reopen(options); + // All K1 values are in memtable. + ASSERT_OK(Merge("k1", "a")); + Put("k1", "asd"); + ASSERT_OK(Merge("k1", "b")); + ASSERT_OK(Merge("k1", "c")); + ASSERT_OK(Merge("k1", "d")); + std::vector values; + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values); + for(PinnableSlice& value: values) { + std::cout << *value.GetSelf() << "\n"; + } + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok()); + // Make sure that only the latest two merge operands are used. If this was + // not the case the value would be "a,b,c,d". + ASSERT_EQ(value, "c,d"); + + // All K2 values are flushed to L0 into a single file. 
+ ASSERT_OK(Merge("k2", "a")); + ASSERT_OK(Merge("k2", "b")); + ASSERT_OK(Merge("k2", "c")); + ASSERT_OK(Merge("k2", "d")); + ASSERT_OK(Flush()); + std::vector values2(4); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2); + for(PinnableSlice& psl: values2) { + std::cout << *psl.GetSelf() << "\n"; + } + ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok()); + ASSERT_EQ(value, "c,d"); + + // All K3 values are flushed and are in different files. + ASSERT_OK(Merge("k3", "ab")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "bc")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "cd")); + ASSERT_OK(Flush()); + ASSERT_OK(Merge("k3", "de")); + std::vector values3(4); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3); + for(PinnableSlice& psl: values3) { + std::cout << *psl.GetSelf() << "\n"; + } + + ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok()); + ASSERT_EQ(value, "cd,de"); + + // All K4 values are in different levels + ASSERT_OK(Merge("k4", "ab")); + ASSERT_OK(Flush()); + MoveFilesToLevel(4); + ASSERT_OK(Merge("k4", "bc")); + ASSERT_OK(Flush()); + MoveFilesToLevel(3); + ASSERT_OK(Merge("k4", "cd")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + ASSERT_OK(Merge("k4", "de")); + ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok()); + ASSERT_EQ(value, "cd,de"); + + std::vector values4(4); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4); + for(PinnableSlice& psl: values4) { + std::cout << *psl.GetSelf() << "\n"; + } +} + class DBBasicTestWithParallelIO : public DBTestBase, public testing::WithParamInterface> { @@ -1624,6 +1727,7 @@ INSTANTIATE_TEST_CASE_P( std::make_tuple(true, true, true, false), std::make_tuple(false, true, false, false))); + class DBBasicTestWithTimestampWithParam : public DBTestBase, public testing::WithParamInterface { diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 7f639c853..1b0a57df7 100644 --- 
a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -4454,6 +4454,8 @@ TEST_F(DBCompactionTest, PartialManualCompaction) { uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2; ASSERT_OK(dbfull()->SetOptions( {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}})); + ASSERT_OK(dbfull()->SetOptions( + {{"compaction_readahead_size", std::to_string(2097152)}})); CompactRangeOptions cro; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index af9aea011..68abb549c 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1564,6 +1564,99 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, return s; } +Status DBImpl::GetMergeOperands(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + std::vector* pinnable_val) { + assert(pinnable_val != nullptr); + PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); + StopWatch sw(env_, stats_, DB_GET); + PERF_TIMER_GUARD(get_snapshot_time); + + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + + if (tracer_) { + // TODO: This mutex should be removed later, to improve performance when + // tracing is enabled. + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Get(column_family, key); + } + } + + // Acquire SuperVersion + SuperVersion* sv = GetAndRefSuperVersion(cfd); + + SequenceNumber snapshot; + if (read_options.snapshot != nullptr) { + snapshot = + reinterpret_cast(read_options.snapshot)->number_; + } else { + // Note that the snapshot is assigned AFTER referencing the super + // version because otherwise a flush happening in between may compact away + // data for the snapshot, so the reader would see neither data that was + // visible to the snapshot before compaction nor the newer data inserted + // afterwards. + snapshot = last_seq_same_as_publish_seq_ + ? 
versions_->LastSequence() + : versions_->LastPublishedSequence(); + } + + // Prepare to store a list of merge operations if merge occurs. + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq = 0; + + Status s; + // First look in the memtable, then in the immutable memtable (if any). + // s is both in/out. When in, s could either be OK or MergeInProgress. + // merge_operands will contain the sequence of merges in the latter case. + LookupKey lkey(key, snapshot); + PERF_TIMER_STOP(get_snapshot_time); + + bool skip_memtable = (read_options.read_tier == kPersistedTier && + has_unpersisted_data_.load(std::memory_order_relaxed)); + bool done = false; + if (!skip_memtable) { + if (sv->mem->GetMergeOperands(lkey, pinnable_val, &s, &merge_context, + &max_covering_tombstone_seq, read_options)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + sv->imm->GetMergeOperands(lkey, pinnable_val, &s, &merge_context, + &max_covering_tombstone_seq, read_options)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } + if (!done && !s.ok() && !s.IsMergeInProgress()) { + ReturnAndCleanupSuperVersion(cfd, sv); + return s; + } + } + if (!done) { + PERF_TIMER_GUARD(get_from_output_files_time); + sv->current->GetMergeOperands(read_options, lkey, pinnable_val, &s, &merge_context, + &max_covering_tombstone_seq, nullptr, nullptr, nullptr, + nullptr, nullptr); + RecordTick(stats_, MEMTABLE_MISS); + } + + { + PERF_TIMER_GUARD(get_post_process_time); + + ReturnAndCleanupSuperVersion(cfd, sv); + + RecordTick(stats_, NUMBER_KEYS_READ); + size_t size = 0; + if (s.ok()) { + size = pinnable_val->size(); + RecordTick(stats_, BYTES_READ, size); + PERF_COUNTER_ADD(get_read_bytes, size); + } + RecordInHistogram(stats_, BYTES_PER_READ, size); + } + return s; +} + std::vector DBImpl::MultiGet( const ReadOptions& read_options, const std::vector& column_family, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 
547e3e1d6..3bb9521f1 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -159,6 +159,11 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using DB::GetMergeOperands; + virtual Status GetMergeOperands(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::vector* value) override; + using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 184c6f53b..1bf33afb2 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -42,8 +42,8 @@ class MockMemTableRep : public MemTableRep { bool Contains(const char* key) const override { return rep_->Contains(key); } void Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) override { - rep_->Get(k, callback_args, callback_func); + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override { + rep_->Get(k, callback_args, callback_func, do_merge); } size_t ApproximateMemoryUsage() override { diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 2b5e4a445..f6d6c4588 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -86,9 +86,15 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) { Reopen(options); // All K1 values are in memtable. ASSERT_OK(Merge("k1", "a")); + Put("k1", "asd"); ASSERT_OK(Merge("k1", "b")); ASSERT_OK(Merge("k1", "c")); ASSERT_OK(Merge("k1", "d")); + std::vector values; + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", &values); + for(PinnableSlice& value: values) { + std::cout << *value.GetSelf() << "\n"; + } std::string value; ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok()); // Make sure that only the latest two merge operands are used. 
If this was @@ -101,6 +107,11 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) { ASSERT_OK(Merge("k2", "c")); ASSERT_OK(Merge("k2", "d")); ASSERT_OK(Flush()); + std::vector values2(4); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k2", &values2); + for(PinnableSlice& psl: values2) { + std::cout << *psl.GetSelf() << "\n"; + } ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok()); ASSERT_EQ(value, "c,d"); @@ -112,6 +123,12 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) { ASSERT_OK(Merge("k3", "cd")); ASSERT_OK(Flush()); ASSERT_OK(Merge("k3", "de")); + std::vector values3(4); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k3", &values3); + for(PinnableSlice& psl: values3) { + std::cout << *psl.GetSelf() << "\n"; + } + ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok()); ASSERT_EQ(value, "cd,de"); @@ -128,6 +145,13 @@ TEST_F(DBMergeOperatorTest, LimitMergeOperands) { ASSERT_OK(Merge("k4", "de")); ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok()); ASSERT_EQ(value, "cd,de"); + + std::vector values4(4); + db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k4", &values4); + for(PinnableSlice& psl: values4) { + std::cout << *psl.GetSelf() << "\n"; + } + } TEST_F(DBMergeOperatorTest, MergeErrorOnRead) { diff --git a/db/db_test.cc b/db/db_test.cc index 36bdda59e..75f40fc1b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2466,6 +2466,18 @@ class ModelDB : public DB { return Status::NotSupported(key); } + using DB::GetMergeOperands; + virtual Status GetMergeOperands(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::vector* value) override { + + (void)options; + (void) column_family; + (void) key; + (void) value; + return Status::NotSupported(key); + } + using DB::MultiGet; std::vector MultiGet( const ReadOptions& /*options*/, diff --git a/db/db_test_util.h b/db/db_test_util.h index 6e1d0ed7a..84e0ceaaa 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ 
-155,8 +155,8 @@ class SpecialMemTableRep : public MemTableRep { virtual void Get(const LookupKey& k, void* callback_args, bool (*callback_func)(void* arg, - const char* entry)) override { - memtable_->Get(k, callback_args, callback_func); + const char* entry, bool do_merge), bool do_merge) override { + memtable_->Get(k, callback_args, callback_func, do_merge); } uint64_t ApproximateNumEntries(const Slice& start_ikey, diff --git a/db/memtable.cc b/db/memtable.cc index fdd1a577a..42504ee0e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -614,7 +614,7 @@ struct Saver { }; } // namespace -static bool SaveValue(void* arg, const char* entry) { +static bool SaveValue(void* arg, const char* entry, bool do_merge) { Saver* s = reinterpret_cast(arg); assert(s != nullptr); MergeContext* merge_context = s->merge_context; @@ -627,7 +627,7 @@ static bool SaveValue(void* arg, const char* entry) { // klength varint32 // userkey char[klength-8] // tag uint64 - // vlength varint32 + // vlength varint32 // value char[vlength] // Check that it belongs to same user key. 
We do not check the // sequence number since the Seek() call above should have skipped @@ -677,12 +677,18 @@ static bool SaveValue(void* arg, const char* entry) { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->status) = Status::OK(); if (*(s->merge_in_progress)) { - if (s->value != nullptr) { - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), &v, - merge_context->GetOperands(), s->value, s->logger, - s->statistics, s->env_, nullptr /* result_operand */, true); - } + if (!do_merge) { + merge_context->PushOperand( + v, s->inplace_update_support == false /* operand_pinned */); + } + else { + if (s->value != nullptr) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), &v, + merge_context->GetOperands(), s->value, s->logger, + s->statistics, s->env_, nullptr /* result_operand */, true); + } + } } else if (s->value != nullptr) { s->value->assign(v.data(), v.size()); } @@ -726,7 +732,7 @@ static bool SaveValue(void* arg, const char* entry) { *(s->merge_in_progress) = true; merge_context->PushOperand( v, s->inplace_update_support == false /* operand_pinned */); - if (merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) { + if (do_merge && merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) { *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, merge_context->GetOperands(), s->value, s->logger, s->statistics, @@ -810,7 +816,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.env_ = env_; saver.callback_ = callback; saver.is_blob_index = is_blob_index; - table_->Get(key, &saver, SaveValue); + table_->Get(key, &saver, SaveValue, true); *seq = saver.seq; } @@ -823,6 +829,87 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, return found_final_value; } +bool MemTable::GetMergeOperands(const LookupKey& key, + std::vector* pinnable_val, + Status* s, MergeContext* 
merge_context, + SequenceNumber* max_covering_tombstone_seq, + const ReadOptions& read_opts, + ReadCallback* callback, + bool* is_blob_index) { + // The sequence number is updated synchronously in version_set.h + if (IsEmpty()) { + // Avoiding recording stats for speed. + return false; + } + PERF_TIMER_GUARD(get_from_memtable_time); + + std::unique_ptr range_del_iter( + NewRangeTombstoneIterator(read_opts, + GetInternalKeySeqno(key.internal_key()))); + if (range_del_iter != nullptr) { + *max_covering_tombstone_seq = + std::max(*max_covering_tombstone_seq, + range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key())); + } + + Slice user_key = key.user_key(); + bool found_final_value = false; + bool merge_in_progress = s->IsMergeInProgress(); + bool may_contain = true; + size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size(); + if (bloom_filter_) { + // when both memtable_whole_key_filtering and prefix_extractor_ are set, + // only do whole key filtering for Get() to save CPU + if (moptions_.memtable_whole_key_filtering) { + may_contain = + bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz)); + } else { + assert(prefix_extractor_); + may_contain = + !prefix_extractor_->InDomain(user_key) || + bloom_filter_->MayContain(prefix_extractor_->Transform(user_key)); + } + } + if (bloom_filter_ && !may_contain) { + // iter is null if prefix bloom says the key does not exist + PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); + } else { + if (bloom_filter_) { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + } + Saver saver; + saver.status = s; + saver.found_final_value = &found_final_value; + saver.merge_in_progress = &merge_in_progress; + saver.key = &key; + saver.seq = kMaxSequenceNumber; + saver.mem = this; + saver.merge_context = merge_context; + saver.max_covering_tombstone_seq = *max_covering_tombstone_seq; + saver.merge_operator = moptions_.merge_operator; + saver.logger = moptions_.info_log; + saver.inplace_update_support = 
moptions_.inplace_update_support; + saver.statistics = moptions_.statistics; + saver.env_ = env_; + saver.callback_ = callback; + saver.is_blob_index = is_blob_index; + table_->Get(key, &saver, SaveValue, false); + PinnableSlice* psliceptr = pinnable_val->data(); + for (Slice slice : saver.merge_context->GetOperands()) { + psliceptr->PinSelf(slice); + psliceptr++; + } + } + + // No change to value, since we have not yet found a Put/Delete + if (!found_final_value && merge_in_progress) { + *s = Status::MergeInProgress(); + } + PERF_COUNTER_ADD(get_from_memtable_count, 1); + + return found_final_value; +} + void MemTable::Update(SequenceNumber seq, const Slice& key, const Slice& value) { @@ -996,10 +1083,10 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { } void MemTableRep::Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) { + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) { auto iter = GetDynamicPrefixIterator(); for (iter->Seek(k.internal_key(), k.memtable_key().data()); - iter->Valid() && callback_func(callback_args, iter->key()); + iter->Valid() && callback_func(callback_args, iter->key(), do_merge); iter->Next()) { } } diff --git a/db/memtable.h b/db/memtable.h index 6b8c4141f..5bdc66c74 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -204,6 +204,14 @@ class MemTable { read_opts, callback, is_blob_index); } + bool GetMergeOperands(const LookupKey& key, + std::vector* pinnable_val, + Status* s, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + const ReadOptions& read_opts, + ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); + // Attempts to update the new_value inplace, else does normal Add // Pseudocode // if key exists in current memtable && prev_value is of type kTypeValue diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 0f796eb9a..3a41b241a 100644 --- a/db/memtable_list.cc +++ 
b/db/memtable_list.cc @@ -109,6 +109,23 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, is_blob_index); } +bool MemTableListVersion::GetMergeOperands(const LookupKey& key, + std::vector* pinnable_val, + Status* s, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + const ReadOptions& read_opts, + ReadCallback* callback, + bool* is_blob_index) { + for (MemTable* memtable : memlist_) { + bool done = memtable->GetMergeOperands(key, pinnable_val, s, merge_context, + max_covering_tombstone_seq, read_opts, callback, is_blob_index); + if (done) { + return true; + } + } + return false; +} + bool MemTableListVersion::GetFromHistory( const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, diff --git a/db/memtable_list.h b/db/memtable_list.h index a72077ff3..7d1a49a75 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -71,6 +71,14 @@ class MemTableListVersion { read_opts, callback, is_blob_index); } + bool GetMergeOperands(const LookupKey& key, + std::vector* pinnable_val, + Status* s, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + const ReadOptions& read_opts, + ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); + // Similar to Get(), but searches the Memtable history of memtables that // have already been flushed. 
Should only be used from in-memory only // queries (such as Transaction validation) as the history may contain diff --git a/db/version_set.cc b/db/version_set.cc index 0d3b9fb4e..30fbb65cf 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1783,6 +1783,120 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } } +void Version::GetMergeOperands(const ReadOptions& read_options, const LookupKey& k, + std::vector* pinnable_val, Status* status, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, bool* value_found, + bool* key_exists, SequenceNumber* seq, ReadCallback* callback, + bool* is_blob) { + Slice ikey = k.internal_key(); + Slice user_key = k.user_key(); + + assert(status->ok() || status->IsMergeInProgress()); + + if (key_exists != nullptr) { + // will falsify below if not found + *key_exists = true; + } + + PinnedIteratorsManager pinned_iters_mgr; + GetContext get_context( + user_comparator(), merge_operator_, info_log_, db_statistics_, + status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, + nullptr, value_found, merge_context, max_covering_tombstone_seq, this->env_, + seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, false); + + // Pin blocks that we read to hold merge operands + if (merge_operator_) { + pinned_iters_mgr.StartPinning(); + } + + FilePicker fp( + storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, + storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_, + user_comparator(), internal_comparator()); + FdWithKeyRange* f = fp.GetNextFile(); + + while (f != nullptr) { + if (*max_covering_tombstone_seq > 0) { + // The remaining files we look at will only contain covered keys, so we + // stop here. 
+ break; + } + if (get_context.sample()) { + sample_file_read_inc(f->file_metadata); + } + + bool timer_enabled = + GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && + get_perf_context()->per_level_perf_context_enabled; + StopWatchNano timer(env_, timer_enabled /* auto_start */); + *status = table_cache_->Get( + read_options, *internal_comparator(), *f->file_metadata, ikey, + &get_context, mutable_cf_options_.prefix_extractor.get(), + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), + IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel()), + fp.GetCurrentLevel()); + // TODO: examine the behavior for corrupted key + if (timer_enabled) { + PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), + fp.GetCurrentLevel()); + } + if (!status->ok()) { + return; + } + + // report the counters before returning + if (get_context.State() != GetContext::kNotFound && + get_context.State() != GetContext::kMerge && + db_statistics_ != nullptr) { + get_context.ReportCounters(); + } + switch (get_context.State()) { + case GetContext::kNotFound: + // Keep searching in other files + break; + case GetContext::kMerge: + // TODO: update per-level perfcontext user_key_return_count for kMerge + break; + case GetContext::kFound: + if (fp.GetHitFileLevel() == 0) { + RecordTick(db_statistics_, GET_HIT_L0); + } else if (fp.GetHitFileLevel() == 1) { + RecordTick(db_statistics_, GET_HIT_L1); + } else if (fp.GetHitFileLevel() >= 2) { + RecordTick(db_statistics_, GET_HIT_L2_AND_UP); + } + PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel()); + break; + case GetContext::kDeleted: + // Use empty error message for speed + *status = Status::NotFound(); + break; + case GetContext::kCorrupt: + *status = Status::Corruption("corrupted key for ", user_key); + return; + case GetContext::kBlobIndex: + ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); + *status = Status::NotSupported( + "Encounter unexpected blob 
index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + return; + } + f = fp.GetNextFile(); + } + + if (db_statistics_ != nullptr) { + get_context.ReportCounters(); + } + PinnableSlice* pin_slice = pinnable_val->data(); + for (Slice slice : merge_context->GetOperands()){ + pin_slice->PinSelf(slice); + pin_slice++; + } +} + void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, ReadCallback* callback, bool* is_blob) { PinnedIteratorsManager pinned_iters_mgr; diff --git a/db/version_set.h b/db/version_set.h index 6b7c42881..722cec289 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -63,7 +63,6 @@ class VersionSet; class WriteBufferManager; class MergeContext; class ColumnFamilySet; -class TableCache; class MergeIteratorBuilder; // Return the smallest index i such that file_level.files[i]->largest >= key. @@ -584,6 +583,14 @@ class Version { SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, bool* is_blob = nullptr); + void GetMergeOperands(const ReadOptions&, const LookupKey& key, + std::vector* pinnable_val, + Status* status, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + bool* value_found = nullptr, bool* key_exists = nullptr, + SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, + bool* is_blob = nullptr); + void MultiGet(const ReadOptions&, MultiGetRange* range, ReadCallback* callback = nullptr, bool* is_blob = nullptr); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index d90ca900f..fd0e7d2ef 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -403,6 +403,10 @@ class DB { return Get(options, DefaultColumnFamily(), key, value); } + virtual Status GetMergeOperands(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::vector* value) = 0; + // If keys[i] does not exist in the database, then the i'th returned // status will be one for which Status::IsNotFound() is true, and // (*values)[i] will be set to some 
arbitrary value (often ""). Otherwise, diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 328422f57..70aceba84 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -165,7 +165,7 @@ class MemTableRep { // Get() function with a default value of dynamically construct an iterator, // seek and call the call back function. virtual void Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)); + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge); virtual uint64_t ApproximateNumEntries(const Slice& /*start_ikey*/, const Slice& /*end_key*/) { diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index a52aff5d8..92ed2e984 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -88,6 +88,13 @@ class StackableDB : public DB { return db_->Get(options, column_family, key, value); } + using DB::GetMergeOperands; + virtual Status GetMergeOperands(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::vector* value) override { + return db_->GetMergeOperands(options, column_family, key, value); + } + using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, diff --git a/memtable/hash_linklist_rep.cc b/memtable/hash_linklist_rep.cc index e347abe6e..31320a581 100644 --- a/memtable/hash_linklist_rep.cc +++ b/memtable/hash_linklist_rep.cc @@ -177,7 +177,7 @@ class HashLinkListRep : public MemTableRep { size_t ApproximateMemoryUsage() override; void Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) override; + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override; ~HashLinkListRep() override; @@ -714,7 +714,7 @@ size_t HashLinkListRep::ApproximateMemoryUsage() { } void HashLinkListRep::Get(const LookupKey& k, void* 
callback_args, - bool (*callback_func)(void* arg, const char* entry)) { + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) { auto transformed = transform_->Transform(k.user_key()); auto bucket = GetBucket(transformed); @@ -723,7 +723,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args, // Is a skip list MemtableSkipList::Iterator iter(&skip_list_header->skip_list); for (iter.Seek(k.memtable_key().data()); - iter.Valid() && callback_func(callback_args, iter.key()); + iter.Valid() && callback_func(callback_args, iter.key(), do_merge); iter.Next()) { } } else { @@ -731,7 +731,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args, if (link_list_head != nullptr) { LinkListIterator iter(this, link_list_head); for (iter.Seek(k.internal_key(), nullptr); - iter.Valid() && callback_func(callback_args, iter.key()); + iter.Valid() && callback_func(callback_args, iter.key(), do_merge); iter.Next()) { } } diff --git a/memtable/hash_skiplist_rep.cc b/memtable/hash_skiplist_rep.cc index 5c74657cd..5f07d5c2a 100644 --- a/memtable/hash_skiplist_rep.cc +++ b/memtable/hash_skiplist_rep.cc @@ -35,7 +35,7 @@ class HashSkipListRep : public MemTableRep { size_t ApproximateMemoryUsage() override; void Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) override; + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override; ~HashSkipListRep() override; @@ -287,13 +287,13 @@ size_t HashSkipListRep::ApproximateMemoryUsage() { } void HashSkipListRep::Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) { + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) { auto transformed = transform_->Transform(k.user_key()); auto bucket = GetBucket(transformed); if (bucket != nullptr) { Bucket::Iterator iter(bucket); for (iter.Seek(k.memtable_key().data()); - iter.Valid() && 
callback_func(callback_args, iter.key()); + iter.Valid() && callback_func(callback_args, iter.key(), do_merge); iter.Next()) { } } diff --git a/memtable/memtablerep_bench.cc b/memtable/memtablerep_bench.cc index 1e2b5bdd1..430baf572 100644 --- a/memtable/memtablerep_bench.cc +++ b/memtable/memtablerep_bench.cc @@ -293,7 +293,7 @@ class ReadBenchmarkThread : public BenchmarkThread { : BenchmarkThread(table, key_gen, bytes_written, bytes_read, sequence, num_ops, read_hits) {} - static bool callback(void* arg, const char* entry) { + static bool callback(void* arg, const char* entry, bool /*do_merge*/) { CallbackVerifyArgs* callback_args = static_cast(arg); assert(callback_args != nullptr); uint32_t key_length; @@ -318,7 +318,7 @@ class ReadBenchmarkThread : public BenchmarkThread { verify_args.key = &lookup_key; verify_args.table = table_; verify_args.comparator = &internal_key_comp; - table_->Get(lookup_key, &verify_args, callback); + table_->Get(lookup_key, &verify_args, callback, true); if (verify_args.found) { *bytes_read_ += VarintLength(16) + 16 + FLAGS_item_size; ++*read_hits_; diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index 3955217cc..b38669f23 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -69,11 +69,11 @@ public: } void Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) override { + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override { SkipListRep::Iterator iter(&skip_list_); Slice dummy_slice; for (iter.Seek(dummy_slice, k.memtable_key().data()); - iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) { + iter.Valid() && callback_func(callback_args, iter.key(), do_merge); iter.Next()) { } } diff --git a/memtable/vectorrep.cc b/memtable/vectorrep.cc index e7acc94ad..efeea0145 100644 --- a/memtable/vectorrep.cc +++ b/memtable/vectorrep.cc @@ -41,7 +41,7 @@ class VectorRep : public MemTableRep { size_t 
ApproximateMemoryUsage() override; void Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) override; + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) override; ~VectorRep() override {} @@ -248,7 +248,7 @@ void VectorRep::Iterator::SeekToLast() { } void VectorRep::Get(const LookupKey& k, void* callback_args, - bool (*callback_func)(void* arg, const char* entry)) { + bool (*callback_func)(void* arg, const char* entry, bool do_merge), bool do_merge) { rwlock_.ReadLock(); VectorRep* vector_rep; std::shared_ptr bucket; @@ -262,7 +262,7 @@ void VectorRep::Get(const LookupKey& k, void* callback_args, rwlock_.ReadUnlock(); for (iter.Seek(k.user_key(), k.memtable_key().data()); - iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) { + iter.Valid() && callback_func(callback_args, iter.key(), do_merge); iter.Next()) { } } diff --git a/table/get_context.cc b/table/get_context.cc index f0c7928bf..7f925b138 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -44,7 +44,7 @@ GetContext::GetContext( PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, SequenceNumber* _max_covering_tombstone_seq, Env* env, SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback, - bool* is_blob_index, uint64_t tracing_get_id) + bool* is_blob_index, uint64_t tracing_get_id, bool do_merge) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -61,7 +61,8 @@ GetContext::GetContext( pinned_iters_mgr_(_pinned_iters_mgr), callback_(callback), is_blob_index_(is_blob_index), - tracing_get_id_(tracing_get_id) { + tracing_get_id_(tracing_get_id), + do_merge_(do_merge){ if (seq_) { *seq_ = kMaxSequenceNumber; } @@ -230,14 +231,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(merge_operator_ != nullptr); state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = 
MergeHelper::TimedFullMerge( - merge_operator_, user_key_, &value, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; - } + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, &value, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } + } else { + assert (do_merge_ == false); + if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && + value_pinner != nullptr) { + value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); + merge_context_->PushOperand(value, true /*value_pinned*/); + } else { + merge_context_->PushOperand(value, false); + } } } if (is_blob_index_ != nullptr) { @@ -256,14 +266,16 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } else if (kMerge == state_) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; - } + if (do_merge_) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } + } } } return false; @@ -279,18 +291,20 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } else { merge_context_->PushOperand(value, false); } - if (merge_operator_ != nullptr && + if (do_merge_ && merge_operator_ != nullptr && merge_operator_->ShouldMerge(merge_context_->GetOperandsDirectionBackward())) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { - Status merge_status = 
MergeHelper::TimedFullMerge( - merge_operator_, user_key_, nullptr, - merge_context_->GetOperands(), pinnable_val_->GetSelf(), - logger_, statistics_, env_); - pinnable_val_->PinSelf(); - if (!merge_status.ok()) { - state_ = kCorrupt; - } + if (do_merge_) { + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, nullptr, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), + logger_, statistics_, env_); + pinnable_val_->PinSelf(); + if (!merge_status.ok()) { + state_ = kCorrupt; + } + } } return false; } diff --git a/table/get_context.h b/table/get_context.h index 7a37beb2d..e7cd0ef88 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -86,7 +86,7 @@ class GetContext { SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr, ReadCallback* callback = nullptr, bool* is_blob_index = nullptr, - uint64_t tracing_get_id = 0); + uint64_t tracing_get_id = 0, bool do_merge = true); GetContext() = default; @@ -164,6 +164,7 @@ class GetContext { // Used for block cache tracing only. A tracing get id uniquely identifies a // Get or a MultiGet. const uint64_t tracing_get_id_; + bool do_merge_; }; // Call this to replay a log and bring the get_context up to date. The replay