Fixed and simplified merge_helper
Summary: MergeUntil was not reporting a success when merging an operand with a Value/Deletion despite the comments in MergeHelper and CompactionJob indicating otherwise. This lead to operands being written to the compaction output unnecessarily: M1 M2 M3 P M4 M5 --> (P+M1+M2+M3) M2 M3 M4 M5 (before the diff) M1 M2 M3 P M4 M5 --> (P+M1+M2+M3) M4 M5 (after the diff) In addition, the code handling Values/Deletion was basically identical. This patch unifies the code. Finally, this patch also adds testing for merge_helper. Test Plan: make && make check Reviewers: sdong, rven, yhchiang, tnovak, igor Reviewed By: igor Subscribers: tnovak, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42351
This commit is contained in:
parent
aede5cd8e6
commit
1d20fa9d0f
4
Makefile
4
Makefile
@ -252,6 +252,7 @@ TESTS = \
|
||||
memenv_test \
|
||||
mock_env_test \
|
||||
memtable_list_test \
|
||||
merge_helper_test \
|
||||
merge_test \
|
||||
merger_test \
|
||||
redis_test \
|
||||
@ -805,6 +806,9 @@ write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
write_controller_test: db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
merge_helper_test: db/merge_helper_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
|
@ -110,15 +110,18 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||
|
||||
// At this point we are guaranteed that we need to process this key.
|
||||
|
||||
if (kTypeDeletion == ikey.type) {
|
||||
// hit a delete
|
||||
// => merge nullptr with operands_
|
||||
assert(ikey.type <= kValueTypeForSeek);
|
||||
if (ikey.type != kTypeMerge) {
|
||||
// hit a put/delete
|
||||
// => merge the put value or a nullptr with operands_
|
||||
// => store result in operands_.back() (and update keys_.back())
|
||||
// => change the entry type to kTypeValue for keys_.back()
|
||||
// We are done! Return a success if the merge passes.
|
||||
|
||||
// We are done! Success!
|
||||
const Slice val = iter->value();
|
||||
const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr;
|
||||
std::string merge_result;
|
||||
Status s = TimedFullMerge(ikey.user_key, nullptr, operands_,
|
||||
Status s =
|
||||
TimedFullMerge(ikey.user_key, val_ptr, operands_,
|
||||
user_merge_operator_, stats, env_, logger_,
|
||||
&merge_result);
|
||||
|
||||
@ -130,36 +133,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||
orig_ikey.type = kTypeValue;
|
||||
UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
|
||||
operands_.back() = std::move(merge_result);
|
||||
}
|
||||
|
||||
// move iter to the next entry (before doing anything else)
|
||||
iter->Next();
|
||||
if (steps) {
|
||||
++(*steps);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (kTypeValue == ikey.type) {
|
||||
// hit a put
|
||||
// => merge the put value with operands_
|
||||
// => store result in operands_.back() (and update keys_.back())
|
||||
// => change the entry type to kTypeValue for keys_.back()
|
||||
// We are done! Success!
|
||||
const Slice val = iter->value();
|
||||
std::string merge_result;
|
||||
Status s =
|
||||
TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_,
|
||||
stats, env_, logger_, &merge_result);
|
||||
|
||||
// We store the result in keys_.back() and operands_.back()
|
||||
// if nothing went wrong (i.e.: no operand corruption on disk)
|
||||
if (s.ok()) {
|
||||
std::string& original_key =
|
||||
keys_.back(); // The original key encountered
|
||||
orig_ikey.type = kTypeValue;
|
||||
UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
|
||||
operands_.back() = std::move(merge_result);
|
||||
success_ = true;
|
||||
}
|
||||
|
||||
// move iter to the next entry
|
||||
@ -168,9 +142,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||
++(*steps);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (kTypeMerge == ikey.type) {
|
||||
} else {
|
||||
// hit a merge
|
||||
// => merge the operand into the front of the operands_ list
|
||||
// => use the user's associative merge function to determine how.
|
||||
|
155
db/merge_helper_test.cc
Normal file
155
db/merge_helper_test.cc
Normal file
@ -0,0 +1,155 @@
|
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/merge_helper.h"
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class MergeHelperTest : public testing::Test {
|
||||
public:
|
||||
MergeHelperTest() : steps_(0) {}
|
||||
~MergeHelperTest() = default;
|
||||
|
||||
void RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) {
|
||||
InitIterator();
|
||||
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
||||
merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(),
|
||||
nullptr, 2U, true));
|
||||
merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr,
|
||||
&steps_, Env::Default());
|
||||
}
|
||||
|
||||
void RunStringAppendMergeHelper(SequenceNumber stop_before, bool at_bottom) {
|
||||
InitIterator();
|
||||
merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
|
||||
merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(),
|
||||
nullptr, 2U, true));
|
||||
merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr,
|
||||
&steps_, Env::Default());
|
||||
}
|
||||
|
||||
std::string Key(const std::string& user_key, const SequenceNumber& seq,
|
||||
const ValueType& t) {
|
||||
std::string ikey;
|
||||
AppendInternalKey(&ikey, ParsedInternalKey(Slice(user_key), seq, t));
|
||||
return ikey;
|
||||
}
|
||||
|
||||
void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
|
||||
const ValueType& t, const std::string& val) {
|
||||
ks_.push_back(Key(user_key, seq, t));
|
||||
vs_.push_back(val);
|
||||
}
|
||||
|
||||
void InitIterator() {
|
||||
iter_.reset(new test::VectorIterator(ks_, vs_));
|
||||
iter_->SeekToFirst();
|
||||
}
|
||||
|
||||
std::string EncodeInt(uint64_t x) {
|
||||
std::string result;
|
||||
PutFixed64(&result, x);
|
||||
return result;
|
||||
}
|
||||
|
||||
void CheckState(bool success, int steps, int iter_pos) {
|
||||
ASSERT_EQ(success, merge_helper_->IsSuccess());
|
||||
ASSERT_EQ(steps, steps_);
|
||||
if (iter_pos == -1) {
|
||||
ASSERT_FALSE(iter_->Valid());
|
||||
} else {
|
||||
ASSERT_EQ(ks_[iter_pos], iter_->key());
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<test::VectorIterator> iter_;
|
||||
std::shared_ptr<MergeOperator> merge_op_;
|
||||
std::unique_ptr<MergeHelper> merge_helper_;
|
||||
std::vector<std::string> ks_;
|
||||
std::vector<std::string> vs_;
|
||||
int steps_;
|
||||
};
|
||||
|
||||
// If MergeHelper encounters a new key on the last level, we know that
|
||||
// the key has no more history and it can merge keys.
|
||||
TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
|
||||
AddKeyVal("a", 20, kTypeMerge, EncodeInt(1U));
|
||||
AddKeyVal("a", 10, kTypeMerge, EncodeInt(3U));
|
||||
AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U)); // <- Iterator after merge
|
||||
|
||||
RunUInt64MergeHelper(0, true);
|
||||
CheckState(true, 2, 2);
|
||||
ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->key());
|
||||
ASSERT_EQ(EncodeInt(4U), merge_helper_->value());
|
||||
}
|
||||
|
||||
// Merging with a value results in a successful merge.
|
||||
TEST_F(MergeHelperTest, MergeValue) {
|
||||
AddKeyVal("a", 40, kTypeMerge, EncodeInt(1U));
|
||||
AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U));
|
||||
AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); // <- Iterator after merge
|
||||
AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U));
|
||||
|
||||
RunUInt64MergeHelper(0, false);
|
||||
CheckState(true, 3, 3);
|
||||
ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->key());
|
||||
ASSERT_EQ(EncodeInt(8U), merge_helper_->value());
|
||||
}
|
||||
|
||||
// Merging stops before a snapshot.
|
||||
TEST_F(MergeHelperTest, SnapshotBeforeValue) {
|
||||
AddKeyVal("a", 50, kTypeMerge, EncodeInt(1U));
|
||||
AddKeyVal("a", 40, kTypeMerge, EncodeInt(3U)); // <- Iterator after merge
|
||||
AddKeyVal("a", 30, kTypeMerge, EncodeInt(1U));
|
||||
AddKeyVal("a", 20, kTypeValue, EncodeInt(4U));
|
||||
AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U));
|
||||
|
||||
RunUInt64MergeHelper(31, true);
|
||||
CheckState(false, 2, 2);
|
||||
ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]);
|
||||
ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]);
|
||||
}
|
||||
|
||||
// MergeHelper preserves the operand stack for merge operators that
|
||||
// cannot do a partial merge.
|
||||
TEST_F(MergeHelperTest, NoPartialMerge) {
|
||||
AddKeyVal("a", 50, kTypeMerge, "v2");
|
||||
AddKeyVal("a", 40, kTypeMerge, "v"); // <- Iterator after merge
|
||||
AddKeyVal("a", 30, kTypeMerge, "v");
|
||||
|
||||
RunStringAppendMergeHelper(31, true);
|
||||
CheckState(false, 2, 2);
|
||||
ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]);
|
||||
ASSERT_EQ("v", merge_helper_->values()[0]);
|
||||
ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]);
|
||||
ASSERT_EQ("v2", merge_helper_->values()[1]);
|
||||
}
|
||||
|
||||
// Merging with a deletion turns the deletion into a value
|
||||
TEST_F(MergeHelperTest, MergeDeletion) {
|
||||
AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U));
|
||||
AddKeyVal("a", 20, kTypeDeletion, "");
|
||||
|
||||
RunUInt64MergeHelper(15, false);
|
||||
CheckState(true, 2, -1);
|
||||
ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->key());
|
||||
ASSERT_EQ(EncodeInt(3U), merge_helper_->value());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -5,45 +5,13 @@
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <algorithm>
|
||||
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "table/merger.h"
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class VectorIterator : public Iterator {
|
||||
public:
|
||||
explicit VectorIterator(const std::vector<std::string>& keys)
|
||||
: keys_(keys), current_(keys.size()) {
|
||||
std::sort(keys_.begin(), keys_.end());
|
||||
}
|
||||
|
||||
virtual bool Valid() const override { return current_ < keys_.size(); }
|
||||
|
||||
virtual void SeekToFirst() override { current_ = 0; }
|
||||
virtual void SeekToLast() override { current_ = keys_.size() - 1; }
|
||||
|
||||
virtual void Seek(const Slice& target) override {
|
||||
current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
|
||||
keys_.begin();
|
||||
}
|
||||
|
||||
virtual void Next() override { current_++; }
|
||||
virtual void Prev() override { current_--; }
|
||||
|
||||
virtual Slice key() const override { return Slice(keys_[current_]); }
|
||||
virtual Slice value() const override { return Slice(); }
|
||||
|
||||
virtual Status status() const override { return Status::OK(); }
|
||||
|
||||
private:
|
||||
std::vector<std::string> keys_;
|
||||
size_t current_;
|
||||
};
|
||||
|
||||
class MergerTest : public testing::Test {
|
||||
public:
|
||||
MergerTest()
|
||||
@ -123,14 +91,14 @@ class MergerTest : public testing::Test {
|
||||
std::vector<Iterator*> small_iterators;
|
||||
for (size_t i = 0; i < num_iterators; ++i) {
|
||||
auto strings = GenerateStrings(strings_per_iterator, letters_per_string);
|
||||
small_iterators.push_back(new VectorIterator(strings));
|
||||
small_iterators.push_back(new test::VectorIterator(strings));
|
||||
all_keys_.insert(all_keys_.end(), strings.begin(), strings.end());
|
||||
}
|
||||
|
||||
merging_iterator_.reset(
|
||||
NewMergingIterator(BytewiseComparator(), &small_iterators[0],
|
||||
static_cast<int>(small_iterators.size())));
|
||||
single_iterator_.reset(new VectorIterator(all_keys_));
|
||||
single_iterator_.reset(new test::VectorIterator(all_keys_));
|
||||
}
|
||||
|
||||
Random rnd_;
|
||||
|
@ -8,9 +8,13 @@
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#pragma once
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/dbformat.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "util/random.h"
|
||||
|
||||
@ -117,5 +121,44 @@ class SimpleSuffixReverseComparator : public Comparator {
|
||||
// endian machines.
|
||||
extern const Comparator* Uint64Comparator();
|
||||
|
||||
// Iterator over a vector of keys/values
|
||||
class VectorIterator : public Iterator {
|
||||
public:
|
||||
explicit VectorIterator(const std::vector<std::string>& keys)
|
||||
: keys_(keys), current_(keys.size()) {
|
||||
std::sort(keys_.begin(), keys_.end());
|
||||
values_.resize(keys.size());
|
||||
}
|
||||
|
||||
VectorIterator(const std::vector<std::string>& keys,
|
||||
const std::vector<std::string>& values)
|
||||
: keys_(keys), values_(values), current_(keys.size()) {
|
||||
assert(keys_.size() == values_.size());
|
||||
}
|
||||
|
||||
virtual bool Valid() const override { return current_ < keys_.size(); }
|
||||
|
||||
virtual void SeekToFirst() override { current_ = 0; }
|
||||
virtual void SeekToLast() override { current_ = keys_.size() - 1; }
|
||||
|
||||
virtual void Seek(const Slice& target) override {
|
||||
current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
|
||||
keys_.begin();
|
||||
}
|
||||
|
||||
virtual void Next() override { current_++; }
|
||||
virtual void Prev() override { current_--; }
|
||||
|
||||
virtual Slice key() const override { return Slice(keys_[current_]); }
|
||||
virtual Slice value() const override { return Slice(values_[current_]); }
|
||||
|
||||
virtual Status status() const override { return Status::OK(); }
|
||||
|
||||
private:
|
||||
std::vector<std::string> keys_;
|
||||
std::vector<std::string> values_;
|
||||
size_t current_;
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
} // namespace rocksdb
|
||||
|
@ -4,6 +4,7 @@
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/merge_operator.h"
|
||||
#include "rocksdb/slice.h"
|
||||
|
Loading…
Reference in New Issue
Block a user