Don't let two L0->L1 compactions run in parallel

Summary: With experimental feature SuggestCompactRange() we don't restrict running two L0->L1 compactions in parallel. This diff fixes this.

Test Plan: added a unit test to reproduce the failure. fixed the unit test

Reviewers: yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D39981
This commit is contained in:
Igor Canadi 2015-06-11 15:42:16 -07:00
parent d6ce0f7c61
commit a84df655f3
2 changed files with 54 additions and 1 deletions

View File

@ -763,6 +763,10 @@ void LevelCompactionPicker::PickFilesMarkedForCompactionExperimental(
*level = level_file.first; *level = level_file.first;
*output_level = (*level == 0) ? vstorage->base_level() : *level + 1; *output_level = (*level == 0) ? vstorage->base_level() : *level + 1;
if (*level == 0 && !level0_compactions_in_progress_.empty()) {
return false;
}
inputs->files = {level_file.second}; inputs->files = {level_file.second};
inputs->level = *level; inputs->level = *level;
return ExpandWhileOverlapping(cf_name, vstorage, inputs); return ExpandWhileOverlapping(cf_name, vstorage, inputs);

View File

@ -13357,7 +13357,6 @@ TEST_F(DBTest, FlushesInParallelWithCompactRange) {
// iter == 1 -- leveled, but throw in a flush between two levels compacting // iter == 1 -- leveled, but throw in a flush between two levels compacting
// iter == 2 -- universal // iter == 2 -- universal
for (int iter = 0; iter < 3; ++iter) { for (int iter = 0; iter < 3; ++iter) {
printf("iter %d\n", iter);
Options options = CurrentOptions(); Options options = CurrentOptions();
if (iter < 2) { if (iter < 2) {
options.compaction_style = kCompactionStyleLevel; options.compaction_style = kCompactionStyleLevel;
@ -13447,6 +13446,56 @@ TEST_F(DBTest, UniversalCompactionTargetLevel) {
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
} }
// This tests for a bug that could cause two level0 compactions running
// concurrently
TEST_F(DBTest, SuggestCompactRangeNoTwoLevel0Compactions) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 110 << 10;
options.level0_file_num_compaction_trigger = 4;
options.num_levels = 4;
options.compression = kNoCompression;
options.max_bytes_for_level_base = 450 << 10;
options.target_file_size_base = 98 << 10;
options.max_write_buffer_number = 2;
options.max_background_compactions = 2;
DestroyAndReopen(options);
// fill up the DB
Random rnd(301);
for (int num = 0; num < 10; num++) {
GenerateNewRandomFile(&rnd);
}
db_->CompactRange(nullptr, nullptr);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"CompactionJob::Run():Start",
"DBTest::SuggestCompactRangeNoTwoLevel0Compactions:1"},
{"DBTest::SuggestCompactRangeNoTwoLevel0Compactions:2",
"CompactionJob::Run():End"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// trigger L0 compaction
for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
num++) {
GenerateNewRandomFile(&rnd, /* nowait */ true);
}
TEST_SYNC_POINT("DBTest::SuggestCompactRangeNoTwoLevel0Compactions:1");
GenerateNewRandomFile(&rnd, /* nowait */ true);
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
num++) {
GenerateNewRandomFile(&rnd, /* nowait */ true);
}
TEST_SYNC_POINT("DBTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {