d8c28e692a
Summary: Previously, options.ttl could not be set with options.max_open_files != -1, because it makes use of the creation_time field in table properties, which is not available unless max_open_files = -1. With this commit, the information will be stored in the manifest and, when it is available, will be used instead. Note that this change will break forward compatibility for release 5.1 and older. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6060 Test Plan: Extend existing test case to options.max_open_files != -1, and simulate backward compatibility in one test case by forcing the value to be 0. Differential Revision: D18631623 fbshipit-source-id: 30c232a8672de5432ce9608bb2488ecc19138830
1279 lines
47 KiB
C++
1279 lines
47 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/version_set.h"
|
|
#include "db/db_impl/db_impl.h"
|
|
#include "db/log_writer.h"
|
|
#include "logging/logging.h"
|
|
#include "table/mock_table.h"
|
|
#include "test_util/testharness.h"
|
|
#include "test_util/testutil.h"
|
|
#include "util/string_util.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
class GenerateLevelFilesBriefTest : public testing::Test {
|
|
public:
|
|
std::vector<FileMetaData*> files_;
|
|
LevelFilesBrief file_level_;
|
|
Arena arena_;
|
|
|
|
GenerateLevelFilesBriefTest() { }
|
|
|
|
~GenerateLevelFilesBriefTest() override {
|
|
for (size_t i = 0; i < files_.size(); i++) {
|
|
delete files_[i];
|
|
}
|
|
}
|
|
|
|
void Add(const char* smallest, const char* largest,
|
|
SequenceNumber smallest_seq = 100,
|
|
SequenceNumber largest_seq = 100) {
|
|
FileMetaData* f = new FileMetaData(
|
|
files_.size() + 1, 0, 0,
|
|
InternalKey(smallest, smallest_seq, kTypeValue),
|
|
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
|
|
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
|
|
kUnknownOldestAncesterTime);
|
|
files_.push_back(f);
|
|
}
|
|
|
|
int Compare() {
|
|
int diff = 0;
|
|
for (size_t i = 0; i < files_.size(); i++) {
|
|
if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
|
|
diff++;
|
|
}
|
|
}
|
|
return diff;
|
|
}
|
|
};
|
|
|
|
// Generating a brief from an empty file list must produce zero entries.
TEST_F(GenerateLevelFilesBriefTest, Empty) {
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(0u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
|
|
|
|
// A single input file must show up as exactly one entry with a matching
// file number.
TEST_F(GenerateLevelFilesBriefTest, Single) {
  Add("p", "q");

  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(1u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
|
|
|
|
// Four input files must yield four entries whose file numbers match the
// inputs position by position.
TEST_F(GenerateLevelFilesBriefTest, Multiple) {
  struct KeyRange {
    const char* smallest;
    const char* largest;
  };
  const KeyRange kRanges[] = {
      {"150", "200"}, {"200", "250"}, {"300", "350"}, {"400", "450"}};
  for (const KeyRange& range : kRanges) {
    Add(range.smallest, range.largest);
  }

  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(4u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
|
|
|
|
class CountingLogger : public Logger {
|
|
public:
|
|
CountingLogger() : log_count(0) {}
|
|
using Logger::Logv;
|
|
void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
|
|
int log_count;
|
|
};
|
|
|
|
// Builds default Options with `num_levels` levels and `logger` installed as
// the info log.
Options GetOptionsWithNumLevels(int num_levels,
                                std::shared_ptr<CountingLogger> logger) {
  Options result;
  result.info_log = logger;
  result.num_levels = num_levels;
  return result;
}
|
|
|
|
class VersionStorageInfoTest : public testing::Test {
|
|
public:
|
|
const Comparator* ucmp_;
|
|
InternalKeyComparator icmp_;
|
|
std::shared_ptr<CountingLogger> logger_;
|
|
Options options_;
|
|
ImmutableCFOptions ioptions_;
|
|
MutableCFOptions mutable_cf_options_;
|
|
VersionStorageInfo vstorage_;
|
|
|
|
InternalKey GetInternalKey(const char* ukey,
|
|
SequenceNumber smallest_seq = 100) {
|
|
return InternalKey(ukey, smallest_seq, kTypeValue);
|
|
}
|
|
|
|
VersionStorageInfoTest()
|
|
: ucmp_(BytewiseComparator()),
|
|
icmp_(ucmp_),
|
|
logger_(new CountingLogger()),
|
|
options_(GetOptionsWithNumLevels(6, logger_)),
|
|
ioptions_(options_),
|
|
mutable_cf_options_(options_),
|
|
vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}
|
|
|
|
~VersionStorageInfoTest() override {
|
|
for (int i = 0; i < vstorage_.num_levels(); i++) {
|
|
for (auto* f : vstorage_.LevelFiles(i)) {
|
|
if (--f->refs == 0) {
|
|
delete f;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void Add(int level, uint32_t file_number, const char* smallest,
|
|
const char* largest, uint64_t file_size = 0) {
|
|
assert(level < vstorage_.num_levels());
|
|
FileMetaData* f = new FileMetaData(
|
|
file_number, 0, file_size, GetInternalKey(smallest, 0),
|
|
GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
|
|
/* marked_for_compact */ false, kInvalidBlobFileNumber,
|
|
kUnknownOldestAncesterTime);
|
|
f->compensated_file_size = file_size;
|
|
vstorage_.AddFile(level, f);
|
|
}
|
|
|
|
void Add(int level, uint32_t file_number, const InternalKey& smallest,
|
|
const InternalKey& largest, uint64_t file_size = 0) {
|
|
assert(level < vstorage_.num_levels());
|
|
FileMetaData* f = new FileMetaData(
|
|
file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
|
|
/* largest_seq */ 0, /* marked_for_compact */ false,
|
|
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
|
|
f->compensated_file_size = file_size;
|
|
vstorage_.AddFile(level, f);
|
|
}
|
|
|
|
std::string GetOverlappingFiles(int level, const InternalKey& begin,
|
|
const InternalKey& end) {
|
|
std::vector<FileMetaData*> inputs;
|
|
vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
|
|
|
|
std::string result;
|
|
for (size_t i = 0; i < inputs.size(); ++i) {
|
|
if (i > 0) {
|
|
result += ",";
|
|
}
|
|
AppendNumberTo(&result, inputs[i]->fd.GetNumber());
|
|
}
|
|
return result;
|
|
}
|
|
};
|
|
|
|
// With dynamic level bytes disabled, base = 10 and multiplier = 5, the
// per-level targets for levels 1-4 are expected to be 10, 50, 250 and 1250,
// and nothing should be logged.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
  ioptions_.level_compaction_dynamic_level_bytes = false;
  mutable_cf_options_.max_bytes_for_level_base = 10;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  Add(4, 100U, "1", "2");
  Add(5, 101U, "1", "2");

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  struct Expectation {
    int level;
    uint64_t max_bytes;
  };
  const Expectation kExpected[] = {{1, 10U}, {2, 50U}, {3, 250U}, {4, 1250U}};
  for (const Expectation& expected : kExpected) {
    ASSERT_EQ(vstorage_.MaxBytesForLevel(expected.level), expected.max_bytes)
        << "level " << expected.level;
  }

  ASSERT_EQ(0, logger_->log_count);
}
|
|
|
|
// Grows the LSM tree step by step with dynamic level bytes enabled and checks
// the base level, the per-level targets and the number of log messages after
// each recalculation.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 1000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;

  // Step 1: one 500-byte file in the last level.
  Add(5, 1U, "1", "2", 500U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.base_level(), 5);

  // Step 2: a second file in level 5; the base level is expected to move to 4.
  Add(5, 2U, "3", "4", 550U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Step 3: a file in level 4; the base level and its target stay unchanged.
  Add(4, 3U, "3", "4", 550U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Step 4: two files in level 3; exactly one message is expected to be
  // logged and the base level is expected to become 3.
  Add(3, 4U, "3", "4", 250U);
  Add(3, 5U, "5", "7", 300U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 3);

  // Step 5: two small files in level 1; after resetting the counter one more
  // message is expected and the base level is expected to become 1.
  Add(1, 6U, "3", "4", 5U);
  Add(1, 7U, "8", "9", 5U);
  logger_->log_count = 0;
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 1);
}
|
|
|
|
// With files in every level, base = 100 and multiplier = 2, the targets for
// levels 1-4 are expected to be 100, 200, 400 and 800 with base level 1 and
// no log output.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 100;
  mutable_cf_options_.max_bytes_for_level_multiplier = 2;

  // One file per level, all covering the same key range.
  struct LevelFile {
    int level;
    uint32_t file_number;
    uint64_t size;
  };
  const LevelFile kFiles[] = {{0, 1U, 50U},  {1, 2U, 50U},   {2, 3U, 500U},
                              {3, 4U, 500U}, {4, 5U, 1700U}, {5, 6U, 500U}};
  for (const LevelFile& file : kFiles) {
    Add(file.level, file.file_number, "1", "2", file.size);
  }

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);
  ASSERT_EQ(vstorage_.base_level(), 1);
  ASSERT_EQ(0, logger_->log_count);
}
|
|
|
|
// Same calculation with multi-gigabyte levels: with a 3000 GB last level and
// multiplier 10, levels 5..2 are expected to target 3000, 300, 30 and 10 GB,
// with base level 2 and no log output.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
  const uint64_t kOneGB = 1000U * 1000U * 1000U;
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;
  mutable_cf_options_.max_bytes_for_level_multiplier = 10;

  Add(0, 1U, "1", "2", 50U);
  Add(3, 4U, "1", "2", 32U * kOneGB);
  Add(4, 5U, "1", "2", 500U * kOneGB);
  Add(5, 6U, "1", "2", 3000U * kOneGB);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB);
  ASSERT_EQ(vstorage_.base_level(), 2);
  ASSERT_EQ(0, logger_->log_count);
}
|
|
|
|
// Three 10,000-byte L0 files (30,000 bytes in total) against a level base of
// 40,000: the level multiplier is expected to remain at the configured value.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 40000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  // Level 0.
  Add(0, 1U, "1", "2", 10000U);
  Add(0, 2U, "1", "2", 10000U);
  Add(0, 3U, "1", "2", 10000U);

  // Levels 5 down to 2.
  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The level multiplier should stay at the configured 5.
  ASSERT_EQ(vstorage_.level_multiplier(), 5.0);
  // Level targets should be exactly 40,000 (L2), 51,450 (L3), 257,250 (L4).
  ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
}
|
|
|
|
// Three 10,000-byte L0 files (30,000 bytes in total) against a level base of
// only 10,000: the multiplier and the level targets are expected to be
// adjusted.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  // Level 0.
  Add(0, 11U, "1", "2", 10000U);
  Add(0, 12U, "1", "2", 10000U);
  Add(0, 13U, "1", "2", 10000U);

  // Levels 5 down to 2.
  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The level multiplier should be about 3.5.
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  // Level targets should be around 30,000 (L2), 105,000 (L3), 367,500 (L4).
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
|
|
|
|
// Same expectations as the previous case, but the 30,000 bytes of L0 data are
// split across six 5,000-byte files.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  // Level 0: file numbers 11 through 16.
  for (uint32_t file_number = 11U; file_number <= 16U; ++file_number) {
    Add(0, file_number, "1", "2", 5000U);
  }

  // Levels 5 down to 2.
  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The level multiplier should be about 3.5.
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  // Level targets should be around 30,000 (L2), 105,000 (L3), 367,500 (L4).
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
|
|
|
|
// Every file above the last level overlaps the last-level file in some way,
// so only the 10 bytes of the last level are expected to count as live data.
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
  // Upper levels: each case overlaps the last level differently.
  Add(1, 1U, "4", "7", 1U);  // Same range as the last-level file
  Add(2, 2U, "3", "5", 1U);  // Overlaps its lower end
  Add(2, 3U, "6", "8", 1U);  // Overlaps its upper end
  Add(3, 4U, "1", "9", 1U);  // Fully contains it
  Add(4, 5U, "4", "5", 1U);  // Fully inside it
  Add(4, 5U, "6", "7", 1U);  // Fully inside it

  // Last level.
  Add(5, 6U, "4", "7", 10U);

  ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
}
|
|
|
|
// Mix of counted and ignored files across levels 0-3; four one-byte files
// are expected to be counted.
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
  // Level 0 (files are not ordered here).
  Add(0, 1U, "9", "9", 1U);
  Add(0, 1U, "5", "6", 1U);  // Not counted: [5,6] also exists in level 1

  // Level 1.
  Add(1, 1U, "1", "2", 1U);  // Not counted: overlaps [2,3] in level 2
  Add(1, 2U, "3", "4", 1U);  // Not counted: overlaps [2,3] in level 2
  Add(1, 3U, "5", "6", 1U);

  // Levels 2 and 3.
  Add(2, 4U, "2", "3", 1U);
  Add(3, 5U, "7", "8", 1U);

  ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
}
|
|
|
|
// Checks which level-1 files GetOverlappingInputs() reports when neighbouring
// files share a boundary user key in different ways.
TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
  // Files 1 and 2 meet at "b"; file 1 ends at the range deletion tombstone
  // sentinel.
  Add(1, 1U, {"a", 0, kTypeValue},
      {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);
  Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);
  // Files 3 and 4 meet at the same user key "e".
  Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);
  Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);
  // Files 5 and 6 do not overlap.
  Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);
  Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);
  vstorage_.UpdateNumNonEmptyLevels();
  vstorage_.GenerateLevelFilesBrief();

  // Queries around the "b" boundary.
  ASSERT_EQ("1,2",
            GetOverlappingFiles(1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue}));
  ASSERT_EQ("1",
            GetOverlappingFiles(1, {"a", 0, kTypeValue},
                                {"b", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("2", GetOverlappingFiles(1, {"b", kMaxSequenceNumber, kTypeValue},
                                     {"c", 0, kTypeValue}));

  // Queries around the "e" boundary.
  ASSERT_EQ("3,4",
            GetOverlappingFiles(1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue}));
  ASSERT_EQ("3",
            GetOverlappingFiles(1, {"d", 0, kTypeValue},
                                {"e", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("3,4", GetOverlappingFiles(1, {"e", kMaxSequenceNumber, kTypeValue},
                                       {"f", 0, kTypeValue}));
  ASSERT_EQ("3,4",
            GetOverlappingFiles(1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}));

  // Queries matching the two disjoint files exactly.
  ASSERT_EQ("5",
            GetOverlappingFiles(1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}));
  ASSERT_EQ("6",
            GetOverlappingFiles(1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
}
|
|
|
|
|
|
class FindLevelFileTest : public testing::Test {
|
|
public:
|
|
LevelFilesBrief file_level_;
|
|
bool disjoint_sorted_files_;
|
|
Arena arena_;
|
|
|
|
FindLevelFileTest() : disjoint_sorted_files_(true) { }
|
|
|
|
~FindLevelFileTest() override {}
|
|
|
|
void LevelFileInit(size_t num = 0) {
|
|
char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
|
|
file_level_.files = new (mem)FdWithKeyRange[num];
|
|
file_level_.num_files = 0;
|
|
}
|
|
|
|
void Add(const char* smallest, const char* largest,
|
|
SequenceNumber smallest_seq = 100,
|
|
SequenceNumber largest_seq = 100) {
|
|
InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
|
|
InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
|
|
|
|
Slice smallest_slice = smallest_key.Encode();
|
|
Slice largest_slice = largest_key.Encode();
|
|
|
|
char* mem = arena_.AllocateAligned(
|
|
smallest_slice.size() + largest_slice.size());
|
|
memcpy(mem, smallest_slice.data(), smallest_slice.size());
|
|
memcpy(mem + smallest_slice.size(), largest_slice.data(),
|
|
largest_slice.size());
|
|
|
|
// add to file_level_
|
|
size_t num = file_level_.num_files;
|
|
auto& file = file_level_.files[num];
|
|
file.fd = FileDescriptor(num + 1, 0, 0);
|
|
file.smallest_key = Slice(mem, smallest_slice.size());
|
|
file.largest_key = Slice(mem + smallest_slice.size(),
|
|
largest_slice.size());
|
|
file_level_.num_files++;
|
|
}
|
|
|
|
int Find(const char* key) {
|
|
InternalKey target(key, 100, kTypeValue);
|
|
InternalKeyComparator cmp(BytewiseComparator());
|
|
return FindFile(cmp, file_level_, target.Encode());
|
|
}
|
|
|
|
bool Overlaps(const char* smallest, const char* largest) {
|
|
InternalKeyComparator cmp(BytewiseComparator());
|
|
Slice s(smallest != nullptr ? smallest : "");
|
|
Slice l(largest != nullptr ? largest : "");
|
|
return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
|
|
(smallest != nullptr ? &s : nullptr),
|
|
(largest != nullptr ? &l : nullptr));
|
|
}
|
|
};
|
|
|
|
// With no files, Find() returns 0 and no range overlaps anything.
TEST_F(FindLevelFileTest, LevelEmpty) {
  LevelFileInit(0);

  ASSERT_EQ(0, Find("foo"));
  ASSERT_FALSE(Overlaps("a", "z"));
  ASSERT_FALSE(Overlaps(nullptr, "z"));
  ASSERT_FALSE(Overlaps("a", nullptr));
  ASSERT_FALSE(Overlaps(nullptr, nullptr));
}
|
|
|
|
// One file covering ["p", "q"]: checks Find() indices and overlap detection
// for bounded and half-open ranges.
TEST_F(FindLevelFileTest, LevelSingle) {
  LevelFileInit(1);
  Add("p", "q");

  struct FindCase {
    const char* key;
    int expected_index;
  };
  const FindCase kFindCases[] = {{"a", 0}, {"p", 0},  {"p1", 0},
                                 {"q", 0}, {"q1", 1}, {"z", 1}};
  for (const FindCase& find_case : kFindCases) {
    ASSERT_EQ(find_case.expected_index, Find(find_case.key))
        << "key: " << find_case.key;
  }

  // Ranges entirely before or after the file.
  ASSERT_FALSE(Overlaps("a", "b"));
  ASSERT_FALSE(Overlaps("z1", "z2"));

  // Ranges touching or intersecting the file.
  ASSERT_TRUE(Overlaps("a", "p"));
  ASSERT_TRUE(Overlaps("a", "q"));
  ASSERT_TRUE(Overlaps("a", "z"));
  ASSERT_TRUE(Overlaps("p", "p1"));
  ASSERT_TRUE(Overlaps("p", "q"));
  ASSERT_TRUE(Overlaps("p", "z"));
  ASSERT_TRUE(Overlaps("p1", "p2"));
  ASSERT_TRUE(Overlaps("p1", "z"));
  ASSERT_TRUE(Overlaps("q", "q"));
  ASSERT_TRUE(Overlaps("q", "q1"));

  // Ranges with an open (null) bound.
  ASSERT_FALSE(Overlaps(nullptr, "j"));
  ASSERT_FALSE(Overlaps("r", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, "p"));
  ASSERT_TRUE(Overlaps(nullptr, "p1"));
  ASSERT_TRUE(Overlaps("q", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, nullptr));
}
|
|
|
|
// Four files: [150,200], [200,250], [300,350], [400,450]. Checks Find()
// indices across all boundaries and overlap detection for bounded ranges.
TEST_F(FindLevelFileTest, LevelMultiple) {
  LevelFileInit(4);
  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");

  struct FindCase {
    const char* key;
    int expected_index;
  };
  const FindCase kFindCases[] = {
      {"100", 0}, {"150", 0}, {"151", 0}, {"199", 0}, {"200", 0}, {"201", 1},
      {"249", 1}, {"250", 1}, {"251", 2}, {"299", 2}, {"300", 2}, {"349", 2},
      {"350", 2}, {"351", 3}, {"400", 3}, {"450", 3}, {"451", 4}};
  for (const FindCase& find_case : kFindCases) {
    ASSERT_EQ(find_case.expected_index, Find(find_case.key))
        << "key: " << find_case.key;
  }

  // Ranges that fall before, between or after the files.
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("251", "299"));
  ASSERT_FALSE(Overlaps("451", "500"));
  ASSERT_FALSE(Overlaps("351", "399"));

  // Ranges that touch or intersect at least one file.
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
}
|
|
|
|
// Same four files as LevelMultiple, queried with ranges that leave one or
// both bounds open (null).
TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
  LevelFileInit(4);
  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");

  // Open ranges that end before the first file or start after the last one.
  ASSERT_FALSE(Overlaps(nullptr, "149"));
  ASSERT_FALSE(Overlaps("451", nullptr));

  // Fully open range.
  ASSERT_TRUE(Overlaps(nullptr, nullptr));

  // Open lower bound.
  ASSERT_TRUE(Overlaps(nullptr, "150"));
  ASSERT_TRUE(Overlaps(nullptr, "199"));
  ASSERT_TRUE(Overlaps(nullptr, "200"));
  ASSERT_TRUE(Overlaps(nullptr, "201"));
  ASSERT_TRUE(Overlaps(nullptr, "400"));
  ASSERT_TRUE(Overlaps(nullptr, "800"));

  // Open upper bound.
  ASSERT_TRUE(Overlaps("100", nullptr));
  ASSERT_TRUE(Overlaps("200", nullptr));
  ASSERT_TRUE(Overlaps("449", nullptr));
  ASSERT_TRUE(Overlaps("450", nullptr));
}
|
|
|
|
// A file whose smallest and largest keys share the user key "200" but carry
// different sequence numbers (5000 and 3000).
TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
  LevelFileInit(1);
  Add("200", "200", 5000, 3000);

  ASSERT_FALSE(Overlaps("199", "199"));
  ASSERT_FALSE(Overlaps("201", "300"));

  ASSERT_TRUE(Overlaps("200", "200"));
  ASSERT_TRUE(Overlaps("190", "200"));
  ASSERT_TRUE(Overlaps("200", "210"));
}
|
|
|
|
// Two files where the second ([400,500]) lies inside the first ([150,600]),
// so the fixture is switched to the non-disjoint mode before querying.
TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
  LevelFileInit(2);
  Add("150", "600");
  Add("400", "500");
  disjoint_sorted_files_ = false;

  // Ranges entirely outside [150,600].
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("601", "700"));

  // Ranges touching or intersecting at least one file.
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
  ASSERT_TRUE(Overlaps("450", "700"));
  ASSERT_TRUE(Overlaps("600", "700"));
}
|
|
|
|
class VersionSetTestBase {
|
|
public:
|
|
const static std::string kColumnFamilyName1;
|
|
const static std::string kColumnFamilyName2;
|
|
const static std::string kColumnFamilyName3;
|
|
int num_initial_edits_;
|
|
|
|
VersionSetTestBase()
|
|
: env_(Env::Default()),
|
|
dbname_(test::PerThreadDBPath("version_set_test")),
|
|
db_options_(),
|
|
mutable_cf_options_(cf_options_),
|
|
table_cache_(NewLRUCache(50000, 16)),
|
|
write_buffer_manager_(db_options_.db_write_buffer_size),
|
|
versions_(new VersionSet(dbname_, &db_options_, env_options_,
|
|
table_cache_.get(), &write_buffer_manager_,
|
|
&write_controller_,
|
|
/*block_cache_tracer=*/nullptr)),
|
|
reactive_versions_(std::make_shared<ReactiveVersionSet>(
|
|
dbname_, &db_options_, env_options_, table_cache_.get(),
|
|
&write_buffer_manager_, &write_controller_)),
|
|
shutting_down_(false),
|
|
mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
|
|
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
|
|
db_options_.db_paths.emplace_back(dbname_,
|
|
std::numeric_limits<uint64_t>::max());
|
|
}
|
|
|
|
void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
|
|
SequenceNumber* last_seqno,
|
|
std::unique_ptr<log::Writer>* log_writer) {
|
|
assert(column_families != nullptr);
|
|
assert(last_seqno != nullptr);
|
|
assert(log_writer != nullptr);
|
|
VersionEdit new_db;
|
|
if (db_options_.write_dbid_to_manifest) {
|
|
DBImpl* impl = new DBImpl(DBOptions(), dbname_);
|
|
std::string db_id;
|
|
impl->GetDbIdentityFromIdentityFile(&db_id);
|
|
new_db.SetDBId(db_id);
|
|
}
|
|
new_db.SetLogNumber(0);
|
|
new_db.SetNextFile(2);
|
|
new_db.SetLastSequence(0);
|
|
|
|
const std::vector<std::string> cf_names = {
|
|
kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
|
|
kColumnFamilyName3};
|
|
const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
|
|
autovector<VersionEdit> new_cfs;
|
|
uint64_t last_seq = 1;
|
|
uint32_t cf_id = 1;
|
|
for (int i = 1; i != kInitialNumOfCfs; ++i) {
|
|
VersionEdit new_cf;
|
|
new_cf.AddColumnFamily(cf_names[i]);
|
|
new_cf.SetColumnFamily(cf_id++);
|
|
new_cf.SetLogNumber(0);
|
|
new_cf.SetNextFile(2);
|
|
new_cf.SetLastSequence(last_seq++);
|
|
new_cfs.emplace_back(new_cf);
|
|
}
|
|
*last_seqno = last_seq;
|
|
num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
|
|
const std::string manifest = DescriptorFileName(dbname_, 1);
|
|
std::unique_ptr<WritableFile> file;
|
|
Status s = env_->NewWritableFile(
|
|
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
|
|
ASSERT_OK(s);
|
|
std::unique_ptr<WritableFileWriter> file_writer(
|
|
new WritableFileWriter(std::move(file), manifest, env_options_));
|
|
{
|
|
log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
|
|
std::string record;
|
|
new_db.EncodeTo(&record);
|
|
s = (*log_writer)->AddRecord(record);
|
|
for (const auto& e : new_cfs) {
|
|
record.clear();
|
|
e.EncodeTo(&record);
|
|
s = (*log_writer)->AddRecord(record);
|
|
ASSERT_OK(s);
|
|
}
|
|
}
|
|
ASSERT_OK(s);
|
|
|
|
cf_options_.table_factory = mock_table_factory_;
|
|
for (const auto& cf_name : cf_names) {
|
|
column_families->emplace_back(cf_name, cf_options_);
|
|
}
|
|
}
|
|
|
|
// Create DB with 3 column families.
|
|
void NewDB() {
|
|
std::vector<ColumnFamilyDescriptor> column_families;
|
|
SequenceNumber last_seqno;
|
|
std::unique_ptr<log::Writer> log_writer;
|
|
SetIdentityFile(env_, dbname_);
|
|
PrepareManifest(&column_families, &last_seqno, &log_writer);
|
|
log_writer.reset();
|
|
// Make "CURRENT" file point to the new manifest file.
|
|
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
|
|
ASSERT_OK(s);
|
|
|
|
EXPECT_OK(versions_->Recover(column_families, false));
|
|
EXPECT_EQ(column_families.size(),
|
|
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
|
|
}
|
|
|
|
Env* env_;
|
|
const std::string dbname_;
|
|
EnvOptions env_options_;
|
|
ImmutableDBOptions db_options_;
|
|
ColumnFamilyOptions cf_options_;
|
|
MutableCFOptions mutable_cf_options_;
|
|
std::shared_ptr<Cache> table_cache_;
|
|
WriteController write_controller_;
|
|
WriteBufferManager write_buffer_manager_;
|
|
std::shared_ptr<VersionSet> versions_;
|
|
std::shared_ptr<ReactiveVersionSet> reactive_versions_;
|
|
InstrumentedMutex mutex_;
|
|
std::atomic<bool> shutting_down_;
|
|
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
|
|
};
|
|
|
|
// Names of the three non-default column families created by the fixture.
const std::string VersionSetTestBase::kColumnFamilyName1("alice");
const std::string VersionSetTestBase::kColumnFamilyName2("bob");
const std::string VersionSetTestBase::kColumnFamilyName3("charles");
|
|
|
|
// Plain gtest fixture on top of VersionSetTestBase; adds no state of its own.
class VersionSetTest : public VersionSetTestBase, public testing::Test {
 public:
  VersionSetTest() {}
};
|
|
|
|
// Submits kGroupSize single-edit lists that all target the default column
// family in one LogAndApply() call and counts how often the "SameColumnFamily"
// sync point fires; it is expected to fire kGroupSize - 1 times.
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
  NewDB();
  const int kGroupSize = 5;

  // Create every edit first so that the pointers taken below stay valid.
  autovector<VersionEdit> edits;
  for (int n = 0; n != kGroupSize; ++n) {
    edits.emplace_back(VersionEdit());
  }

  autovector<ColumnFamilyData*> cfds;
  autovector<const MutableCFOptions*> all_mutable_cf_options;
  autovector<autovector<VersionEdit*>> edit_lists;
  for (int n = 0; n != kGroupSize; ++n) {
    cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
    all_mutable_cf_options.emplace_back(&mutable_cf_options_);
    autovector<VersionEdit*> single_edit;
    single_edit.emplace_back(&edits[n]);
    edit_lists.emplace_back(single_edit);
  }

  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  int same_cf_hits = 0;
  sync_point->SetCallBack(
      "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
        // The sync point passes a pointer to the column family ID.
        const uint32_t* cf_id = reinterpret_cast<uint32_t*>(arg);
        EXPECT_EQ(0u, *cf_id);
        ++same_cf_hits;
      });
  sync_point->EnableProcessing();

  mutex_.Lock();
  const Status status =
      versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);
  mutex_.Unlock();

  EXPECT_OK(status);
  EXPECT_EQ(kGroupSize - 1, same_cf_hits);
}
|
|
|
|
class VersionSetAtomicGroupTest : public VersionSetTestBase,
|
|
public testing::Test {
|
|
public:
|
|
VersionSetAtomicGroupTest() : VersionSetTestBase() {}
|
|
|
|
void SetUp() override {
|
|
PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
|
|
SetupTestSyncPoints();
|
|
}
|
|
|
|
void SetupValidAtomicGroup(int atomic_group_size) {
|
|
edits_.resize(atomic_group_size);
|
|
int remaining = atomic_group_size;
|
|
for (size_t i = 0; i != edits_.size(); ++i) {
|
|
edits_[i].SetLogNumber(0);
|
|
edits_[i].SetNextFile(2);
|
|
edits_[i].MarkAtomicGroup(--remaining);
|
|
edits_[i].SetLastSequence(last_seqno_++);
|
|
}
|
|
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
|
|
}
|
|
|
|
void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
|
|
edits_.resize(atomic_group_size);
|
|
int remaining = atomic_group_size;
|
|
for (size_t i = 0; i != edits_.size(); ++i) {
|
|
edits_[i].SetLogNumber(0);
|
|
edits_[i].SetNextFile(2);
|
|
edits_[i].MarkAtomicGroup(--remaining);
|
|
edits_[i].SetLastSequence(last_seqno_++);
|
|
}
|
|
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
|
|
}
|
|
|
|
void SetupCorruptedAtomicGroup(int atomic_group_size) {
|
|
edits_.resize(atomic_group_size);
|
|
int remaining = atomic_group_size;
|
|
for (size_t i = 0; i != edits_.size(); ++i) {
|
|
edits_[i].SetLogNumber(0);
|
|
edits_[i].SetNextFile(2);
|
|
if (i != ((size_t)atomic_group_size / 2)) {
|
|
edits_[i].MarkAtomicGroup(--remaining);
|
|
}
|
|
edits_[i].SetLastSequence(last_seqno_++);
|
|
}
|
|
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
|
|
}
|
|
|
|
void SetupIncorrectAtomicGroup(int atomic_group_size) {
|
|
edits_.resize(atomic_group_size);
|
|
int remaining = atomic_group_size;
|
|
for (size_t i = 0; i != edits_.size(); ++i) {
|
|
edits_[i].SetLogNumber(0);
|
|
edits_[i].SetNextFile(2);
|
|
if (i != 1) {
|
|
edits_[i].MarkAtomicGroup(--remaining);
|
|
} else {
|
|
edits_[i].MarkAtomicGroup(remaining--);
|
|
}
|
|
edits_[i].SetLastSequence(last_seqno_++);
|
|
}
|
|
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
|
|
}
|
|
|
|
void SetupTestSyncPoints() {
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
|
|
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
|
|
EXPECT_EQ(edits_.front().DebugString(),
|
|
e->DebugString()); // compare based on value
|
|
first_in_atomic_group_ = true;
|
|
});
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
|
|
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
|
|
EXPECT_EQ(edits_.back().DebugString(),
|
|
e->DebugString()); // compare based on value
|
|
EXPECT_TRUE(first_in_atomic_group_);
|
|
last_in_atomic_group_ = true;
|
|
});
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
|
|
num_recovered_edits_ = *reinterpret_cast<int*>(arg);
|
|
});
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"ReactiveVersionSet::ReadAndApply:AppliedEdits",
|
|
[&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); });
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"AtomicGroupReadBuffer::AddEdit:AtomicGroup",
|
|
[&](void* /* arg */) { ++num_edits_in_atomic_group_; });
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
|
|
[&](void* arg) {
|
|
corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
|
|
});
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
|
|
[&](void* arg) {
|
|
edit_with_incorrect_group_size_ =
|
|
*reinterpret_cast<VersionEdit*>(arg);
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
}
|
|
|
|
void AddNewEditsToLog(int num_edits) {
|
|
for (int i = 0; i < num_edits; i++) {
|
|
std::string record;
|
|
edits_[i].EncodeTo(&record);
|
|
ASSERT_OK(log_writer_->AddRecord(record));
|
|
}
|
|
}
|
|
|
|
// Per-test cleanup: stop sync-point processing, drop every registered
// callback, and release the log writer owned by the fixture.
void TearDown() override {
  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
  log_writer_.reset();
}
|
|
|
|
protected:
|
|
std::vector<ColumnFamilyDescriptor> column_families_;
|
|
SequenceNumber last_seqno_;
|
|
std::vector<VersionEdit> edits_;
|
|
bool first_in_atomic_group_ = false;
|
|
bool last_in_atomic_group_ = false;
|
|
int num_edits_in_atomic_group_ = 0;
|
|
int num_recovered_edits_ = 0;
|
|
int num_applied_edits_ = 0;
|
|
VersionEdit corrupted_edit_;
|
|
VersionEdit edit_with_incorrect_group_size_;
|
|
std::unique_ptr<log::Writer> log_writer_;
|
|
};
|
|
|
|
// A complete atomic group is written before VersionSet::Recover() runs. The
// recovery must succeed, report both group boundaries, and count the group's
// edits as recovered (none as applied).
TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
  const int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  const Status recover_status = versions_->Recover(column_families_, false);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // Both ends of the group were observed by the sync points.
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);

  EXPECT_EQ(num_initial_edits_ + kGroupSize, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
|
|
|
|
// Same scenario as the VersionSet variant, but recovering through the
// reactive version set. After recovery nothing may be left pending in its
// replay buffer.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetRecover) {
  const int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);

  // Recovery consumed the whole group, so no partial state remains.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);

  EXPECT_EQ(num_initial_edits_ + kGroupSize, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
|
|
|
|
// The reactive version set recovers first; the complete atomic group is
// written afterwards and picked up by ReadAndApply(). The group's edits must
// then be counted as applied rather than recovered.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);

  // The group only reaches the log after recovery has finished.
  AddNewEditsToLog(kGroupSize);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);

  // Applying the full group leaves nothing buffered.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);

  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(kGroupSize, num_applied_edits_);
}
|
|
|
|
// Only the first kGroupSize - 1 edits of a group reach the log. Recovery must
// still succeed, see the start but not the end of the group, and must not
// count the partial group's edits as recovered.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
  const int kGroupSize = 4;
  const int kPersistedEdits = kGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kGroupSize);
  AddNewEditsToLog(kPersistedEdits);

  const Status recover_status = versions_->Recover(column_families_, false);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kPersistedEdits, num_edits_in_atomic_group_);

  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
