Handling edge cases for ReFitLevel

Summary:
Right now the level we pass to ReFitLevel is the maximum level with files (before compaction), there are multiple cases where this maximum level have changed after compaction
- all files where in L0 (now maximum level is L1)
- using kCompactionStyleUniversal (now maximum level in the last level)
- level_compaction_dynamic_level_bytes ??

We can handle each of these cases individually, but I felt it's safer to calculate max_level_with_files again if we want to do a ReFitLevel

Test Plan:
adding some tests
make -j64 check

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: ott, dhruba

Differential Revision: https://reviews.facebook.net/D39663
This commit is contained in:
Islam AbdelRahman 2015-06-11 14:15:52 -07:00
parent bffaf0a8b2
commit 73faa3d41d
2 changed files with 99 additions and 5 deletions

View File

@ -1353,14 +1353,17 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
} }
} }
int final_output_level = 0;
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal && if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
cfd->NumberLevels() > 1) { cfd->NumberLevels() > 1) {
// Always compact all files together. // Always compact all files together.
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
cfd->NumberLevels() - 1, target_path_id, begin, cfd->NumberLevels() - 1, target_path_id, begin,
end); end);
final_output_level = cfd->NumberLevels() - 1;
} else { } else {
for (int level = 0; level <= max_level_with_files; level++) { for (int level = 0; level <= max_level_with_files; level++) {
int output_level;
// in case the compaction is unversal or if we're compacting the // in case the compaction is unversal or if we're compacting the
// bottom-most level, the output level will be the same as input one. // bottom-most level, the output level will be the same as input one.
// level 0 can never be the bottommost level (i.e. if all files are in // level 0 can never be the bottommost level (i.e. if all files are in
@ -1368,20 +1371,25 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO || cfd->ioptions()->compaction_style == kCompactionStyleFIFO ||
(level == max_level_with_files && level > 0)) { (level == max_level_with_files && level > 0)) {
s = RunManualCompaction(cfd, level, level, target_path_id, begin, end); output_level = level;
} else { } else {
int output_level = level + 1; output_level = level + 1;
if (cfd->ioptions()->compaction_style == kCompactionStyleLevel && if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
cfd->ioptions()->level_compaction_dynamic_level_bytes && cfd->ioptions()->level_compaction_dynamic_level_bytes &&
level == 0) { level == 0) {
output_level = ColumnFamilyData::kCompactToBaseLevel; output_level = ColumnFamilyData::kCompactToBaseLevel;
} }
s = RunManualCompaction(cfd, level, output_level, target_path_id, begin,
end);
} }
s = RunManualCompaction(cfd, level, output_level, target_path_id, begin,
end);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
final_output_level = cfd->NumberLevels() - 1;
} else if (output_level > final_output_level) {
final_output_level = output_level;
}
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1"); TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2"); TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
} }
@ -1392,7 +1400,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
} }
if (change_level) { if (change_level) {
s = ReFitLevel(cfd, max_level_with_files, target_level); s = ReFitLevel(cfd, final_output_level, target_level);
} }
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);

View File

@ -3895,6 +3895,60 @@ TEST_F(DBTest, TrivialMoveNonOverlappingFiles) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(DBTest, TrivialMoveTargetLevel) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* arg) { trivial_move++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial",
[&](void* arg) { non_trivial_move++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.write_buffer_size = 10 * 1024 * 1024;
options.num_levels = 7;
DestroyAndReopen(options);
int32_t value_size = 10 * 1024; // 10 KB
// Add 2 non-overlapping files
Random rnd(301);
std::map<int32_t, std::string> values;
// file 1 [0 => 300]
for (int32_t i = 0; i <= 300; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// file 2 [600 => 700]
for (int32_t i = 600; i <= 700; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// 2 files in L0
ASSERT_EQ("2", FilesPerLevel(0));
ASSERT_OK(db_->CompactRange(nullptr, nullptr, true, 6));
// 2 files in L6
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));
ASSERT_EQ(trivial_move, 1);
ASSERT_EQ(non_trivial_move, 0);
for (int32_t i = 0; i <= 300; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
for (int32_t i = 600; i <= 700; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
}
TEST_F(DBTest, CompactionTrigger) { TEST_F(DBTest, CompactionTrigger) {
Options options; Options options;
options.write_buffer_size = 100<<10; //100KB options.write_buffer_size = 100<<10; //100KB
@ -13361,6 +13415,38 @@ TEST_F(DBTest, FlushesInParallelWithCompactRange) {
} }
} }
TEST_F(DBTest, UniversalCompactionTargetLevel) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100 << 10; // 100KB
options.num_levels = 7;
options.disable_auto_compactions = true;
options = CurrentOptions(options);
DestroyAndReopen(options);
// Generate 3 overlapping files
Random rnd(301);
for (int i = 0; i < 210; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
}
ASSERT_OK(Flush());
for (int i = 200; i < 300; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
}
ASSERT_OK(Flush());
for (int i = 250; i < 260; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
}
ASSERT_OK(Flush());
ASSERT_EQ("3", FilesPerLevel(0));
// Compact all files into 1 file and put it in L4
db_->CompactRange(nullptr, nullptr, true, 4);
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {