dc5de45af8
Summary: The patch adds a new BlobDB configuration option `blob_compaction_readahead_size` that can be used to enable prefetching data from blob files during compaction. This is important when using storage with higher latencies like HDDs or remote filesystems. If enabled, prefetching is used for all cases when blobs are read during compaction, namely garbage collection, compaction filters (when the existing value has to be read from a blob file), and `Merge` (when the value of the base `Put` is stored in a blob file). Pull Request resolved: https://github.com/facebook/rocksdb/pull/9187 Test Plan: Ran `make check` and the stress/crash test. Reviewed By: riversand963 Differential Revision: D32565512 Pulled By: ltamasi fbshipit-source-id: 87be9cebc3aa01cc227bec6b5f64d827b8164f5d
297 lines
11 KiB
C++
297 lines
11 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/merge_helper.h"
|
|
|
|
#include <algorithm>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include "db/dbformat.h"
|
|
#include "rocksdb/comparator.h"
|
|
#include "test_util/testharness.h"
|
|
#include "test_util/testutil.h"
|
|
#include "util/coding.h"
|
|
#include "util/vector_iterator.h"
|
|
#include "utilities/merge_operators.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class MergeHelperTest : public testing::Test {
|
|
public:
|
|
MergeHelperTest() : icmp_(BytewiseComparator()) { env_ = Env::Default(); }
|
|
|
|
~MergeHelperTest() override = default;
|
|
|
|
Status Run(SequenceNumber stop_before, bool at_bottom,
|
|
SequenceNumber latest_snapshot = 0) {
|
|
iter_.reset(new VectorIterator(ks_, vs_, &icmp_));
|
|
iter_->SeekToFirst();
|
|
merge_helper_.reset(new MergeHelper(env_, icmp_.user_comparator(),
|
|
merge_op_.get(), filter_.get(), nullptr,
|
|
false, latest_snapshot));
|
|
return merge_helper_->MergeUntil(
|
|
iter_.get(), nullptr /* range_del_agg */, stop_before, at_bottom,
|
|
false /* allow_data_in_errors */, nullptr /* blob_fetcher */,
|
|
nullptr /* prefetch_buffers */, nullptr /* c_iter_stats */);
|
|
}
|
|
|
|
void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
|
|
const ValueType& t, const std::string& val,
|
|
bool corrupt = false) {
|
|
InternalKey ikey(user_key, seq, t);
|
|
if (corrupt) {
|
|
test::CorruptKeyType(&ikey);
|
|
}
|
|
ks_.push_back(ikey.Encode().ToString());
|
|
vs_.push_back(val);
|
|
}
|
|
|
|
Env* env_;
|
|
InternalKeyComparator icmp_;
|
|
std::unique_ptr<VectorIterator> iter_;
|
|
std::shared_ptr<MergeOperator> merge_op_;
|
|
std::unique_ptr<MergeHelper> merge_helper_;
|
|
std::vector<std::string> ks_;
|
|
std::vector<std::string> vs_;
|
|
std::unique_ptr<test::FilterNumber> filter_;
|
|
};
|
|
|
|
// If MergeHelper encounters a new key on the last level, we know that
|
|
// the key has no more history and it can merge keys.
|
|
TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 20, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("b", 10, kTypeMerge, test::EncodeInt(4U)); // <- iter_ after merge
|
|
|
|
ASSERT_TRUE(Run(0, true).ok());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 20, kTypeValue), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// Merging with a value results in a successful merge.
|
|
TEST_F(MergeHelperTest, MergeValue) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U)); // <- iter_ after merge
|
|
AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(0, false).ok());
|
|
ASSERT_EQ(ks_[3], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 40, kTypeValue), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(8U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// Merging stops before a snapshot.
|
|
TEST_F(MergeHelperTest, SnapshotBeforeValue) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(3U)); // <- iter_ after merge
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U));
|
|
AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(31, true).IsMergeInProgress());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// MergeHelper preserves the operand stack for merge operators that
|
|
// cannot do a partial merge.
|
|
TEST_F(MergeHelperTest, NoPartialMerge) {
|
|
merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
|
|
|
|
AddKeyVal("a", 50, kTypeMerge, "v2");
|
|
AddKeyVal("a", 40, kTypeMerge, "v"); // <- iter_ after merge
|
|
AddKeyVal("a", 30, kTypeMerge, "v");
|
|
|
|
ASSERT_TRUE(Run(31, true).IsMergeInProgress());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 40, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ("v", merge_helper_->values()[0]);
|
|
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[1]);
|
|
ASSERT_EQ("v2", merge_helper_->values()[1]);
|
|
ASSERT_EQ(2U, merge_helper_->keys().size());
|
|
ASSERT_EQ(2U, merge_helper_->values().size());
|
|
}
|
|
|
|
// A single operand can not be merged.
|
|
TEST_F(MergeHelperTest, SingleOperand) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(31, false).IsMergeInProgress());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(1U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// Merging with a deletion turns the deletion into a value
|
|
TEST_F(MergeHelperTest, MergeDeletion) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 20, kTypeDeletion, "");
|
|
|
|
ASSERT_TRUE(Run(15, false).ok());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(3U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// The merge helper stops upon encountering a corrupt key
|
|
TEST_F(MergeHelperTest, CorruptKey) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(1U));
|
|
// Corrupt key
|
|
AddKeyVal("a", 20, kTypeDeletion, "", true); // <- iter_ after merge
|
|
|
|
ASSERT_TRUE(Run(15, false).IsMergeInProgress());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 30, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// The compaction filter is called on every merge operand
|
|
TEST_F(MergeHelperTest, FilterMergeOperands) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 25, kTypeValue, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(15, false).ok());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_EQ(test::KeyStr("a", 30, kTypeValue),
|
|
merge_output_iter.key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(8U), merge_output_iter.value().ToString());
|
|
merge_output_iter.Next();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
}
|
|
|
|
TEST_F(MergeHelperTest, FilterAllMergeOperands) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
|
|
|
|
// filtered out all
|
|
ASSERT_TRUE(Run(15, false).ok());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
|
|
// we have one operand that will survive because it's a delete
|
|
AddKeyVal("a", 24, kTypeDeletion, test::EncodeInt(5U));
|
|
AddKeyVal("b", 23, kTypeValue, test::EncodeInt(5U));
|
|
ASSERT_TRUE(Run(15, true).ok());
|
|
merge_output_iter = MergeOutputIterator(merge_helper_.get());
|
|
ASSERT_TRUE(iter_->Valid());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
|
|
// when all merge operands are filtered out, we leave the iterator pointing to
|
|
// the Put/Delete that survived
|
|
ASSERT_EQ(test::KeyStr("a", 24, kTypeDeletion), iter_->key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(5U), iter_->value().ToString());
|
|
}
|
|
|
|
// Make sure that merge operands are filtered at the beginning
|
|
TEST_F(MergeHelperTest, FilterFirstMergeOperand) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U)); // next user key
|
|
|
|
ASSERT_OK(Run(15, true));
|
|
ASSERT_TRUE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
// sequence number is 29 here, because the first merge operand got filtered
|
|
// out
|
|
ASSERT_EQ(test::KeyStr("a", 29, kTypeValue),
|
|
merge_output_iter.key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(6U), merge_output_iter.value().ToString());
|
|
merge_output_iter.Next();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
|
|
// make sure that we're passing user keys into the filter
|
|
ASSERT_EQ("a", filter_->last_merge_operand_key());
|
|
}
|
|
|
|
// Make sure that merge operands are not filtered out if there's a snapshot
|
|
// pointing at them
|
|
TEST_F(MergeHelperTest, DontFilterMergeOperandsBeforeSnapshotTest) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U));
|
|
|
|
ASSERT_OK(Run(15, true, 32));
|
|
ASSERT_TRUE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_EQ(test::KeyStr("a", 31, kTypeValue),
|
|
merge_output_iter.key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(26U), merge_output_iter.value().ToString());
|
|
merge_output_iter.Next();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|