|
|
|
|
// Recovery through the reactive version set with a truncated atomic group:
// the partial group must be held back in the replay buffer, and writing the
// missing final edit followed by ReadAndApply() must apply the whole group.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
  const int kGroupSize = 4;
  const int kPersistedEdits = kGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kGroupSize);
  AddNewEditsToLog(kPersistedEdits);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kPersistedEdits, num_edits_in_atomic_group_);

  // The persisted part of the group is parked in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kPersistedEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kGroupSize);

  // Append the missing final edit; the next ReadAndApply() should then apply
  // every edit of the group.
  std::string final_record;
  edits_[kGroupSize - 1].EncodeTo(&final_record);
  EXPECT_OK(log_writer_->AddRecord(final_record));

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  // Nothing is left pending once the group has been applied.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);

  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(kGroupSize, num_applied_edits_);
}
|
|
|
|
// The reactive version set recovers from a log without any atomic group; a
// truncated group is written afterwards. ReadAndApply() must succeed, keep
// the partial group in the replay buffer, and apply none of its edits.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kGroupSize = 4;
  const int kPersistedEdits = kGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  // At this point the log holds no atomic-group edits.
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // Now persist all but the last edit of the group.
  AddNewEditsToLog(kPersistedEdits);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kPersistedEdits, num_edits_in_atomic_group_);

  // The partial group stays buffered instead of being applied.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kPersistedEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kGroupSize);

  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
