Delete triggered compaction for universal style

Summary:
This is still WIP, but I'm hoping for early feedback on the overall approach.

This patch implements deletion triggered compaction, which till now only
worked for leveled, for universal style. SST files are marked for
compaction by the CompactOnDeletionCollertor table property. This is
expected to be used when free disk space is low and the user wants to
reclaim space by deleting a bunch of keys. The deletions are expected to
be dense. In such a situation, we want to avoid a full compaction due to
its space overhead.

The strategy used in this case is similar to leveled. We pick one file
from the set of files marked for compaction. We then expand the inputs
to a clean cut on the same level, and then pick overlapping files from
the next non-mepty level. Picking files from the next level can cause
the key range to expand, and we opportunistically expand inputs in the
source level to include files wholly in this key range.

The main side effect of this is that it breaks the property of no time
range overlap between levels. This shouldn't break any functionality.
Closes https://github.com/facebook/rocksdb/pull/3860

Differential Revision: D8124397

Pulled By: anand1976

fbshipit-source-id: bfa2a9dd6817930e991b35d3a8e7e61304ed3dcf
This commit is contained in:
Anand Ananthabhotla 2018-05-29 15:42:14 -07:00 committed by Facebook Github Bot
parent 724855c7da
commit a736255de8
6 changed files with 526 additions and 120 deletions

View File

