2016-12-16 11:00:03 -08:00
|
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
2017-07-15 16:03:42 -07:00
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
2016-12-16 11:00:03 -08:00
|
|
|
#include <string>
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
#include "db/db_test_util.h"
|
|
|
|
#include "db/forward_iterator.h"
|
|
|
|
#include "port/stack_trace.h"
|
2017-09-28 15:49:05 -07:00
|
|
|
#include "rocksdb/merge_operator.h"
|
2016-12-16 11:00:03 -08:00
|
|
|
#include "utilities/merge_operators.h"
|
2017-09-28 15:49:05 -07:00
|
|
|
#include "utilities/merge_operators/string_append/stringappend2.h"
|
2016-12-16 11:00:03 -08:00
|
|
|
|
|
|
|
namespace rocksdb {
|
|
|
|
|
2018-02-09 14:43:56 -08:00
|
|
|
class TestReadCallback : public ReadCallback {
|
|
|
|
public:
|
|
|
|
TestReadCallback(SnapshotChecker* snapshot_checker,
|
|
|
|
SequenceNumber snapshot_seq)
|
|
|
|
: snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {}
|
|
|
|
|
|
|
|
bool IsCommitted(SequenceNumber seq) override {
|
|
|
|
return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_);
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
SnapshotChecker* snapshot_checker_;
|
|
|
|
SequenceNumber snapshot_seq_;
|
|
|
|
};
|
|
|
|
|
2016-12-16 11:00:03 -08:00
|
|
|
// Test merge operator functionality.
|
|
|
|
class DBMergeOperatorTest : public DBTestBase {
|
|
|
|
public:
|
|
|
|
DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
|
2018-02-09 14:43:56 -08:00
|
|
|
|
|
|
|
std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
|
|
|
|
const Slice& key,
|
|
|
|
const Snapshot* snapshot = nullptr) {
|
|
|
|
SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
|
|
|
|
: snapshot->GetSequenceNumber();
|
|
|
|
TestReadCallback read_callback(snapshot_checker, seq);
|
|
|
|
ReadOptions read_opt;
|
|
|
|
read_opt.snapshot = snapshot;
|
|
|
|
PinnableSlice value;
|
|
|
|
Status s =
|
|
|
|
dbfull()->GetImpl(read_opt, db_->DefaultColumnFamily(), key, &value,
|
|
|
|
nullptr /*value_found*/, &read_callback);
|
|
|
|
if (!s.ok()) {
|
|
|
|
return s.ToString();
|
|
|
|
}
|
|
|
|
return value.ToString();
|
|
|
|
}
|
2016-12-16 11:00:03 -08:00
|
|
|
};
|
|
|
|
|
2017-09-28 15:49:05 -07:00
|
|
|
// Checks that ShouldMerge() can cap the number of operands used by a read,
// with the operands held in the memtable, in a single L0 file, in several L0
// files, and spread over several levels.
TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
  // String-append operator whose ShouldMerge() returns true once at least
  // `limit` operands have been collected. A non-positive limit never triggers.
  class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
   public:
    LimitedStringAppendMergeOp(int limit, char delim)
        : StringAppendTESTOperator(delim),
          // Explicit conversion: a negative limit must not wrap around to a
          // huge unsigned value.
          limit_(limit > 0 ? static_cast<size_t>(limit) : 0) {}

    const char* Name() const override {
      return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
    }

    bool ShouldMerge(const std::vector<Slice>& operands) const override {
      return limit_ > 0 && operands.size() >= limit_;
    }

   private:
    size_t limit_ = 0;
  };

  Options options;
  options.create_if_missing = true;
  // Use only the latest two merge operands.
  options.merge_operator =
      std::make_shared<LimitedStringAppendMergeOp>(2, ',');
  options.env = env_;
  Reopen(options);

  // All K1 values are in memtable.
  ASSERT_OK(Merge("k1", "a"));
  ASSERT_OK(Merge("k1", "b"));
  ASSERT_OK(Merge("k1", "c"));
  ASSERT_OK(Merge("k1", "d"));
  std::string value;
  // ASSERT_OK (rather than ASSERT_TRUE(s.ok())) reports the failing Status.
  ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
  // Make sure that only the latest two merge operands are used. If this was
  // not the case the value would be "a,b,c,d".
  ASSERT_EQ(value, "c,d");

  // All K2 values are flushed to L0 into a single file.
  ASSERT_OK(Merge("k2", "a"));
  ASSERT_OK(Merge("k2", "b"));
  ASSERT_OK(Merge("k2", "c"));
  ASSERT_OK(Merge("k2", "d"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
  ASSERT_EQ(value, "c,d");

  // All K3 values are flushed and are in different files.
  ASSERT_OK(Merge("k3", "ab"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "bc"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "cd"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "de"));
  ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
  ASSERT_EQ(value, "cd,de");

  // All K4 values are in different levels
  ASSERT_OK(Merge("k4", "ab"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(4);
  ASSERT_OK(Merge("k4", "bc"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(3);
  ASSERT_OK(Merge("k4", "cd"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);
  ASSERT_OK(Merge("k4", "de"));
  ASSERT_OK(db_->Get(ReadOptions(), "k4", &value));
  ASSERT_EQ(value, "cd,de");
}
|
|
|
|
|
2016-12-16 11:00:03 -08:00
|
|
|
// A Get() over operands that TestPutOperator cannot merge must report
// Corruption, and both operands must remain stored unmerged.
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.merge_operator = std::make_shared<TestPutOperator>();
  Reopen(options);

  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));

  std::string value;
  const Status read_status = db_->Get(ReadOptions(), "k1", &value);
  ASSERT_TRUE(read_status.IsCorruption());

  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
}
|
|
|
|
|
|
|
|
// With max_successive_merges set, the third Merge() attempts a merge at write
// time. That merge fails, yet the write itself succeeds and every operand is
// kept unmerged.
TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.max_successive_merges = 3;
  options.merge_operator = std::make_shared<TestPutOperator>();
  Reopen(options);

  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "v2"));
  // Will trigger a merge when hitting max_successive_merges and the merge
  // will fail. The delta will be inserted nevertheless.
  ASSERT_OK(Merge("k1", "corrupted"));

  // Data should stay unmerged after the error.
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
}
|
|
|
|
|
|
|
|
// Iterators must become invalid with a Corruption status when they land on a
// key whose operands fail to merge, whether reached by Seek(), Prev() or
// Next(). The stored entries must stay unmerged.
//
// Each iterator lives in its own scope and is owned by a unique_ptr, so it is
// released before the DB is verified or reopened, even when an ASSERT_* macro
// returns from the test early.
TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.env = env_;

  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));
  ASSERT_OK(Put("k2", "v2"));
  {
    // Seeking directly onto the corrupted key.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("k1");
    ASSERT_FALSE(iter->Valid());
    ASSERT_TRUE(iter->status().IsCorruption());
  }
  {
    // Stepping backwards from a healthy key onto the corrupted one.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("k2");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    iter->Prev();
    ASSERT_FALSE(iter->Valid());
    ASSERT_TRUE(iter->status().IsCorruption());
  }
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});

  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Put("k2", "v2"));
  ASSERT_OK(Merge("k2", "corrupted"));
  {
    // Stepping forwards from a healthy key onto the corrupted one.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("k1");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    ASSERT_TRUE(iter->status().IsCorruption());
  }
  VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}
