Move NeedsCompaction() from VersionStorageInfo to CompactionPicker

Summary:
Move NeedsCompaction() from VersionStorageInfo to CompactionPicker
to allow different compaction strategy to have their own way to
determine whether doing compaction is necessary.

When compaction style is set to kCompactionStyleNone, then
NeedsCompaction() will always return false.

Test Plan:
export ROCKSDB_TESTS=Compact
./db_test

Reviewers: ljin, sdong, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D28719
This commit is contained in:
Yueh-Hsuan Chiang 2014-11-13 13:41:43 -08:00
parent cd0980150b
commit 1d1a64f58a
9 changed files with 219 additions and 76 deletions

View File

@ -675,6 +675,17 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
return Status::OK();
}
bool LevelCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options) const {
for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
if (vstorage->CompactionScore(i) >= 1) {
return true;
}
}
return false;
}
Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
@ -829,6 +840,19 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
return c;
}
bool UniversalCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options) const {
const int kLevel0 = 0;
if (vstorage->LevelFiles(kLevel0).size() <
static_cast<size_t>(
mutable_cf_options.level0_file_num_compaction_trigger)) {
return false;
}
return true;
}
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
@ -1228,6 +1252,27 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
return c;
}
bool FIFOCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options) const {
const int kLevel0 = 0;
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
if (level_files.size() == 0) {
return false;
}
uint64_t total_size = 0;
for (const auto& file : level_files) {
total_size += file->fd.file_size;
}
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
return false;
}
return true;
}
Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
@ -1236,7 +1281,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
uint64_t total_size = 0;
for (const auto& file : level_files) {
total_size += file->compensated_file_size;
total_size += file->fd.file_size;
}
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||

View File

@ -73,6 +73,10 @@ class CompactionPicker {
return NumberLevels() - 1;
}
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& cf_options) const = 0;
// Sanitize the input set of compaction input files.
// When the input parameters do not describe a valid compaction, the
// function will try to fix the input_files by adding necessary
@ -109,7 +113,6 @@ class CompactionPicker {
const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const;
protected:
int NumberLevels() const { return ioptions_.num_levels; }
@ -184,6 +187,10 @@ class UniversalCompactionPicker : public CompactionPicker {
return 0;
}
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& cf_options) const override;
private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(
@ -218,6 +225,10 @@ class LevelCompactionPicker : public CompactionPicker {
return current_num_levels - 2;
}
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& cf_options) const override;
private:
// For the specfied level, pick a compaction.
// Returns nullptr if there is no compaction to be done.
@ -254,6 +265,10 @@ class FIFOCompactionPicker : public CompactionPicker {
virtual int MaxOutputLevel() const override {
return 0;
}
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& cf_options) const override;
};
class NullCompactionPicker : public CompactionPicker {
@ -285,6 +300,13 @@ class NullCompactionPicker : public CompactionPicker {
virtual int MaxInputLevel(int current_num_levels) const {
return current_num_levels - 2;
}
// Always returns false.
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage,
const MutableCFOptions& cf_options) const override {
return false;
}
};
// Utility function

View File

