Merge pull request #47 from mlin/kCompactionStopStyleSimilarSize
An initial implementation of kCompactionStopStyleSimilarSize for universal compaction
This commit is contained in:
commit
8ca30bd51b
@ -175,41 +175,72 @@ void Compaction::ResetNextCompactionIndex() {
|
|||||||
input_version_->ResetNextCompactionIndex(level_);
|
input_version_->ResetNextCompactionIndex(level_);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void InputSummary(std::vector<FileMetaData*>& files, char* output,
|
/*
|
||||||
|
for sizes >=10TB, print "XXTB"
|
||||||
|
for sizes >=10GB, print "XXGB"
|
||||||
|
etc.
|
||||||
|
*/
|
||||||
|
static void FileSizeSummary(unsigned long long sz, char* output, int len) {
|
||||||
|
const unsigned long long ull10 = 10;
|
||||||
|
if (sz >= ull10<<40) {
|
||||||
|
snprintf(output, len, "%lluTB", sz>>40);
|
||||||
|
} else if (sz >= ull10<<30) {
|
||||||
|
snprintf(output, len, "%lluGB", sz>>30);
|
||||||
|
} else if (sz >= ull10<<20) {
|
||||||
|
snprintf(output, len, "%lluMB", sz>>20);
|
||||||
|
} else if (sz >= ull10<<10) {
|
||||||
|
snprintf(output, len, "%lluKB", sz>>10);
|
||||||
|
} else {
|
||||||
|
snprintf(output, len, "%lluB", sz);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int InputSummary(std::vector<FileMetaData*>& files, char* output,
|
||||||
int len) {
|
int len) {
|
||||||
int write = 0;
|
int write = 0;
|
||||||
for (unsigned int i = 0; i < files.size(); i++) {
|
for (unsigned int i = 0; i < files.size(); i++) {
|
||||||
int sz = len - write;
|
int sz = len - write;
|
||||||
int ret = snprintf(output + write, sz, "%lu(%lu) ",
|
int ret;
|
||||||
(unsigned long)files.at(i)->number,
|
char sztxt[16];
|
||||||
(unsigned long)files.at(i)->file_size);
|
FileSizeSummary((unsigned long long)files.at(i)->file_size, sztxt, 16);
|
||||||
|
ret = snprintf(output + write, sz, "%lu(%s) ",
|
||||||
|
(unsigned long)files.at(i)->number,
|
||||||
|
sztxt);
|
||||||
if (ret < 0 || ret >= sz)
|
if (ret < 0 || ret >= sz)
|
||||||
break;
|
break;
|
||||||
write += ret;
|
write += ret;
|
||||||
}
|
}
|
||||||
|
return write;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Compaction::Summary(char* output, int len) {
|
void Compaction::Summary(char* output, int len) {
|
||||||
int write = snprintf(output, len,
|
int write = snprintf(output, len,
|
||||||
"Base version %lu Base level %d, seek compaction:%d, inputs:",
|
"Base version %lu Base level %d, seek compaction:%d, inputs: [",
|
||||||
(unsigned long)input_version_->GetVersionNumber(),
|
(unsigned long)input_version_->GetVersionNumber(),
|
||||||
level_,
|
level_,
|
||||||
seek_compaction_);
|
seek_compaction_);
|
||||||
if (write < 0 || write > len) {
|
if (write < 0 || write >= len) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
char level_low_summary[100];
|
write += InputSummary(inputs_[0], output+write, len-write);
|
||||||
InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary));
|
if (write < 0 || write >= len) {
|
||||||
char level_up_summary[100];
|
return;
|
||||||
if (inputs_[1].size()) {
|
|
||||||
InputSummary(inputs_[1], level_up_summary, sizeof(level_up_summary));
|
|
||||||
} else {
|
|
||||||
level_up_summary[0] = '\0';
|
|
||||||
}
|
}
|
||||||
|
|
||||||
snprintf(output + write, len - write, "[%s],[%s]",
|
write += snprintf(output+write, len-write, "],[");
|
||||||
level_low_summary, level_up_summary);
|
if (write < 0 || write >= len) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inputs_[1].size()) {
|
||||||
|
write += InputSummary(inputs_[1], output+write, len-write);
|
||||||
|
}
|
||||||
|
if (write < 0 || write >= len) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(output+write, len-write, "]");
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -559,22 +559,27 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
|
|||||||
version->LevelFileSummary(&tmp, 0));
|
version->LevelFileSummary(&tmp, 0));
|
||||||
|
|
||||||
// Check for size amplification first.
|
// Check for size amplification first.
|
||||||
Compaction* c = PickCompactionUniversalSizeAmp(version, score);
|
Compaction* c;
|
||||||
if (c == nullptr) {
|
if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) {
|
||||||
|
Log(options_->info_log, "Universal: compacting for size amp\n");
|
||||||
|
} else {
|
||||||
|
|
||||||
// Size amplification is within limits. Try reducing read
|
// Size amplification is within limits. Try reducing read
|
||||||
// amplification while maintaining file size ratios.
|
// amplification while maintaining file size ratios.
|
||||||
unsigned int ratio = options_->compaction_options_universal.size_ratio;
|
unsigned int ratio = options_->compaction_options_universal.size_ratio;
|
||||||
c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX);
|
|
||||||
|
|
||||||
// Size amplification and file size ratios are within configured limits.
|
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) {
|
||||||
// If max read amplification is exceeding configured limits, then force
|
Log(options_->info_log, "Universal: compacting for size ratio\n");
|
||||||
// compaction without looking at filesize ratios and try to reduce
|
} else {
|
||||||
// the number of files to fewer than level0_file_num_compaction_trigger.
|
// Size amplification and file size ratios are within configured limits.
|
||||||
if (c == nullptr) {
|
// If max read amplification is exceeding configured limits, then force
|
||||||
|
// compaction without looking at filesize ratios and try to reduce
|
||||||
|
// the number of files to fewer than level0_file_num_compaction_trigger.
|
||||||
unsigned int num_files = version->files_[level].size() -
|
unsigned int num_files = version->files_[level].size() -
|
||||||
options_->level0_file_num_compaction_trigger;
|
options_->level0_file_num_compaction_trigger;
|
||||||
c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files);
|
if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) {
|
||||||
|
Log(options_->info_log, "Universal: compacting for file num\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (c == nullptr) {
|
if (c == nullptr) {
|
||||||
@ -684,14 +689,32 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
|||||||
if (f->being_compacted) {
|
if (f->being_compacted) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// pick files if the total candidate file size (increased by the
|
// Pick files if the total/last candidate file size (increased by the
|
||||||
// specified ratio) is still larger than the next candidate file.
|
// specified ratio) is still larger than the next candidate file.
|
||||||
|
// candidate_size is the total size of files picked so far with the
|
||||||
|
// default kCompactionStopStyleTotalSize; with
|
||||||
|
// kCompactionStopStyleSimilarSize, it's simply the size of the last
|
||||||
|
// picked file.
|
||||||
uint64_t sz = (candidate_size * (100L + ratio)) /100;
|
uint64_t sz = (candidate_size * (100L + ratio)) /100;
|
||||||
if (sz < f->file_size) {
|
if (sz < f->file_size) {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) {
|
||||||
|
// Similar-size stopping rule: also check the last picked file isn't
|
||||||
|
// far larger than the next candidate file.
|
||||||
|
sz = (f->file_size * (100L + ratio)) / 100;
|
||||||
|
if (sz < candidate_size) {
|
||||||
|
// If the small file we've encountered begins a run of similar-size
|
||||||
|
// files, we'll pick them up on a future iteration of the outer
|
||||||
|
// loop. If it's some lonely straggler, it'll eventually get picked
|
||||||
|
// by the last-resort read amp strategy which disregards size ratios.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
candidate_size = f->file_size;
|
||||||
|
} else { // default kCompactionStopStyleTotalSize
|
||||||
|
candidate_size += f->file_size;
|
||||||
}
|
}
|
||||||
candidate_count++;
|
candidate_count++;
|
||||||
candidate_size += f->file_size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Found a series of consecutive files that need compaction.
|
// Found a series of consecutive files that need compaction.
|
||||||
|
@ -2332,7 +2332,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
compact->compaction->output_level(),
|
compact->compaction->output_level(),
|
||||||
compact->compaction->score(),
|
compact->compaction->score(),
|
||||||
options_.max_background_compactions - bg_compaction_scheduled_);
|
options_.max_background_compactions - bg_compaction_scheduled_);
|
||||||
char scratch[256];
|
char scratch[2345];
|
||||||
compact->compaction->Summary(scratch, sizeof(scratch));
|
compact->compaction->Summary(scratch, sizeof(scratch));
|
||||||
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
|
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
|
||||||
|
|
||||||
|
@ -2427,6 +2427,89 @@ TEST(DBTest, UniversalCompactionOptions) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.compaction_style = kCompactionStyleUniversal;
|
||||||
|
options.write_buffer_size = 100<<10; //100KB
|
||||||
|
// trigger compaction if there are >= 4 files
|
||||||
|
options.level0_file_num_compaction_trigger = 4;
|
||||||
|
options.compaction_options_universal.size_ratio = 10;
|
||||||
|
options.compaction_options_universal.stop_style = kCompactionStopStyleSimilarSize;
|
||||||
|
options.num_levels=1;
|
||||||
|
Reopen(&options);
|
||||||
|
|
||||||
|
Random rnd(301);
|
||||||
|
int key_idx = 0;
|
||||||
|
|
||||||
|
// Stage 1:
|
||||||
|
// Generate a set of files at level 0, but don't trigger level-0
|
||||||
|
// compaction.
|
||||||
|
for (int num = 0;
|
||||||
|
num < options.level0_file_num_compaction_trigger-1;
|
||||||
|
num++) {
|
||||||
|
// Write 120KB (12 values, each 10K)
|
||||||
|
for (int i = 0; i < 12; i++) {
|
||||||
|
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
|
||||||
|
key_idx++;
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForFlushMemTable();
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), num + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate one more file at level-0, which should trigger level-0
|
||||||
|
// compaction.
|
||||||
|
for (int i = 0; i < 12; i++) {
|
||||||
|
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
|
||||||
|
key_idx++;
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
// Suppose each file flushed from mem table has size 1. Now we compact
|
||||||
|
// (level0_file_num_compaction_trigger+1)=4 files and should have a big
|
||||||
|
// file of size 4.
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
|
||||||
|
|
||||||
|
// Stage 2:
|
||||||
|
// Now we have one file at level 0, with size 4. We also have some data in
|
||||||
|
// mem table. Let's continue generating new files at level 0, but don't
|
||||||
|
// trigger level-0 compaction.
|
||||||
|
// First, clean up memtable before inserting new data. This will generate
|
||||||
|
// a level-0 file, with size around 0.4 (according to previously written
|
||||||
|
// data amount).
|
||||||
|
dbfull()->Flush(FlushOptions());
|
||||||
|
for (int num = 0;
|
||||||
|
num < options.level0_file_num_compaction_trigger-3;
|
||||||
|
num++) {
|
||||||
|
// Write 120KB (12 values, each 10K)
|
||||||
|
for (int i = 0; i < 12; i++) {
|
||||||
|
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
|
||||||
|
key_idx++;
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForFlushMemTable();
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), num + 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate one more file at level-0, which should trigger level-0
|
||||||
|
// compaction.
|
||||||
|
for (int i = 0; i < 12; i++) {
|
||||||
|
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
|
||||||
|
key_idx++;
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
// Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
|
||||||
|
// After compaction, we should have 3 files, with size 4, 0.4, 2.
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 3);
|
||||||
|
// Stage 3:
|
||||||
|
// Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
|
||||||
|
// more file at level-0, which should trigger level-0 compaction.
|
||||||
|
for (int i = 0; i < 12; i++) {
|
||||||
|
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
|
||||||
|
key_idx++;
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
// Level-0 compaction is triggered, but no file will be picked up.
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 4);
|
||||||
|
}
|
||||||
|
|
||||||
#if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2)
|
#if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2)
|
||||||
TEST(DBTest, CompressedCache) {
|
TEST(DBTest, CompressedCache) {
|
||||||
int num_iter = 80;
|
int num_iter = 80;
|
||||||
|
Loading…
Reference in New Issue
Block a user