|
|
|
|
|
2017-04-11 16:11:27 -07:00
|
|
|
|
|
|
|
class MergeOperatorPinningTest : public DBMergeOperatorTest,
|
|
|
|
public testing::WithParamInterface<bool> {
|
|
|
|
public:
|
|
|
|
MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
|
|
|
|
|
|
|
|
bool disable_block_cache_;
|
|
|
|
};
|
|
|
|
|
|
|
|
INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
|
|
|
|
::testing::Bool());
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
// Writes string-append operands for a rotating set of 35 keys into tables with
// a block size of 1, spreads the resulting files over several levels (keeping
// the last batch in the memtable), and verifies the merged values.
TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;  // every block will contain one entry
  table_options.no_block_cache = disable_block_cache_;

  Options options = CurrentOptions();
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  const int kKeysPerFile = 10;
  const int kOperandsPerKeyPerFile = 7;
  const int kOperandSize = 100;
  // Files to write in L0 before compacting to lower level
  const int kFilesPerLevel = 3;

  Random rnd(301);
  // Expected merged value per key: operands joined with ','.
  std::map<std::string, std::string> true_data;
  int batch_num = 1;
  int lvl_to_fill = 4;
  int key_id = 0;
  for (;;) {
    for (int j = 0; j < kKeysPerFile; ++j) {
      const std::string key = Key(key_id % 35);
      ++key_id;
      for (int k = 0; k < kOperandsPerKeyPerFile; ++k) {
        const std::string val = RandomString(&rnd, kOperandSize);
        ASSERT_OK(db_->Merge(WriteOptions(), key, val));

        std::string& expected = true_data[key];
        if (expected.empty()) {
          expected = val;
        } else {
          expected += "," + val;
        }
      }
    }

    if (lvl_to_fill == -1) {
      // Keep last batch in memtable and stop
      break;
    }

    ASSERT_OK(Flush());
    const bool finished_level = (batch_num % kFilesPerLevel == 0);
    ++batch_num;
    if (finished_level) {
      // Level 0 is filled simply by leaving the flushed files where they are.
      if (lvl_to_fill != 0) {
        MoveFilesToLevel(lvl_to_fill);
      }
      --lvl_to_fill;
    }
  }

  // 3 L0 files
  // 1 L1 file
  // 3 L2 files
  // 1 L3 file
  // 3 L4 Files
  ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");

  VerifyDBFromMap(true_data);
}
|
|
|
|
|
|
|
|
// Applies a random mix of Put, Merge (max operator), overwriting Put and
// Delete to a small key range, mirrors it in an in-memory map, and verifies
// the DB against that map. Repeated for each option configuration that
// ChangeOptions() cycles through.
TEST_P(MergeOperatorPinningTest, Randomized) {
  do {
    BlockBasedTableOptions table_options;
    table_options.no_block_cache = disable_block_cache_;

    Options options = CurrentOptions();
    options.merge_operator = MergeOperators::CreateMaxOperator();
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    DestroyAndReopen(options);

    Random rnd(301);
    std::map<std::string, std::string> true_data;

    const int kTotalMerges = 5000;
    // Every key gets ~10 operands
    const int kKeyRange = kTotalMerges / 10;
    const int kOperandSize = 20;
    const int kNumPutBefore = kKeyRange / 10;  // 10% value
    const int kNumPutAfter = kKeyRange / 10;   // 10% overwrite
    const int kNumDelete = kKeyRange / 10;     // 10% delete

    // kNumPutBefore keys will have base values
    for (int n = 0; n < kNumPutBefore; ++n) {
      const std::string key = Key(rnd.Next() % kKeyRange);
      const std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));

      true_data[key] = value;
    }

    // Do kTotalMerges merges
    for (int n = 0; n < kTotalMerges; ++n) {
      const std::string key = Key(rnd.Next() % kKeyRange);
      const std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), key, value));

      // Mirror the max operator: keep the larger of the stored and new value.
      std::string& current = true_data[key];
      if (current < value) {
        current = value;
      }
    }

    // Overwrite random kNumPutAfter keys
    for (int n = 0; n < kNumPutAfter; ++n) {
      const std::string key = Key(rnd.Next() % kKeyRange);
      const std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));

      true_data[key] = value;
    }

    // Delete random kNumDelete keys
    for (int n = 0; n < kNumDelete; ++n) {
      const std::string key = Key(rnd.Next() % kKeyRange);
      ASSERT_OK(db_->Delete(WriteOptions(), key));

      true_data.erase(key);
    }

    VerifyDBFromMap(true_data);

    // Skip HashCuckoo since it does not support merge operators
  } while (ChangeOptions(kSkipMergePut | kSkipHashCuckoo));
}
|
|
|
|
|
|
|
|
class MergeOperatorHook : public MergeOperator {
|
|
|
|
public:
|
|
|
|
explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
|
|
|
|
: merge_op_(_merge_op) {}
|
|
|
|
|
|
|
|
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
|
|
|
|
MergeOperationOutput* merge_out) const override {
|
|
|
|
before_merge_();
|
|
|
|
bool res = merge_op_->FullMergeV2(merge_in, merge_out);
|
|
|
|
after_merge_();
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
virtual const char* Name() const override { return merge_op_->Name(); }
|
|
|
|
|
|
|
|
std::shared_ptr<MergeOperator> merge_op_;
|
|
|
|
std::function<void()> before_merge_ = []() {};
|
|
|
|
std::function<void()> after_merge_ = []() {};
|
|
|
|
};
|
|
|
|
|
|
|
|
// Every key has one operand in each of kNumOperands table files. Right before
// each full merge, the hook evicts all table readers and shrinks the block
// cache, so the merge can only succeed if the operands it is given are still
// alive at that point.
TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
  Options options = CurrentOptions();

  auto merge_hook =
      std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
  options.merge_operator = merge_hook;
  options.disable_auto_compactions = true;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.max_open_files = 20;
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = disable_block_cache_;
  if (bbto.no_block_cache == false) {
    bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
  } else {
    bbto.block_cache = nullptr;
  }
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const int kNumOperands = 30;
  const int kNumKeys = 1000;
  const int kOperandSize = 100;
  Random rnd(301);

  // 1000 keys every key have 30 operands, every operand is in a different file
  std::map<std::string, std::string> true_data;
  for (int i = 0; i < kNumOperands; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      std::string k = Key(j);
      std::string v = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), k, v));

      true_data[k] = std::max(true_data[k], v);
    }
    ASSERT_OK(Flush());
  }

  std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
  // Compare as size_t to avoid a signed/unsigned comparison.
  ASSERT_EQ(file_numbers.size(), static_cast<size_t>(kNumOperands));
  size_t merge_cnt = 0;

  // Code executed before merge operation
  merge_hook->before_merge_ = [&]() {
    // Evict all tables from cache before every merge operation
    for (uint64_t num : file_numbers) {
      TableCache::Evict(dbfull()->TEST_table_cache(), num);
    }
    // Decrease cache capacity to force all unrefed blocks to be evicted
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(1);
    }
    merge_cnt++;
  };

  // Code executed after merge operation
  merge_hook->after_merge_ = [&]() {
    // Increase capacity again after doing the merge
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(64 * 1024 * 1024);
    }
  };

  size_t total_reads = 0;
  VerifyDBFromMap(true_data, &total_reads);
  ASSERT_EQ(merge_cnt, total_reads);

  // A failed compaction must fail the test instead of going unnoticed.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  VerifyDBFromMap(true_data, &total_reads);

  // The hooks capture locals of this function by reference, while the
  // operator stays installed in the DB after this body returns. Replace them
  // with no-ops so nothing can call into dangling references.
  merge_hook->before_merge_ = []() {};
  merge_hook->after_merge_ = []() {};
}
|
|
|
|
|
|
|
|
// Runs a writer thread (merges interleaved with flushes and full compactions)
// concurrently with a reader thread that follows the keys through a tailing
// iterator and checks each merged value.
TEST_P(MergeOperatorPinningTest, TailingIterator) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateMaxOperator();
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const int kNumOperands = 100;
  const int kNumWrites = 100000;

  std::function<void()> writer_func = [&]() {
    int k = 0;
    for (int i = 0; i < kNumWrites; i++) {
      // A failed write must fail the test; otherwise the reader would wait
      // for a key that never shows up.
      ASSERT_OK(db_->Merge(WriteOptions(), Key(k), Key(k)));

      // Move on to the next key after roughly kNumOperands operands.
      if (i && i % kNumOperands == 0) {
        k++;
      }
      if (i && i % 127 == 0) {
        ASSERT_OK(Flush());
      }
      if (i && i % 317 == 0) {
        ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
      }
    }
  };

  std::function<void()> reader_func = [&]() {
    ReadOptions ro;
    ro.tailing = true;
    // Owned by unique_ptr so the iterator is released even when an ASSERT_*
    // below returns from the lambda early.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ro));

    iter->SeekToFirst();
    for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
      while (!iter->Valid()) {
        // wait for the key to be written
        env_->SleepForMicroseconds(100);
        iter->Seek(Key(i));
      }
      ASSERT_EQ(iter->key(), Key(i));
      ASSERT_EQ(iter->value(), Key(i));

      iter->Next();
    }
  };

  rocksdb::port::Thread writer_thread(writer_func);
  rocksdb::port::Thread reader_thread(reader_func);

  writer_thread.join();
  reader_thread.join();
}
|
2018-02-02 21:16:26 -08:00
|
|
|
|
|
|
|
TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  DestroyAndReopen(options);

  // Overview of the test:
  //  * There are two merge operands for the same key: one in an sst file,
  //    another in a memtable.
  //  * Seek a tailing iterator to this key.
  //  * As part of the seek, the iterator will:
  //      (a) first visit the operand in the memtable and tell ForwardIterator
  //          to pin this operand, then
  //      (b) move on to the operand in the sst file, then pass both operands
  //          to merge operator.
  //  * The memtable may get flushed and unreferenced by another thread between
  //    (a) and (b). The test simulates it by flushing the memtable inside a
  //    SyncPoint callback located between (a) and (b).
  //  * In this case it's ForwardIterator's responsibility to keep the memtable
  //    pinned until (b) is complete. There used to be a bug causing
  //    ForwardIterator to not pin it in some circumstances. This test
  //    reproduces it.

  // Setup failures are reported directly instead of showing up later as a
  // confusing value mismatch.
  ASSERT_OK(db_->Merge(WriteOptions(), "key", "sst"));
  ASSERT_OK(db_->Flush(FlushOptions()));  // Switch to SuperVersion A
  ASSERT_OK(db_->Merge(WriteOptions(), "key", "memtable"));

  // Pin SuperVersion A
  std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));

  bool pushed_first_operand = false;
  bool stepped_to_next_operand = false;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
        EXPECT_FALSE(pushed_first_operand);
        pushed_first_operand = true;
        ASSERT_OK(db_->Flush(FlushOptions()));  // Switch to SuperVersion B
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
        EXPECT_FALSE(stepped_to_next_operand);
        stepped_to_next_operand = true;
        someone_else.reset();  // Unpin SuperVersion A
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  ReadOptions ro;
  ro.tailing = true;
  std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
  iter->Seek("key");

  // The callbacks capture locals of this function by reference and are only
  // needed during the Seek() above. Uninstall them here, before any ASSERT_*
  // can return early, so they cannot outlive those locals.
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_TRUE(iter->status().ok());
  ASSERT_TRUE(iter->Valid());
  EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
  EXPECT_TRUE(pushed_first_operand);
  EXPECT_TRUE(stepped_to_next_operand);
}
|
2017-04-11 16:11:27 -07:00
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
2018-02-09 14:43:56 -08:00
|
|
|
// Reads a merged key through GetWithReadCallback() while a custom
// SnapshotChecker hides some sequence numbers, and checks that the visible
// result is the same before and after flushes and a full manual compaction.
TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  DestroyAndReopen(options);

  // Hard-coded visibility table, keyed by the snapshot sequence number.
  class TestSnapshotChecker : public SnapshotChecker {
    bool IsInSnapshot(SequenceNumber seq,
                      SequenceNumber snapshot_seq) const override {
      switch (snapshot_seq) {
        case 0:
          return seq == 0;
        case 1:
        case 2:
          // seq = 2 not visible to snapshot with seq = 2
          return seq <= 1;
        case 3:
        case 4:
          // seq = 4 not visible to snapshot with seq = 4
          return seq <= 3;
        default:
          // seq > 4 is uncommitted
          return seq <= 4;
      }
    }
  };
  // NOTE(review): presumably the DB takes ownership of the checker passed to
  // SetSnapshotChecker() -- confirm; it is never deleted in this test.
  TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
  dbfull()->SetSnapshotChecker(snapshot_checker);

  std::string value;
  ASSERT_OK(Merge("foo", "v1"));
  ASSERT_EQ(1, db_->GetLatestSequenceNumber());
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));

  ASSERT_OK(Merge("foo", "v2"));
  ASSERT_EQ(2, db_->GetLatestSequenceNumber());
  // v2 is not visible to latest snapshot, which has seq = 2.
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));

  // Take a snapshot with seq = 2.
  const Snapshot* snapshot1 = db_->GetSnapshot();
  ASSERT_EQ(2, snapshot1->GetSequenceNumber());
  // v2 is not visible to snapshot1, which has seq = 2
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));

  // Verify flush doesn't alter the result.
  ASSERT_OK(Flush());
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));

  ASSERT_OK(Merge("foo", "v3"));
  ASSERT_EQ(3, db_->GetLatestSequenceNumber());
  ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));

  ASSERT_OK(Merge("foo", "v4"));
  ASSERT_EQ(4, db_->GetLatestSequenceNumber());
  // v4 is not visible to latest snapshot, which has seq = 4.
  ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));

  const Snapshot* snapshot2 = db_->GetSnapshot();
  ASSERT_EQ(4, snapshot2->GetSequenceNumber());
  // v4 is not visible to snapshot2, which has seq = 4.
  ASSERT_EQ("v1,v2,v3",
            GetWithReadCallback(snapshot_checker, "foo", snapshot2));

  // Verify flush doesn't alter the result.
  ASSERT_OK(Flush());
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  ASSERT_EQ("v1,v2,v3",
            GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));

  ASSERT_OK(Merge("foo", "v5"));
  ASSERT_EQ(5, db_->GetLatestSequenceNumber());
  // v5 is uncommitted
  ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));

  // full manual compaction.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Verify compaction doesn't alter the result.
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  ASSERT_EQ("v1,v2,v3",
            GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));

  db_->ReleaseSnapshot(snapshot1);
  db_->ReleaseSnapshot(snapshot2);
}
|
|
|
|
|
2016-12-16 11:00:03 -08:00
|
|
|
} // namespace rocksdb
|
|
|
|
|
|
|
|
int main(int argc, char** argv) {
|
|
|
|
rocksdb::port::InstallStackTraceHandler();
|
|
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
|
|
return RUN_ALL_TESTS();
|
|
|
|
}
|