6059bdf86a
Summary: Some Mongo+Rocks datasets in Parse's environment are not doing compactions very frequently. During the quiet period (with no IO), we'd like to schedule compactions so that our reads become faster. Also, aggressively compacting during quiet periods helps when write bursts happen. In addition, we also want to compact files that are containing deleted key ranges (like old oplog keys). All of this is currently not possible with CompactRange() because it's single-threaded and blocks all other compactions from happening. Running CompactRange() risks an issue of blocking writes because we generate too much Level 0 files before the compaction is over. Stopping writes is very dangerous because they hold transaction locks. We tried running manual compaction once on Mongo+Rocks and everything fell apart. MarkForCompaction() solves all of those problems. This is very light-weight manual compaction. It is lower priority than automatic compactions, which means it shouldn't interfere with background process keeping the LSM tree clean. However, if no automatic compactions need to be run (or we have extra background threads available), we will start compacting files that are marked for compaction. Test Plan: added a new unit test Reviewers: yhchiang, rven, MarkCallaghan, sdong Reviewed By: sdong Subscribers: yoshinorim, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D37083
405 lines
16 KiB
C++
405 lines
16 KiB
C++
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
#include "db/compaction_picker.h"
|
|
#include <limits>
|
|
#include <string>
|
|
#include "util/logging.h"
|
|
#include "util/string_util.h"
|
|
#include "util/testharness.h"
|
|
#include "util/testutil.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
class CountingLogger : public Logger {
|
|
public:
|
|
using Logger::Logv;
|
|
virtual void Logv(const char* format, va_list ap) override { log_count++; }
|
|
size_t log_count;
|
|
};
|
|
|
|
class CompactionPickerTest : public testing::Test {
|
|
public:
|
|
const Comparator* ucmp_;
|
|
InternalKeyComparator icmp_;
|
|
Options options_;
|
|
ImmutableCFOptions ioptions_;
|
|
MutableCFOptions mutable_cf_options_;
|
|
LevelCompactionPicker level_compaction_picker;
|
|
std::string cf_name_;
|
|
CountingLogger logger_;
|
|
LogBuffer log_buffer_;
|
|
uint32_t file_num_;
|
|
CompactionOptionsFIFO fifo_options_;
|
|
std::unique_ptr<VersionStorageInfo> vstorage_;
|
|
std::vector<std::unique_ptr<FileMetaData>> files_;
|
|
|
|
CompactionPickerTest()
|
|
: ucmp_(BytewiseComparator()),
|
|
icmp_(ucmp_),
|
|
ioptions_(options_),
|
|
mutable_cf_options_(options_, ioptions_),
|
|
level_compaction_picker(ioptions_, &icmp_),
|
|
cf_name_("dummy"),
|
|
log_buffer_(InfoLogLevel::INFO_LEVEL, &logger_),
|
|
file_num_(1),
|
|
vstorage_(nullptr) {
|
|
fifo_options_.max_table_files_size = 1;
|
|
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
|
|
ioptions_.db_paths.emplace_back("dummy",
|
|
std::numeric_limits<uint64_t>::max());
|
|
}
|
|
|
|
~CompactionPickerTest() {
|
|
}
|
|
|
|
void NewVersionStorage(int num_levels, CompactionStyle style) {
|
|
DeleteVersionStorage();
|
|
options_.num_levels = num_levels;
|
|
vstorage_.reset(new VersionStorageInfo(
|
|
&icmp_, ucmp_, options_.num_levels, style, nullptr));
|
|
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
|
|
}
|
|
|
|
void DeleteVersionStorage() {
|
|
vstorage_.reset();
|
|
files_.clear();
|
|
}
|
|
|
|
void Add(int level, uint32_t file_number, const char* smallest,
|
|
const char* largest, uint64_t file_size = 0, uint32_t path_id = 0,
|
|
SequenceNumber smallest_seq = 100,
|
|
SequenceNumber largest_seq = 100) {
|
|
assert(level < vstorage_->num_levels());
|
|
FileMetaData* f = new FileMetaData;
|
|
f->fd = FileDescriptor(file_number, path_id, file_size);
|
|
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
|
|
f->largest = InternalKey(largest, largest_seq, kTypeValue);
|
|
f->compensated_file_size = file_size;
|
|
f->refs = 0;
|
|
vstorage_->AddFile(level, f);
|
|
files_.emplace_back(f);
|
|
}
|
|
|
|
void UpdateVersionStorageInfo() {
|
|
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
|
|
vstorage_->UpdateFilesBySize();
|
|
vstorage_->UpdateNumNonEmptyLevels();
|
|
vstorage_->GenerateFileIndexer();
|
|
vstorage_->GenerateLevelFilesBrief();
|
|
vstorage_->ComputeCompactionScore(mutable_cf_options_, fifo_options_);
|
|
vstorage_->SetFinalized();
|
|
}
|
|
};
|
|
|
|
TEST_F(CompactionPickerTest, Empty) {
|
|
NewVersionStorage(6, kCompactionStyleLevel);
|
|
UpdateVersionStorageInfo();
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() == nullptr);
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Single) {
|
|
NewVersionStorage(6, kCompactionStyleLevel);
|
|
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
|
|
Add(0, 1U, "p", "q");
|
|
UpdateVersionStorageInfo();
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() == nullptr);
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Level0Trigger) {
|
|
NewVersionStorage(6, kCompactionStyleLevel);
|
|
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
|
|
Add(0, 1U, "150", "200");
|
|
Add(0, 2U, "200", "250");
|
|
|
|
UpdateVersionStorageInfo();
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(2U, compaction->num_input_files(0));
|
|
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Level1Trigger) {
|
|
NewVersionStorage(6, kCompactionStyleLevel);
|
|
Add(1, 66U, "150", "200", 1000000000U);
|
|
UpdateVersionStorageInfo();
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(1U, compaction->num_input_files(0));
|
|
ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Level1Trigger2) {
|
|
NewVersionStorage(6, kCompactionStyleLevel);
|
|
Add(1, 66U, "150", "200", 1000000001U);
|
|
Add(1, 88U, "201", "300", 1000000000U);
|
|
Add(2, 6U, "150", "179", 1000000000U);
|
|
Add(2, 7U, "180", "220", 1000000000U);
|
|
Add(2, 8U, "221", "300", 1000000000U);
|
|
UpdateVersionStorageInfo();
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(1U, compaction->num_input_files(0));
|
|
ASSERT_EQ(2U, compaction->num_input_files(1));
|
|
ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber());
|
|
ASSERT_EQ(6U, compaction->input(1, 0)->fd.GetNumber());
|
|
ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, LevelMaxScore) {
|
|
NewVersionStorage(6, kCompactionStyleLevel);
|
|
mutable_cf_options_.target_file_size_base = 10000000;
|
|
mutable_cf_options_.target_file_size_multiplier = 10;
|
|
Add(0, 1U, "150", "200", 1000000000U);
|
|
// Level 1 score 1.2
|
|
Add(1, 66U, "150", "200", 6000000U);
|
|
Add(1, 88U, "201", "300", 6000000U);
|
|
// Level 2 score 1.8. File 7 is the largest. Should be picked
|
|
Add(2, 6U, "150", "179", 60000000U);
|
|
Add(2, 7U, "180", "220", 60000001U);
|
|
Add(2, 8U, "221", "300", 60000000U);
|
|
// Level 3 score slightly larger than 1
|
|
Add(3, 26U, "150", "170", 260000000U);
|
|
Add(3, 27U, "171", "179", 260000000U);
|
|
Add(3, 28U, "191", "220", 260000000U);
|
|
Add(3, 29U, "221", "300", 260000000U);
|
|
UpdateVersionStorageInfo();
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(1U, compaction->num_input_files(0));
|
|
ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, NeedsCompactionLevel) {
|
|
const int kLevels = 6;
|
|
const int kFileCount = 20;
|
|
|
|
for (int level = 0; level < kLevels - 1; ++level) {
|
|
NewVersionStorage(kLevels, kCompactionStyleLevel);
|
|
uint64_t file_size = vstorage_->MaxBytesForLevel(level) * 2 / kFileCount;
|
|
for (int file_count = 1; file_count <= kFileCount; ++file_count) {
|
|
// start a brand new version in each test.
|
|
NewVersionStorage(kLevels, kCompactionStyleLevel);
|
|
for (int i = 0; i < file_count; ++i) {
|
|
Add(level, i, ToString((i + 100) * 1000).c_str(),
|
|
ToString((i + 100) * 1000 + 999).c_str(),
|
|
file_size, 0, i * 100, i * 100 + 99);
|
|
}
|
|
UpdateVersionStorageInfo();
|
|
ASSERT_EQ(vstorage_->CompactionScoreLevel(0), level);
|
|
ASSERT_EQ(level_compaction_picker.NeedsCompaction(vstorage_.get()),
|
|
vstorage_->CompactionScore(0) >= 1);
|
|
// release the version storage
|
|
DeleteVersionStorage();
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Level0TriggerDynamic) {
|
|
int num_levels = ioptions_.num_levels;
|
|
ioptions_.level_compaction_dynamic_level_bytes = true;
|
|
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
|
|
mutable_cf_options_.max_bytes_for_level_base = 200;
|
|
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
|
|
NewVersionStorage(num_levels, kCompactionStyleLevel);
|
|
Add(0, 1U, "150", "200");
|
|
Add(0, 2U, "200", "250");
|
|
|
|
UpdateVersionStorageInfo();
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(2U, compaction->num_input_files(0));
|
|
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
|
|
ASSERT_EQ(1, static_cast<int>(compaction->num_input_levels()));
|
|
ASSERT_EQ(num_levels - 1, compaction->output_level());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Level0TriggerDynamic2) {
|
|
int num_levels = ioptions_.num_levels;
|
|
ioptions_.level_compaction_dynamic_level_bytes = true;
|
|
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
|
|
mutable_cf_options_.max_bytes_for_level_base = 200;
|
|
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
|
|
NewVersionStorage(num_levels, kCompactionStyleLevel);
|
|
Add(0, 1U, "150", "200");
|
|
Add(0, 2U, "200", "250");
|
|
Add(num_levels - 1, 3U, "200", "250", 300U);
|
|
|
|
UpdateVersionStorageInfo();
|
|
ASSERT_EQ(vstorage_->base_level(), num_levels - 2);
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(2U, compaction->num_input_files(0));
|
|
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
|
|
ASSERT_EQ(1, static_cast<int>(compaction->num_input_levels()));
|
|
ASSERT_EQ(num_levels - 2, compaction->output_level());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Level0TriggerDynamic3) {
|
|
int num_levels = ioptions_.num_levels;
|
|
ioptions_.level_compaction_dynamic_level_bytes = true;
|
|
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
|
|
mutable_cf_options_.max_bytes_for_level_base = 200;
|
|
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
|
|
NewVersionStorage(num_levels, kCompactionStyleLevel);
|
|
Add(0, 1U, "150", "200");
|
|
Add(0, 2U, "200", "250");
|
|
Add(num_levels - 1, 3U, "200", "250", 300U);
|
|
Add(num_levels - 1, 4U, "300", "350", 3000U);
|
|
|
|
UpdateVersionStorageInfo();
|
|
ASSERT_EQ(vstorage_->base_level(), num_levels - 3);
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(2U, compaction->num_input_files(0));
|
|
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
|
|
ASSERT_EQ(1, static_cast<int>(compaction->num_input_levels()));
|
|
ASSERT_EQ(num_levels - 3, compaction->output_level());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, Level0TriggerDynamic4) {
|
|
int num_levels = ioptions_.num_levels;
|
|
ioptions_.level_compaction_dynamic_level_bytes = true;
|
|
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
|
|
mutable_cf_options_.max_bytes_for_level_base = 200;
|
|
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
|
|
NewVersionStorage(num_levels, kCompactionStyleLevel);
|
|
Add(0, 1U, "150", "200");
|
|
Add(0, 2U, "200", "250");
|
|
Add(num_levels - 1, 3U, "200", "250", 300U);
|
|
Add(num_levels - 1, 4U, "300", "350", 3000U);
|
|
Add(num_levels - 3, 5U, "150", "180", 3U);
|
|
Add(num_levels - 3, 6U, "181", "300", 3U);
|
|
Add(num_levels - 3, 7U, "400", "450", 3U);
|
|
|
|
UpdateVersionStorageInfo();
|
|
ASSERT_EQ(vstorage_->base_level(), num_levels - 3);
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(2U, compaction->num_input_files(0));
|
|
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->num_input_files(1));
|
|
ASSERT_EQ(num_levels - 3, compaction->level(1));
|
|
ASSERT_EQ(5U, compaction->input(1, 0)->fd.GetNumber());
|
|
ASSERT_EQ(6U, compaction->input(1, 1)->fd.GetNumber());
|
|
ASSERT_EQ(2, static_cast<int>(compaction->num_input_levels()));
|
|
ASSERT_EQ(num_levels - 3, compaction->output_level());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, LevelTriggerDynamic4) {
|
|
int num_levels = ioptions_.num_levels;
|
|
ioptions_.level_compaction_dynamic_level_bytes = true;
|
|
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
|
|
mutable_cf_options_.max_bytes_for_level_base = 200;
|
|
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
|
|
NewVersionStorage(num_levels, kCompactionStyleLevel);
|
|
Add(0, 1U, "150", "200");
|
|
Add(num_levels - 1, 3U, "200", "250", 300U);
|
|
Add(num_levels - 1, 4U, "300", "350", 3000U);
|
|
Add(num_levels - 1, 4U, "400", "450", 3U);
|
|
Add(num_levels - 2, 5U, "150", "180", 300U);
|
|
Add(num_levels - 2, 6U, "181", "350", 500U);
|
|
Add(num_levels - 2, 7U, "400", "450", 200U);
|
|
|
|
UpdateVersionStorageInfo();
|
|
|
|
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
|
|
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
|
|
ASSERT_TRUE(compaction.get() != nullptr);
|
|
ASSERT_EQ(1U, compaction->num_input_files(0));
|
|
ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->num_input_files(1));
|
|
ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber());
|
|
ASSERT_EQ(4U, compaction->input(1, 1)->fd.GetNumber());
|
|
ASSERT_EQ(2U, compaction->num_input_levels());
|
|
ASSERT_EQ(num_levels - 1, compaction->output_level());
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
|
|
NewVersionStorage(1, kCompactionStyleUniversal);
|
|
UniversalCompactionPicker universal_compaction_picker(
|
|
ioptions_, &icmp_);
|
|
// must return false when there's no files.
|
|
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
|
|
false);
|
|
UpdateVersionStorageInfo();
|
|
|
|
// verify the trigger given different number of L0 files.
|
|
for (int i = 1;
|
|
i <= mutable_cf_options_.level0_file_num_compaction_trigger * 2; ++i) {
|
|
NewVersionStorage(1, kCompactionStyleUniversal);
|
|
Add(0, i, ToString((i + 100) * 1000).c_str(),
|
|
ToString((i + 100) * 1000 + 999).c_str(), 1000000, 0, i * 100,
|
|
i * 100 + 99);
|
|
UpdateVersionStorageInfo();
|
|
ASSERT_EQ(level_compaction_picker.NeedsCompaction(vstorage_.get()),
|
|
vstorage_->CompactionScore(0) >= 1);
|
|
}
|
|
}
|
|
|
|
TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
|
|
NewVersionStorage(1, kCompactionStyleFIFO);
|
|
const int kFileCount =
|
|
mutable_cf_options_.level0_file_num_compaction_trigger * 3;
|
|
const uint64_t kFileSize = 100000;
|
|
const uint64_t kMaxSize = kFileSize * kFileCount / 2;
|
|
|
|
fifo_options_.max_table_files_size = kMaxSize;
|
|
ioptions_.compaction_options_fifo = fifo_options_;
|
|
FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
|
|
|
|
UpdateVersionStorageInfo();
|
|
// must return false when there's no files.
|
|
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), false);
|
|
|
|
// verify whether compaction is needed based on the current
|
|
// size of L0 files.
|
|
uint64_t current_size = 0;
|
|
for (int i = 1; i <= kFileCount; ++i) {
|
|
NewVersionStorage(1, kCompactionStyleFIFO);
|
|
Add(0, i, ToString((i + 100) * 1000).c_str(),
|
|
ToString((i + 100) * 1000 + 999).c_str(),
|
|
kFileSize, 0, i * 100, i * 100 + 99);
|
|
current_size += kFileSize;
|
|
UpdateVersionStorageInfo();
|
|
ASSERT_EQ(level_compaction_picker.NeedsCompaction(vstorage_.get()),
|
|
vstorage_->CompactionScore(0) >= 1);
|
|
}
|
|
}
|
|
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|