@ -19,95 +19,106 @@ class CountingLogger : public Logger {
class CompactionPickerTest {
public:
const Comparator* ucmp;
InternalKeyComparator icmp;
Options options;
ImmutableCFOptions ioptions;
MutableCFOptions mutable_cf_options;
const Comparator* ucmp_;
InternalKeyComparator icmp_;
Options options_;
ImmutableCFOptions ioptions_;
MutableCFOptions mutable_cf_options_;
LevelCompactionPicker level_compaction_picker;
std::string cf_name;
CountingLogger logger;
LogBuffer log_buffer;
VersionStorageInfo vstorage;
uint32_t file_num;
CompactionOptionsFIFO fifo_options;
std::vector<uint64_t> size_being_compacted;
std::string cf_name_;
CountingLogger logger_;
LogBuffer log_buffer_;
uint32_t file_num_;
CompactionOptionsFIFO fifo_options_;
std::vector<uint64_t> size_being_compacted_;
std::unique_ptr<VersionStorageInfo> vstorage_;
std::vector<std::unique_ptr<FileMetaData>> files_;
CompactionPickerTest()
: ucmp(BytewiseComparator()),
icmp(ucmp),
ioptions(options),
mutable_cf_options(options, ioptions),
level_compaction_picker(ioptions, &icmp),
cf_name("dummy"),
log_buffer(InfoLogLevel::INFO_LEVEL, &logger),
vstorage(&icmp, ucmp, options.num_levels, kCompactionStyleLevel,
nullptr),
file_num(1) {
fifo_options.max_table_files_size = 1;
mutable_cf_options.RefreshDerivedOptions(ioptions);
size_being_compacted.resize(options.num_levels);
: ucmp_(BytewiseComparator()),
icmp_(ucmp_),
ioptions_(options_),
mutable_cf_options_(options_, ioptions_),
level_compaction_picker(ioptions_, &icmp_),
cf_name_("dummy"),
log_buffer_(InfoLogLevel::INFO_LEVEL, &logger_),
file_num_(1),
vstorage_(nullptr) {
fifo_options_.max_table_files_size = 1;
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
size_being_compacted_.resize(options_.num_levels);
}
~CompactionPickerTest() {
for (int i = 0; i < vstorage.num_levels(); i++) {
for (auto* f : vstorage.LevelFiles(i)) {
delete f;
}
}
}
void NewVersionStorage(int num_levels, CompactionStyle style) {
DeleteVersionStorage();
options_.num_levels = num_levels;
vstorage_.reset(new VersionStorageInfo(
&icmp_, ucmp_, options_.num_levels, style, nullptr));
}
void DeleteVersionStorage() {
vstorage_.reset();
files_.clear();
}
void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 0, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100,
SequenceNumber largest_seq = 100) {
assert(level < vstorage.num_levels());
assert(level < vstorage_->num_levels());
FileMetaData* f = new FileMetaData;
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
f->largest = InternalKey(largest, largest_seq, kTypeValue);
f->compensated_file_size = file_size;
f->refs = 0;
vstorage.MaybeAddFile(level, f);
vstorage_->AddFile(level, f);
files_.emplace_back(f);
}
void UpdateVersionStorageInfo() {
vstorage.ComputeCompactionScore(mutable_cf_options, fifo_options,
size_being_compacted);
vstorage.UpdateFilesBySize();
vstorage.UpdateNumNonEmptyLevels();
vstorage.GenerateFileIndexer();
vstorage.GenerateLevelFilesBrief();
vstorage.SetFinalized();
vstorage_->ComputeCompactionScore(mutable_cf_options_, fifo_options_,
size_being_compacted_);
vstorage_->UpdateFilesBySize();
vstorage_->UpdateNumNonEmptyLevels();
vstorage_->GenerateFileIndexer();
vstorage_->GenerateLevelFilesBrief();
vstorage_->SetFinalized();
}
};
TEST(CompactionPickerTest, Empty) {
NewVersionStorage(6, kCompactionStyleLevel);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() == nullptr);
}
TEST(CompactionPickerTest, Single) {
mutable_cf_options.level0_file_num_compaction_trigger = 2;
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
Add(0, 1U, "p", "q");
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() == nullptr);
}
TEST(CompactionPickerTest, Level0Trigger) {
mutable_cf_options.level0_file_num_compaction_trigger = 2;
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
Add(0, 1U, "150", "200");
Add(0, 2U, "200", "250");
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
@ -115,17 +126,19 @@ TEST(CompactionPickerTest, Level0Trigger) {
}
TEST(CompactionPickerTest, Level1Trigger) {
NewVersionStorage(6, kCompactionStyleLevel);
Add(1, 66U, "150", "200", 1000000000U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber());
}
TEST(CompactionPickerTest, Level1Trigger2) {
NewVersionStorage(6, kCompactionStyleLevel);
Add(1, 66U, "150", "200", 1000000001U);
Add(1, 88U, "201", "300", 1000000000U);
Add(2, 6U, "150", "179", 1000000000U);
@ -134,7 +147,7 @@ TEST(CompactionPickerTest, Level1Trigger2) {
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->num_input_files(1));
@ -144,8 +157,9 @@ TEST(CompactionPickerTest, Level1Trigger2) {
}
TEST(CompactionPickerTest, LevelMaxScore) {
mutable_cf_options.target_file_size_base = 10000000;
mutable_cf_options.target_file_size_multiplier = 10;
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.target_file_size_base = 10000000;
mutable_cf_options_.target_file_size_multiplier = 10;
Add(0, 1U, "150", "200", 1000000000U);
// Level 1 score 1.2
Add(1, 66U, "150", "200", 6000000U);
@ -162,12 +176,90 @@ TEST(CompactionPickerTest, LevelMaxScore) {
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
}
TEST(CompactionPickerTest, NeedsCompactionLevel) {
const int kLevels = 6;
const int kFileCount = 20;
for (int level = 0; level < kLevels - 1; ++level) {
uint64_t file_size =
mutable_cf_options_.MaxBytesForLevel(level) * 2 / kFileCount;
for (int file_count = 1; file_count <= kFileCount; ++file_count) {
// start a brand new version in each test.
NewVersionStorage(kLevels, kCompactionStyleLevel);
for (int i = 0; i < file_count; ++i) {
Add(level, i, std::to_string((i + 100) * 1000).c_str(),
std::to_string((i + 100) * 1000 + 999).c_str(),
file_size, 0, i * 100, i * 100 + 99);
}
UpdateVersionStorageInfo();
ASSERT_EQ(vstorage_->CompactionScoreLevel(0), level);
ASSERT_EQ(level_compaction_picker.NeedsCompaction(
vstorage_.get(), mutable_cf_options_),
vstorage_->CompactionScore(0) >= 1);
// release the version storage
DeleteVersionStorage();
}
}
}
TEST(CompactionPickerTest, NeedsCompactionUniversal) {
NewVersionStorage(1, kCompactionStyleUniversal);
UniversalCompactionPicker universal_compaction_picker(
ioptions_, &icmp_);
// must return false when there's no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(
vstorage_.get(), mutable_cf_options_), false);
// verify the trigger given different number of L0 files.
for (int i = 1;
i <= mutable_cf_options_.level0_file_num_compaction_trigger * 2;
++i) {
Add(0, i, std::to_string((i + 100) * 1000).c_str(),
std::to_string((i + 100) * 1000 + 999).c_str(),
1000000, 0, i * 100, i * 100 + 99);
ASSERT_EQ(
universal_compaction_picker.NeedsCompaction(
vstorage_.get(), mutable_cf_options_),
i >= mutable_cf_options_.level0_file_num_compaction_trigger);
}
}
TEST(CompactionPickerTest, NeedsCompactionFIFO) {
NewVersionStorage(1, kCompactionStyleFIFO);
const int kFileCount =
mutable_cf_options_.level0_file_num_compaction_trigger * 3;
const uint64_t kFileSize = 100000;
const uint64_t kMaxSize = kFileSize * kFileCount / 2;
fifo_options_.max_table_files_size = kMaxSize;
ioptions_.compaction_options_fifo = fifo_options_;
FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
// must return false when there's no files.
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(
vstorage_.get(), mutable_cf_options_), false);
// verify whether compaction is needed based on the current
// size of L0 files.
uint64_t current_size = 0;
for (int i = 1; i <= kFileCount; ++i) {
Add(0, i, std::to_string((i + 100) * 1000).c_str(),
std::to_string((i + 100) * 1000 + 999).c_str(),
kFileSize, 0, i * 100, i * 100 + 99);
current_size += kFileSize;
ASSERT_EQ(
fifo_compaction_picker.NeedsCompaction(
vstorage_.get(), mutable_cf_options_),
current_size > fifo_options_.max_table_files_size);
}
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

View File

@ -1669,7 +1669,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bool is_compaction_needed = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->current()->storage_info()->NeedsCompaction()) {
if (cfd->compaction_picker()->NeedsCompaction(
cfd->current()->storage_info(),
*cfd->GetCurrentMutableCFOptions())) {
is_compaction_needed = true;
break;
}

View File

@ -235,7 +235,8 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
case kCompactionPending:
// 1 if the system already determines at least one compacdtion is needed.
// 0 otherwise,
*value = (vstorage->NeedsCompaction() ? 1 : 0);
*value = (cfd_->compaction_picker()->NeedsCompaction(
vstorage, *cfd_->GetCurrentMutableCFOptions()) ? 1 : 0);
return true;
case kBackgroundErrors:
// Accumulated number of errors in background flushes or compactions.

View File

@ -284,7 +284,7 @@ class VersionBuilder::Rep {
if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
// File is deleted: do nothing
} else {
vstorage->MaybeAddFile(level, f);
vstorage->AddFile(level, f);
}
}
};