|
|
|
|
// A corrupted atomic group must make VersionSet::Recover() fail, and the edit
// reported through the "mixed with normal edits" sync point must be the one
// in the middle of the prepared group.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithVersionSetRecover) {
  const int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  const Status recover_status = versions_->Recover(column_families_, false);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  const VersionEdit& expected_culprit = edits_[kGroupSize / 2];
  EXPECT_EQ(expected_culprit.DebugString(), corrupted_edit_.DebugString());
}
|
|
|
|
// Reactive-version-set counterpart of the corrupted-group recovery test:
// Recover() must fail and the middle edit of the group must be the one
// reported as corrupted.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
  const int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  const VersionEdit& expected_culprit = edits_[kGroupSize / 2];
  EXPECT_EQ(expected_culprit.DebugString(), corrupted_edit_.DebugString());
}
|
|
|
|
// The corrupted group is written only after the reactive version set has
// recovered. The test expects ReadAndApply() to return OK while the sync
// point still reports the middle edit of the group as corrupted.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);

  // Persist the corrupted group, then let the reactive version set read it.
  AddNewEditsToLog(kGroupSize);
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  const VersionEdit& expected_culprit = edits_[kGroupSize / 2];
  EXPECT_EQ(expected_culprit.DebugString(), corrupted_edit_.DebugString());
}
|
|
|
|
// An atomic group whose size bookkeeping is inconsistent must make
// VersionSet::Recover() fail; the offending edit reported by the sync point
// is expected to be the second prepared edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
  const int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  const Status recover_status = versions_->Recover(column_families_, false);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  const VersionEdit& expected_culprit = edits_[1];
  EXPECT_EQ(expected_culprit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
|
|
|
|
// Reactive-version-set counterpart of the incorrect-group-size recovery
// test: Recover() must fail and the second prepared edit must be the one
// reported.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
  const int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  const VersionEdit& expected_culprit = edits_[1];
  EXPECT_EQ(expected_culprit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
|
|
|
|
// The group with inconsistent size bookkeeping is written after the reactive
// version set has recovered. The test expects ReadAndApply() to return OK
// while the sync point reports the second prepared edit as the offender.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
  const int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);

  AddNewEditsToLog(kGroupSize);
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  const VersionEdit& expected_culprit = edits_[1];
  EXPECT_EQ(expected_culprit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
|
|
|
|
class VersionSetTestDropOneCF : public VersionSetTestBase,
|
|
public testing::TestWithParam<std::string> {
|
|
public:
|
|
VersionSetTestDropOneCF() : VersionSetTestBase() {}
|
|
};
|
|
|
|
// This test simulates the following execution sequence
|
|
// Time thread1 bg_flush_thr
|
|
// | Prepare version edits (e1,e2,e3) for atomic
|
|
// | flush cf1, cf2, cf3
|
|
// | Enqueue e to drop cfi
|
|
// | to manifest_writers_
|
|
// | Enqueue (e1,e2,e3) to manifest_writers_
|
|
// |
|
|
// | Apply e,
|
|
// | cfi.IsDropped() is true
|
|
// | Apply (e1,e2,e3),
|
|
// | since cfi.IsDropped() == true, we need to
|
|
// | drop ei and write the rest to MANIFEST.
|
|
// V
|
|
//
|
|
// Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
|
|
// last column family in an atomic group.
|
|
TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
|
|
std::vector<ColumnFamilyDescriptor> column_families;
|
|
SequenceNumber last_seqno;
|
|
std::unique_ptr<log::Writer> log_writer;
|
|
PrepareManifest(&column_families, &last_seqno, &log_writer);
|
|
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
|
|
ASSERT_OK(s);
|
|
|
|
EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
|
|
EXPECT_EQ(column_families.size(),
|
|
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
|
|
|
|
const int kAtomicGroupSize = 3;
|
|
const std::vector<std::string> non_default_cf_names = {
|
|
kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
|
|
|
|
// Drop one column family
|
|
VersionEdit drop_cf_edit;
|
|
drop_cf_edit.DropColumnFamily();
|
|
const std::string cf_to_drop_name(GetParam());
|
|
auto cfd_to_drop =
|
|
versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
|
|
ASSERT_NE(nullptr, cfd_to_drop);
|
|
// Increase its refcount because cfd_to_drop is used later, and we need to
|
|
// prevent it from being deleted.
|
|
cfd_to_drop->Ref();
|
|
drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
|
|
mutex_.Lock();
|
|
s = versions_->LogAndApply(cfd_to_drop,
|
|
*cfd_to_drop->GetLatestMutableCFOptions(),
|
|
&drop_cf_edit, &mutex_);
|
|
mutex_.Unlock();
|
|
ASSERT_OK(s);
|
|
|
|
std::vector<VersionEdit> edits(kAtomicGroupSize);
|
|
uint32_t remaining = kAtomicGroupSize;
|
|
size_t i = 0;
|
|
autovector<ColumnFamilyData*> cfds;
|
|
autovector<const MutableCFOptions*> mutable_cf_options_list;
|
|
autovector<autovector<VersionEdit*>> edit_lists;
|
|
for (const auto& cf_name : non_default_cf_names) {
|
|
auto cfd = (cf_name != cf_to_drop_name)
|
|
? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
|
|
: cfd_to_drop;
|
|
ASSERT_NE(nullptr, cfd);
|
|
cfds.push_back(cfd);
|
|
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
|
|
edits[i].SetColumnFamily(cfd->GetID());
|
|
edits[i].SetLogNumber(0);
|
|
edits[i].SetNextFile(2);
|
|
edits[i].MarkAtomicGroup(--remaining);
|
|
edits[i].SetLastSequence(last_seqno++);
|
|
autovector<VersionEdit*> tmp_edits;
|
|
tmp_edits.push_back(&edits[i]);
|
|
edit_lists.emplace_back(tmp_edits);
|
|
++i;
|
|
}
|
|
int called = 0;
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
|
|
std::vector<VersionEdit*>* tmp_edits =
|
|
reinterpret_cast<std::vector<VersionEdit*>*>(arg);
|
|
EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
|
|
for (const auto e : *tmp_edits) {
|
|
bool found = false;
|
|
for (const auto& e2 : edits) {
|
|
if (&e2 == e) {
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
ASSERT_TRUE(found);
|
|
}
|
|
++called;
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
mutex_.Lock();
|
|
s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
|
|
&mutex_);
|
|
mutex_.Unlock();
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(1, called);
|
|
if (cfd_to_drop->Unref()) {
|
|
delete cfd_to_drop;
|
|
cfd_to_drop = nullptr;
|
|
}
|
|
}
|
|
|
|
// Run every VersionSetTestDropOneCF test once per non-default column family
// name; the name is delivered to the test through GetParam().
INSTANTIATE_TEST_CASE_P(
    AtomicGroup, VersionSetTestDropOneCF,
    ::testing::Values(VersionSetTestBase::kColumnFamilyName1,
                      VersionSetTestBase::kColumnFamilyName2,
                      VersionSetTestBase::kColumnFamilyName3));
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|