diff --git a/HISTORY.md b/HISTORY.md
index 89e596756..fc6c55874 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -4,6 +4,7 @@
 * Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
 * Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
 * Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which cause write stalling, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`.
+* Allow merge operator to be called even with a single merge operand during compactions, by appropriately overriding `MergeOperator::AllowSingleOperand`.
 
 ### Bug Fixes
 * Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.
diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc
index b625c99ff..dfc413936 100644
--- a/db/compaction_iterator_test.cc
+++ b/db/compaction_iterator_test.cc
@@ -455,6 +455,114 @@ TEST_F(CompactionIteratorTest, ShuttingDownInMerge) {
   EXPECT_EQ(2, filter.last_seen.load());
 }
 
+// Verifies that a merge operator which opts in through AllowSingleOperand()
+// has PartialMergeMulti invoked even for a key with only one merge operand.
+TEST_F(CompactionIteratorTest, SingleMergeOperand) {
+  class Filter : public CompactionFilter {
+    virtual Decision FilterV2(int level, const Slice& key, ValueType t,
+                              const Slice& existing_value,
+                              std::string* new_value,
+                              std::string* skip_until) const override {
+      std::string k = key.ToString();
+      std::string v = existing_value.ToString();
+
+      // See InitIterators() call below for the sequence of keys and their
+      // filtering decisions. Here we closely assert that compaction filter is
+      // called with the expected keys and only them, and with the right values.
+      if (k == "a") {
+        EXPECT_EQ(ValueType::kMergeOperand, t);
+        EXPECT_EQ("av1", v);
+        return Decision::kKeep;
+      } else if (k == "b") {
+        EXPECT_EQ(ValueType::kMergeOperand, t);
+        return Decision::kKeep;
+      } else if (k == "c") {
+        return Decision::kKeep;
+      }
+
+      ADD_FAILURE();
+      return Decision::kKeep;
+    }
+
+    const char* Name() const override {
+      return "CompactionIteratorTest.SingleMergeOperand::Filter";
+    }
+  };
+
+  class SingleMergeOp : public MergeOperator {
+   public:
+    bool FullMergeV2(const MergeOperationInput& merge_in,
+                     MergeOperationOutput* merge_out) const override {
+      // See InitIterators() call below for why "c" is the only key for which
+      // FullMergeV2 should be called.
+      EXPECT_EQ("c", merge_in.key.ToString());
+
+      std::string temp_value;
+      if (merge_in.existing_value != nullptr) {
+        temp_value = merge_in.existing_value->ToString();
+      }
+
+      for (auto& operand : merge_in.operand_list) {
+        temp_value.append(operand.ToString());
+      }
+      merge_out->new_value = temp_value;
+
+      return true;
+    }
+
+    bool PartialMergeMulti(const Slice& key,
+                           const std::deque<Slice>& operand_list,
+                           std::string* new_value,
+                           Logger* logger) const override {
+      std::string string_key = key.ToString();
+      EXPECT_TRUE(string_key == "a" || string_key == "b");
+
+      if (string_key == "a") {
+        EXPECT_EQ(1, operand_list.size());
+      } else if (string_key == "b") {
+        EXPECT_EQ(2, operand_list.size());
+      }
+
+      std::string temp_value;
+      for (auto& operand : operand_list) {
+        temp_value.append(operand.ToString());
+      }
+      swap(temp_value, *new_value);
+
+      return true;
+    }
+
+    const char* Name() const override {
+      return "CompactionIteratorTest SingleMergeOp";
+    }
+
+    bool AllowSingleOperand() const override { return true; }
+  };
+
+  SingleMergeOp merge_op;
+  Filter filter;
+  InitIterators(
+      // a should invoke PartialMergeMulti with a single merge operand.
+      {test::KeyStr("a", 50, kTypeMerge),
+       // b should invoke PartialMergeMulti with two operands.
+       test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
+       // c should invoke FullMerge due to kTypeValue at the beginning.
+       test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
+      {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
+      &merge_op, &filter);
+
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
+  ASSERT_EQ("av1", c_iter_->value().ToString());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index 625de27c2..55f8254cf 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -18,6 +18,35 @@
 
 namespace rocksdb {
 
+// Caches MergeOperator::AllowSingleOperand() at construction so MergeUntil()
+// can consult a plain bool; stays false when no merge operator is supplied.
+MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
+                         const MergeOperator* user_merge_operator,
+                         const CompactionFilter* compaction_filter,
+                         Logger* logger, bool assert_valid_internal_key,
+                         SequenceNumber latest_snapshot, int level,
+                         Statistics* stats,
+                         const std::atomic<bool>* shutting_down)
+    : env_(env),
+      user_comparator_(user_comparator),
+      user_merge_operator_(user_merge_operator),
+      compaction_filter_(compaction_filter),
+      shutting_down_(shutting_down),
+      logger_(logger),
+      assert_valid_internal_key_(assert_valid_internal_key),
+      allow_single_operand_(false),
+      latest_snapshot_(latest_snapshot),
+      level_(level),
+      keys_(),
+      filter_timer_(env_),
+      total_filter_time_(0U),
+      stats_(stats) {
+  assert(user_comparator_ != nullptr);
+  if (user_merge_operator_) {
+    allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
+  }
+}
+
 Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
                                    const Slice& key, const Slice* value,
                                    const std::vector<Slice>& operands,
@@ -288,7 +317,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
     // Attempt to use the user's associative merge function to
     // merge the stacked merge operands into a single operand.
     s = Status::MergeInProgress();
-    if (merge_context_.GetNumOperands() >= 2) {
+    if (merge_context_.GetNumOperands() >= 2 ||
+        (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
       bool merge_success = false;
       std::string merge_result;
       {
diff --git a/db/merge_helper.h b/db/merge_helper.h
index 59da47a6b..b9ef12a4c 100644
--- a/db/merge_helper.h
+++ b/db/merge_helper.h
@@ -34,22 +34,7 @@ class MergeHelper {
               const CompactionFilter* compaction_filter, Logger* logger,
               bool assert_valid_internal_key, SequenceNumber latest_snapshot,
               int level = 0, Statistics* stats = nullptr,
-              const std::atomic<bool>* shutting_down = nullptr)
-      : env_(env),
-        user_comparator_(user_comparator),
-        user_merge_operator_(user_merge_operator),
-        compaction_filter_(compaction_filter),
-        shutting_down_(shutting_down),
-        logger_(logger),
-        assert_valid_internal_key_(assert_valid_internal_key),
-        latest_snapshot_(latest_snapshot),
-        level_(level),
-        keys_(),
-        filter_timer_(env_),
-        total_filter_time_(0U),
-        stats_(stats) {
-    assert(user_comparator_ != nullptr);
-  }
+              const std::atomic<bool>* shutting_down = nullptr);
 
   // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
   // Result of merge will be written to result if status returned is OK.
@@ -158,6 +143,7 @@ class MergeHelper {
   const std::atomic<bool>* shutting_down_;
   Logger* logger_;
   bool assert_valid_internal_key_;  // enforce no internal key corruption?
+  bool allow_single_operand_;  // cached MergeOperator::AllowSingleOperand()
   SequenceNumber latest_snapshot_;
   int level_;
 
diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h
index 5fe3e0bfd..f29471005 100644
--- a/include/rocksdb/merge_operator.h
+++ b/include/rocksdb/merge_operator.h
@@ -183,6 +183,13 @@ class MergeOperator {
   // no checking is enforced. Client is responsible for providing
   // consistent MergeOperator between DB opens.
   virtual const char* Name() const = 0;
+
+  // Determines whether the MergeOperator can be called with just a single
+  // merge operand. The default implementation returns false.
+  // Override and return true to allow a single operand; FullMergeV2 and
+  // PartialMerge/PartialMergeMulti should then be implemented to handle a
+  // single operand.
+  virtual bool AllowSingleOperand() const { return false; }
 };
 
 // The simpler, associative merge operator.