View File

@ -66,7 +66,7 @@ class VersionBuilderTest {
f->refs = 0;
f->num_entries = num_entries;
f->num_deletions = num_deletions;
vstorage_.MaybeAddFile(level, f);
vstorage_.AddFile(level, f);
if (sampled) {
f->init_stats_from_file = true;
vstorage_.UpdateAccumulatedStats(f);

View File

@ -1051,7 +1051,7 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
} // anonymous namespace
void VersionStorageInfo::MaybeAddFile(int level, FileMetaData* f) {
void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
assert(level < num_levels());
auto* level_files = &files_[level];
// Must not overlap
@ -1125,22 +1125,6 @@ bool Version::Unref() {
return false;
}
bool VersionStorageInfo::NeedsCompaction() const {
// In universal compaction case, this check doesn't really
// check the compaction condition, but checks num of files threshold
// only. We are not going to miss any compaction opportunity
// but it's likely that more compactions are scheduled but
// ending up with nothing to do. We can improve it later.
// TODO(sdong): improve this function to be accurate for universal
// compactions.
for (int i = 0; i <= MaxInputLevel(); i++) {
if (compaction_score_[i] >= 1) {
return true;
}
}
return false;
}
bool VersionStorageInfo::OverlapInLevel(int level,
const Slice* smallest_user_key,
const Slice* largest_user_key) {

View File

@ -94,7 +94,7 @@ class VersionStorageInfo {
void Reserve(int level, size_t size) { files_[level].reserve(size); }
void MaybeAddFile(int level, FileMetaData* f);
void AddFile(int level, FileMetaData* f);
void SetFinalized() { finalized_ = true; }
@ -128,9 +128,6 @@ class VersionStorageInfo {
int MaxInputLevel() const;
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const;
// Returns the maxmimum compaction score for levels 1 to max
double max_compaction_score() const { return max_compaction_score_; }