Allow merge operator to be called even with a single operand
Summary: Added a function `MergeOperator::AllowSingleOperand()` to allow invoking a merge operator even with a single merge operand, if overridden to return true. This is needed for the Cassandra-on-RocksDB work: all Cassandra writes go through merges, and this allows a lone merge value to be rewritten by the merge operator invoked during compaction, for example when its TTL has expired.
Closes https://github.com/facebook/rocksdb/pull/2721
Differential Revision: D5608706
Pulled By: sagar0
fbshipit-source-id: f299f9f91c4d1ac26e48bd5906e122c1c5e5f3fc
Commit: 9a44b4c32c
Parent: ac8fb77afd
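As background for the Cassandra-on-RocksDB use case described in the summary, here is a minimal, hypothetical sketch of a merge operator that opts into single-operand merges. The `CassandraStyleMergeOperator` name, the `IsExpired()` helper, and the value layout are illustrative assumptions and not part of this change; only the `AllowSingleOperand()` override reflects the API added by this commit.

#include <deque>
#include <string>

#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"

// Hypothetical operator: each operand is assumed to carry its own expiry
// information, so merging even a single operand is useful (it can drop
// expired data during compaction).
class CassandraStyleMergeOperator : public rocksdb::MergeOperator {
 public:
  bool FullMergeV2(const MergeOperationInput& merge_in,
                   MergeOperationOutput* merge_out) const override {
    std::string result;
    if (merge_in.existing_value != nullptr) {
      result = merge_in.existing_value->ToString();
    }
    for (const rocksdb::Slice& operand : merge_in.operand_list) {
      if (!IsExpired(operand)) {  // hypothetical TTL check
        result.append(operand.data(), operand.size());
      }
    }
    merge_out->new_value = std::move(result);
    return true;
  }

  bool PartialMergeMulti(const rocksdb::Slice& /*key*/,
                         const std::deque<rocksdb::Slice>& operand_list,
                         std::string* new_value,
                         rocksdb::Logger* /*logger*/) const override {
    // Must also work when operand_list contains exactly one element.
    std::string result;
    for (const rocksdb::Slice& operand : operand_list) {
      if (!IsExpired(operand)) {  // hypothetical TTL check
        result.append(operand.data(), operand.size());
      }
    }
    new_value->swap(result);
    return true;
  }

  // Opt in: compaction may now call this operator with a single merge operand.
  bool AllowSingleOperand() const override { return true; }

  const char* Name() const override { return "CassandraStyleMergeOperator"; }

 private:
  // Placeholder: a real implementation would decode a per-cell TTL/timestamp.
  static bool IsExpired(const rocksdb::Slice& /*operand*/) { return false; }
};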
HISTORY.md
@@ -4,6 +4,7 @@
 * Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
 * Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
 * Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which cause write stalling, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`.
+* Allow merge operator to be called even with a single merge operand during compactions, by appropriately overriding `MergeOperator::AllowSingleOperand`.
 
 ### Bug Fixes
 * Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.
db/compaction_iterator_test.cc
@@ -455,6 +455,111 @@ TEST_F(CompactionIteratorTest, ShuttingDownInMerge) {
   EXPECT_EQ(2, filter.last_seen.load());
 }
 
+TEST_F(CompactionIteratorTest, SingleMergeOperand) {
+  class Filter : public CompactionFilter {
+    virtual Decision FilterV2(int level, const Slice& key, ValueType t,
+                              const Slice& existing_value,
+                              std::string* new_value,
+                              std::string* skip_until) const override {
+      std::string k = key.ToString();
+      std::string v = existing_value.ToString();
+
+      // See InitIterators() call below for the sequence of keys and their
+      // filtering decisions. Here we closely assert that compaction filter is
+      // called with the expected keys and only them, and with the right values.
+      if (k == "a") {
+        EXPECT_EQ(ValueType::kMergeOperand, t);
+        EXPECT_EQ("av1", v);
+        return Decision::kKeep;
+      } else if (k == "b") {
+        EXPECT_EQ(ValueType::kMergeOperand, t);
+        return Decision::kKeep;
+      } else if (k == "c") {
+        return Decision::kKeep;
+      }
+
+      ADD_FAILURE();
+      return Decision::kKeep;
+    }
+
+    const char* Name() const override {
+      return "CompactionIteratorTest.SingleMergeOperand::Filter";
+    }
+  };
+
+  class SingleMergeOp : public MergeOperator {
+   public:
+    bool FullMergeV2(const MergeOperationInput& merge_in,
+                     MergeOperationOutput* merge_out) const override {
+      // See InitIterators() call below for why "c" is the only key for which
+      // FullMergeV2 should be called.
+      EXPECT_EQ("c", merge_in.key.ToString());
+
+      std::string temp_value;
+      if (merge_in.existing_value != nullptr) {
+        temp_value = merge_in.existing_value->ToString();
+      }
+
+      for (auto& operand : merge_in.operand_list) {
+        temp_value.append(operand.ToString());
+      }
+      merge_out->new_value = temp_value;
+
+      return true;
+    }
+
+    bool PartialMergeMulti(const Slice& key,
+                           const std::deque<Slice>& operand_list,
+                           std::string* new_value,
+                           Logger* logger) const override {
+      std::string string_key = key.ToString();
+      EXPECT_TRUE(string_key == "a" || string_key == "b");
+
+      if (string_key == "a") {
+        EXPECT_EQ(1, operand_list.size());
+      } else if (string_key == "b") {
+        EXPECT_EQ(2, operand_list.size());
+      }
+
+      std::string temp_value;
+      for (auto& operand : operand_list) {
+        temp_value.append(operand.ToString());
+      }
+      swap(temp_value, *new_value);
+
+      return true;
+    }
+
+    const char* Name() const override {
+      return "CompactionIteratorTest SingleMergeOp";
+    }
+
+    bool AllowSingleOperand() const override { return true; }
+  };
+
+  SingleMergeOp merge_op;
+  Filter filter;
+  InitIterators(
+      // a should invoke PartialMergeMulti with a single merge operand.
+      {test::KeyStr("a", 50, kTypeMerge),
+       // b should invoke PartialMergeMulti with two operands.
+       test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
+       // c should invoke FullMerge due to kTypeValue at the beginning.
+       test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
+      {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
+      &merge_op, &filter);
+
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
+  ASSERT_EQ("av1", c_iter_->value().ToString());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
+  c_iter_->Next();
+  ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
+}
+
 } // namespace rocksdb
 
 int main(int argc, char** argv) {
db/merge_helper.cc
@@ -18,6 +18,33 @@
 
 namespace rocksdb {
 
+MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
+                         const MergeOperator* user_merge_operator,
+                         const CompactionFilter* compaction_filter,
+                         Logger* logger, bool assert_valid_internal_key,
+                         SequenceNumber latest_snapshot, int level,
+                         Statistics* stats,
+                         const std::atomic<bool>* shutting_down)
+    : env_(env),
+      user_comparator_(user_comparator),
+      user_merge_operator_(user_merge_operator),
+      compaction_filter_(compaction_filter),
+      shutting_down_(shutting_down),
+      logger_(logger),
+      assert_valid_internal_key_(assert_valid_internal_key),
+      allow_single_operand_(false),
+      latest_snapshot_(latest_snapshot),
+      level_(level),
+      keys_(),
+      filter_timer_(env_),
+      total_filter_time_(0U),
+      stats_(stats) {
+  assert(user_comparator_ != nullptr);
+  if (user_merge_operator_) {
+    allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
+  }
+}
+
 Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
                                    const Slice& key, const Slice* value,
                                    const std::vector<Slice>& operands,

@@ -288,7 +315,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
     // Attempt to use the user's associative merge function to
     // merge the stacked merge operands into a single operand.
     s = Status::MergeInProgress();
-    if (merge_context_.GetNumOperands() >= 2) {
+    if (merge_context_.GetNumOperands() >= 2 ||
+        (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
       bool merge_success = false;
      std::string merge_result;
       {
db/merge_helper.h
@@ -34,22 +34,7 @@ class MergeHelper {
               const CompactionFilter* compaction_filter, Logger* logger,
               bool assert_valid_internal_key, SequenceNumber latest_snapshot,
               int level = 0, Statistics* stats = nullptr,
-              const std::atomic<bool>* shutting_down = nullptr)
-      : env_(env),
-        user_comparator_(user_comparator),
-        user_merge_operator_(user_merge_operator),
-        compaction_filter_(compaction_filter),
-        shutting_down_(shutting_down),
-        logger_(logger),
-        assert_valid_internal_key_(assert_valid_internal_key),
-        latest_snapshot_(latest_snapshot),
-        level_(level),
-        keys_(),
-        filter_timer_(env_),
-        total_filter_time_(0U),
-        stats_(stats) {
-    assert(user_comparator_ != nullptr);
-  }
+              const std::atomic<bool>* shutting_down = nullptr);
 
   // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
   // Result of merge will be written to result if status returned is OK.

@@ -158,6 +143,7 @@ class MergeHelper {
   const std::atomic<bool>* shutting_down_;
   Logger* logger_;
   bool assert_valid_internal_key_; // enforce no internal key corruption?
+  bool allow_single_operand_;
   SequenceNumber latest_snapshot_;
   int level_;
 
include/rocksdb/merge_operator.h
@@ -183,6 +183,13 @@ class MergeOperator {
   // no checking is enforced. Client is responsible for providing
   // consistent MergeOperator between DB opens.
   virtual const char* Name() const = 0;
 
+  // Determines whether the MergeOperator can be called with just a single
+  // merge operand.
+  // Override and return true for allowing a single operand. FullMergeV2 and
+  // PartialMerge/PartialMergeMulti should be implemented accordingly to handle
+  // a single operand.
+  virtual bool AllowSingleOperand() const { return false; }
+
 };
 
 // The simpler, associative merge operator.
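For completeness, a hedged usage sketch showing how an operator that overrides `AllowSingleOperand()` would be wired into a database. It assumes the hypothetical `CassandraStyleMergeOperator` sketched earlier (or any MergeOperator returning true from `AllowSingleOperand()`); the database path and key/value strings are placeholders. The point is only that, once the override returns true, compaction may invoke FullMergeV2/PartialMergeMulti even when a key has just one merge operand.

#include <cassert>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Any MergeOperator whose AllowSingleOperand() returns true opts into
  // single-operand merges during compaction.
  options.merge_operator = std::make_shared<CassandraStyleMergeOperator>();

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/single_operand_demo", &db);
  assert(s.ok());

  // A single Merge() for a key: during compaction the operator can now be
  // called with just this one operand (e.g. to drop it once its TTL expires).
  s = db->Merge(rocksdb::WriteOptions(), "row-key", "cell-with-ttl");
  assert(s.ok());

  delete db;
  return 0;
}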