@ -366,7 +366,7 @@ bool CompactionPicker::IsRangeInCompaction(VersionStorageInfo* vstorage,
assert(level < NumberLevels());
vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
*level_index, level_index);
level_index ? *level_index : 0, level_index);
return AreFilesInCompaction(inputs);
}
@ -947,6 +947,78 @@ void CompactionPicker::UnregisterCompaction(Compaction* c) {
compactions_in_progress_.erase(c);
}
void CompactionPicker::PickFilesMarkedForCompaction(
const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level,
int* output_level, CompactionInputFiles* start_level_inputs) {
if (vstorage->FilesMarkedForCompaction().empty()) {
return;
}
auto continuation = [&, cf_name](std::pair<int, FileMetaData*> level_file) {
// If it's being compacted it has nothing to do here.
// If this assert() fails that means that some function marked some
// files as being_compacted, but didn't call ComputeCompactionScore()
assert(!level_file.second->being_compacted);
*start_level = level_file.first;
*output_level =
(*start_level == 0) ? vstorage->base_level() : *start_level + 1;
if (*start_level == 0 && !level0_compactions_in_progress()->empty()) {
return false;
}
start_level_inputs->files = {level_file.second};
start_level_inputs->level = *start_level;
return ExpandInputsToCleanCut(cf_name, vstorage, start_level_inputs);
};
// take a chance on a random file first
Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
size_t random_file_index = static_cast<size_t>(rnd.Uniform(
static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
// found the compaction!
return;
}
for (auto& level_file : vstorage->FilesMarkedForCompaction()) {
if (continuation(level_file)) {
// found the compaction!
return;
}
}
start_level_inputs->files.clear();
}
bool CompactionPicker::GetOverlappingL0Files(
VersionStorageInfo* vstorage, CompactionInputFiles* start_level_inputs,
int output_level, int* parent_index) {
// Two level 0 compaction won't run at the same time, so don't need to worry
// about files on level 0 being compacted.
assert(level0_compactions_in_progress()->empty());
InternalKey smallest, largest;
GetRange(*start_level_inputs, &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
start_level_inputs->files.clear();
vstorage->GetOverlappingInputs(0, &smallest, &largest,
&(start_level_inputs->files));
// If we include more L0 files in the same compaction run it can
// cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range
GetRange(*start_level_inputs, &smallest, &largest);
if (IsRangeInCompaction(vstorage, &smallest, &largest, output_level,
parent_index)) {
return false;
}
assert(!start_level_inputs->files.empty());
return true;
}
bool LevelCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
if (!vstorage->ExpiredTtlFiles().empty()) {
@ -1018,9 +1090,6 @@ class LevelCompactionBuilder {
// otherwise, returns false.
bool PickIntraL0Compaction();
// If there is any file marked for compaction, put put it into inputs.
void PickFilesMarkedForCompaction();
void PickExpiredTtlFiles();
const std::string& cf_name_;
@ -1049,50 +1118,6 @@ class LevelCompactionBuilder {
static const int kMinFilesForIntraL0Compaction = 4;
};
void LevelCompactionBuilder::PickFilesMarkedForCompaction() {
if (vstorage_->FilesMarkedForCompaction().empty()) {
return;
}
auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
// If it's being compacted it has nothing to do here.
// If this assert() fails that means that some function marked some
// files as being_compacted, but didn't call ComputeCompactionScore()
assert(!level_file.second->being_compacted);
start_level_ = level_file.first;
output_level_ =
(start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
if (start_level_ == 0 &&
!compaction_picker_->level0_compactions_in_progress()->empty()) {
return false;
}
start_level_inputs_.files = {level_file.second};
start_level_inputs_.level = start_level_;
return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&start_level_inputs_);
};
// take a chance on a random file first
Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage_));
size_t random_file_index = static_cast<size_t>(rnd.Uniform(
static_cast<uint64_t>(vstorage_->FilesMarkedForCompaction().size())));
if (continuation(vstorage_->FilesMarkedForCompaction()[random_file_index])) {
// found the compaction!
return;
}
for (auto& level_file : vstorage_->FilesMarkedForCompaction()) {
if (continuation(level_file)) {
// found the compaction!
return;
}
}
start_level_inputs_.files.clear();
}
void LevelCompactionBuilder::PickExpiredTtlFiles() {
if (vstorage_->ExpiredTtlFiles().empty()) {
return;
@ -1182,7 +1207,9 @@ void LevelCompactionBuilder::SetupInitialFiles() {
if (start_level_inputs_.empty()) {
parent_index_ = base_index_ = -1;
PickFilesMarkedForCompaction();
// PickFilesMarkedForCompaction();
compaction_picker_->PickFilesMarkedForCompaction(
cf_name_, vstorage_, &start_level_, &output_level_, &start_level_inputs_);
if (!start_level_inputs_.empty()) {
is_manual_ = true;
compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
@ -1220,29 +1247,9 @@ void LevelCompactionBuilder::SetupInitialFiles() {
bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
if (start_level_ == 0 && output_level_ != 0) {
// Two level 0 compaction won't run at the same time, so don't need to worry
// about files on level 0 being compacted.
assert(compaction_picker_->level0_compactions_in_progress()->empty());
InternalKey smallest, largest;
compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
start_level_inputs_.files.clear();
vstorage_->GetOverlappingInputs(0, &smallest, &largest,
&start_level_inputs_.files);
// If we include more L0 files in the same compaction run it can
// cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range
compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
if (compaction_picker_->IsRangeInCompaction(
vstorage_, &smallest, &largest, output_level_, &parent_index_)) {
return false;
}
return compaction_picker_->GetOverlappingL0Files(
vstorage_, &start_level_inputs_, output_level_, &parent_index_);
}
assert(!start_level_inputs_.files.empty());
return true;
}

View File

@ -175,6 +175,15 @@ class CompactionPicker {
const CompactionInputFiles& output_level_inputs,
std::vector<FileMetaData*>* grandparents);
void PickFilesMarkedForCompaction(const std::string& cf_name,
VersionStorageInfo* vstorage,
int* start_level, int* output_level,
CompactionInputFiles* start_level_inputs);
bool GetOverlappingL0Files(VersionStorageInfo* vstorage,
CompactionInputFiles* start_level_inputs,
int output_level, int* parent_index);
// Register this compaction in the set of running compactions
void RegisterCompaction(Compaction* c);

View File

@ -391,10 +391,10 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
NewVersionStorage(1, kCompactionStyleUniversal);
UniversalCompactionPicker universal_compaction_picker(
ioptions_, &icmp_);
UpdateVersionStorageInfo();
// must return false when there's no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
false);
UpdateVersionStorageInfo();
// verify the trigger given different number of L0 files.
for (int i = 1;
@ -415,6 +415,7 @@ TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {
ioptions_.allow_ingest_behind = true;
ioptions_.num_levels = 3;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
UpdateVersionStorageInfo();
// must return false when there's no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
false);
@ -448,6 +449,7 @@ TEST_F(CompactionPickerTest, CannotTrivialMoveUniversal) {
mutable_cf_options_.compaction_options_universal.allow_trivial_move = true;
NewVersionStorage(1, kCompactionStyleUniversal);
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
UpdateVersionStorageInfo();
// must return false when there's no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
false);

View File

@ -162,7 +162,13 @@ bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction* c) {
bool UniversalCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
const int kLevel0 = 0;
return vstorage->CompactionScore(kLevel0) >= 1;
if (vstorage->CompactionScore(kLevel0) >= 1) {
return true;
}
if (!vstorage->FilesMarkedForCompaction().empty()) {
return true;
}
return false;
}
void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
@ -257,8 +263,9 @@ Compaction* UniversalCompactionPicker::PickCompaction(
CalculateSortedRuns(*vstorage, ioptions_, mutable_cf_options);
if (sorted_runs.size() == 0 ||
sorted_runs.size() <
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
(vstorage->FilesMarkedForCompaction().empty() &&
sorted_runs.size() < (unsigned int)mutable_cf_options
.level0_file_num_compaction_trigger)) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
cf_name.c_str());
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
@ -272,58 +279,73 @@ Compaction* UniversalCompactionPicker::PickCompaction(
cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp));
// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options, vstorage,
score, sorted_runs, log_buffer)) !=
nullptr) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
cf_name.c_str());
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio =
mutable_cf_options.compaction_options_universal.size_ratio;
if ((c = PickCompactionToReduceSortedRuns(
cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
sorted_runs, log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: compacting for size ratio\n",
Compaction* c = nullptr;
if (sorted_runs.size() >=
static_cast<size_t>(
mutable_cf_options.level0_file_num_compaction_trigger)) {
if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options,
vstorage, score, sorted_runs,
log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
cf_name.c_str());
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
// This is guaranteed by NeedsCompaction()
assert(sorted_runs.size() >=
static_cast<size_t>(
mutable_cf_options.level0_file_num_compaction_trigger));
// Get the total number of sorted runs that are not being compacted
int num_sr_not_compacted = 0;
for (size_t i = 0; i < sorted_runs.size(); i++) {
if (sorted_runs[i].being_compacted == false) {
num_sr_not_compacted++;
}
}
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio =
mutable_cf_options.compaction_options_universal.size_ratio;
// The number of sorted runs that are not being compacted is greater than
// the maximum allowed number of sorted runs
if (num_sr_not_compacted >
mutable_cf_options.level0_file_num_compaction_trigger) {
unsigned int num_files =
num_sr_not_compacted -
mutable_cf_options.level0_file_num_compaction_trigger + 1;
if ((c = PickCompactionToReduceSortedRuns(
cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
num_files, sorted_runs, log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: compacting for file num -- %u\n",
cf_name.c_str(), num_files);
if ((c = PickCompactionToReduceSortedRuns(
cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
sorted_runs, log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: compacting for size ratio\n",
cf_name.c_str());
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
// This is guaranteed by NeedsCompaction()
assert(sorted_runs.size() >=
static_cast<size_t>(
mutable_cf_options.level0_file_num_compaction_trigger));
// Get the total number of sorted runs that are not being compacted
int num_sr_not_compacted = 0;
for (size_t i = 0; i < sorted_runs.size(); i++) {
if (sorted_runs[i].being_compacted == false) {
num_sr_not_compacted++;
}
}
// The number of sorted runs that are not being compacted is greater
// than the maximum allowed number of sorted runs
if (num_sr_not_compacted >
mutable_cf_options.level0_file_num_compaction_trigger) {
unsigned int num_files =
num_sr_not_compacted -
mutable_cf_options.level0_file_num_compaction_trigger + 1;
if ((c = PickCompactionToReduceSortedRuns(
cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
num_files, sorted_runs, log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: compacting for file num -- %u\n",
cf_name.c_str(), num_files);
}
}
}
}
}
if (c == nullptr) {
if ((c = PickDeleteTriggeredCompaction(cf_name, mutable_cf_options,
vstorage, score, sorted_runs,
log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: delete triggered compaction\n",
cf_name.c_str());
}
}
if (c == nullptr) {
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
nullptr);
@ -753,6 +775,125 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
score, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
}
// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& /*sorted_runs*/, LogBuffer* /*log_buffer*/) {
CompactionInputFiles start_level_inputs;
int output_level;
std::vector<CompactionInputFiles> inputs;
if (vstorage->num_levels() == 1) {
// This is single level universal. Since we're basically trying to reclaim
// space by processing files marked for compaction due to high tombstone
// density, let's do the same thing as compaction to reduce size amp which
// has the same goals.
bool compact = false;
start_level_inputs.level = 0;
start_level_inputs.files.clear();
output_level = 0;
for (FileMetaData* f : vstorage->LevelFiles(0)) {
if (f->marked_for_compaction) {
compact = true;
}
if (compact) {
start_level_inputs.files.push_back(f);
}
}
if (start_level_inputs.size() <= 1) {
// If only the last file in L0 is marked for compaction, ignore it
return nullptr;
}
inputs.push_back(start_level_inputs);
} else {
int start_level;
// For multi-level universal, the strategy is to make this look more like
// leveled. We pick one of the files marked for compaction and compact with
// overlapping files in the adjacent level.
PickFilesMarkedForCompaction(cf_name, vstorage, &start_level, &output_level,
&start_level_inputs);
if (start_level_inputs.empty()) {
return nullptr;
}
// Pick the first non-empty level after the start_level
for (output_level = start_level + 1; output_level < vstorage->num_levels();
output_level++) {
if (vstorage->NumLevelFiles(output_level) != 0) {
break;
}
}
// If all higher levels are empty, pick the highest level as output level
if (output_level == vstorage->num_levels()) {
if (start_level == 0) {
output_level = vstorage->num_levels() - 1;
} else {
// If start level is non-zero and all higher levels are empty, this
// compaction will translate into a trivial move. Since the idea is
// to reclaim space and trivial move doesn't help with that, we
// skip compaction in this case and return nullptr
return nullptr;
}
}
if (ioptions_.allow_ingest_behind &&
output_level == vstorage->num_levels() - 1) {
assert(output_level > 1);
output_level--;
}
if (output_level != 0) {
if (start_level == 0) {
if (!GetOverlappingL0Files(vstorage, &start_level_inputs, output_level,
nullptr)) {
return nullptr;
}
}
CompactionInputFiles output_level_inputs;
int parent_index = -1;
output_level_inputs.level = output_level;
if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage,
&start_level_inputs, &output_level_inputs,
&parent_index, -1)) {
return nullptr;
}
inputs.push_back(start_level_inputs);
if (!output_level_inputs.empty()) {
inputs.push_back(output_level_inputs);
}
if (FilesRangeOverlapWithCompaction(inputs, output_level)) {
return nullptr;
}
} else {
inputs.push_back(start_level_inputs);
}
}
uint64_t estimated_total_size = 0;
// Use size of the output level as estimated file size
for (FileMetaData* f : vstorage->LevelFiles(output_level)) {
estimated_total_size += f->fd.GetFileSize();
}
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options, estimated_total_size);
return new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options, output_level,
kCompactionStyleUniversal),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
1),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
score, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE

View File

@ -73,6 +73,11 @@ class UniversalCompactionPicker : public CompactionPicker {
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
Compaction* PickDeleteTriggeredCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
// Used in universal compaction when the enabled_trivial_move
// option is set. Checks whether there are any overlapping files
// in the input. Returns true if the input files are non

View File

@ -10,6 +10,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#if !defined(ROCKSDB_LITE)
#include "rocksdb/utilities/table_properties_collectors.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -40,6 +41,12 @@ class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
};
class DBTestUniversalDeleteTrigCompaction : public DBTestBase {
public:
DBTestUniversalDeleteTrigCompaction()
: DBTestBase("/db_universal_compaction_test") {}
};
namespace {
void VerifyCompactionResult(
const ColumnFamilyMetaData& cf_meta,
@ -1845,6 +1852,241 @@ INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId,
::testing::Combine(::testing::Values(1, 8),
::testing::Bool()));
TEST_F(DBTestUniversalDeleteTrigCompaction, BasicL0toL1) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 2;
opts.compression = kNoCompression;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
// add an L1 file to prevent tombstones from dropping due to obsolescence
// during flush
int i;
for (i = 0; i < 2000; ++i) {
Put(Key(i), "val");
}
Flush();
// MoveFilesToLevel(6);
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
for (i = 1999; i < kNumKeys; ++i) {
if (i >= kNumKeys - kWindowSize &&
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
Delete(Key(i));
} else {
Put(Key(i), "val");
}
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalDeleteTrigCompaction, SingleLevel) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 2;
opts.compression = kNoCompression;
opts.num_levels = 1;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
// add an L1 file to prevent tombstones from dropping due to obsolescence
// during flush
int i;
for (i = 0; i < 2000; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 1999; i < kNumKeys; ++i) {
if (i >= kNumKeys - kWindowSize &&
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
Delete(Key(i));
} else {
Put(Key(i), "val");
}
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
TEST_F(DBTestUniversalDeleteTrigCompaction, MultipleLevels) {
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 4;
opts.compression = kNoCompression;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
// add an L1 file to prevent tombstones from dropping due to obsolescence
// during flush
int i;
for (i = 0; i < 500; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 500; i < 1000; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 1000; i < 1500; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 1500; i < 2000; ++i) {
Put(Key(i), "val");
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
for (i = 1999; i < 2333; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 2333; i < 2666; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 2666; i < 2999; ++i) {
Put(Key(i), "val");
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
ASSERT_GT(NumTableFilesAtLevel(5), 0);
for (i = 1900; i < 2100; ++i) {
Delete(Key(i));
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
ASSERT_EQ(0, NumTableFilesAtLevel(2));
ASSERT_EQ(0, NumTableFilesAtLevel(3));
ASSERT_EQ(0, NumTableFilesAtLevel(4));
ASSERT_EQ(0, NumTableFilesAtLevel(5));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalDeleteTrigCompaction, OverlappingL0) {
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 5;
opts.compression = kNoCompression;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
// add an L1 file to prevent tombstones from dropping due to obsolescence
// during flush
int i;
for (i = 0; i < 2000; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 2000; i < 3000; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 3500; i < 4000; ++i) {
Put(Key(i), "val");
}
Flush();
for (i = 2900; i < 3100; ++i) {
Delete(Key(i));
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(2, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalDeleteTrigCompaction, IngestBehind) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 2;
opts.compression = kNoCompression;
opts.allow_ingest_behind = true;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
// add an L1 file to prevent tombstones from dropping due to obsolescence
// during flush
int i;
for (i = 0; i < 2000; ++i) {
Put(Key(i), "val");
}
Flush();
// MoveFilesToLevel(6);
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
for (i = 1999; i < kNumKeys; ++i) {
if (i >= kNumKeys - kWindowSize &&
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
Delete(Key(i));
} else {
Put(Key(i), "val");
}
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(6));
ASSERT_GT(NumTableFilesAtLevel(5), 0);
}
} // namespace rocksdb
#endif // !defined(ROCKSDB_LITE)