Fix a bug that may cause a deleted row to appear again

Summary:
The previous fix of reappearing of a deleted row 0ce258f9b3 missed a corner case, which can be reproduced using test CompactionPickerTest.OverlappingUserKeys7. Consider such an example:

input level file: 1[B E] 2[F H]
output level file: 3[A C] 4[D I] 5[I K]

First file 2 is picked, which overlaps to file 4. 4 expands to 5. Now the all range is [D K] with 2 output level files. When we try to expand that, [D K] overlaps with file 1 and 2 in the input level, and 1 and 2 overlaps with 3 and 4 in the output level. So we end up with picking 3 and 4 in the output level. Without expanding, it also has 2 files, so we determine the output level doesn't change, although they are the different two files.

The fix is to expand the output level files after we picked 3 and 4. In that case, there will be three output level files so we will abort the expanding.

I also added two unit tests related to marked_for_compaction and being_compacted. They have been passing though.

Test Plan: Run the new unit test, as well as all other tests.

Reviewers: andrewkr, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: yoshinorim, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D65373
This commit is contained in:
sdong 2016-10-21 12:50:01 -07:00
parent 99c052a34f
commit 1168cb810a
2 changed files with 79 additions and 9 deletions

View File

@ -451,13 +451,14 @@ bool CompactionPicker::SetupOtherInputs(
vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
&output_level_inputs->files, *parent_index,
parent_index);
if (!output_level_inputs->empty()) {
ExpandWhileOverlapping(cf_name, vstorage, output_level_inputs);
}
if (FilesInCompaction(output_level_inputs->files)) {
return false;
}
if (!output_level_inputs->empty()) {
if (!ExpandWhileOverlapping(cf_name, vstorage, output_level_inputs)) {
return false;
}
}
// See if we can further grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up. We also choose NOT
@ -484,11 +485,15 @@ bool CompactionPicker::SetupOtherInputs(
!vstorage->HasOverlappingUserKey(&expanded0.files, input_level)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
CompactionInputFiles expanded1;
expanded1.level = output_level;
vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
&expanded1, *parent_index, parent_index);
if (expanded1.size() == output_level_inputs->size() &&
!FilesInCompaction(expanded1)) {
&expanded1.files, *parent_index,
parent_index);
assert(!expanded1.empty());
if (!FilesInCompaction(expanded1.files) &&
ExpandWhileOverlapping(cf_name, vstorage, &expanded1) &&
expanded1.size() == output_level_inputs->size()) {
Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
"[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt "(%" PRIu64
"+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
@ -499,7 +504,7 @@ bool CompactionPicker::SetupOtherInputs(
smallest = new_start;
largest = new_limit;
inputs->files = expanded0.files;
output_level_inputs->files = expanded1;
output_level_inputs->files = expanded1.files;
}
}
}

View File

@ -121,6 +121,7 @@ class CompactionPickerTest : public testing::Test {
vstorage_->GenerateLevelFilesBrief();
vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
vstorage_->GenerateLevel0NonOverlapping();
vstorage_->ComputeFilesMarkedForCompaction();
vstorage_->SetFinalized();
}
};
@ -693,6 +694,70 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys4) {
ASSERT_EQ(7U, compaction->input(1, 0)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, OverlappingUserKeys5) {
NewVersionStorage(6, kCompactionStyleLevel);
// Overlapping user keys on same level and output level
Add(1, 1U, "200", "400", 1000000000U);
Add(1, 2U, "400", "500", 1U, 0, 0);
Add(2, 3U, "000", "100", 1U);
Add(2, 4U, "100", "600", 1U, 0, 0);
Add(2, 5U, "600", "700", 1U, 0, 0);
vstorage_->LevelFiles(2)[2]->being_compacted = true;
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() == nullptr);
}
TEST_F(CompactionPickerTest, OverlappingUserKeys6) {
NewVersionStorage(6, kCompactionStyleLevel);
// Overlapping user keys on same level and output level
Add(1, 1U, "200", "400", 1U, 0, 0);
Add(1, 2U, "401", "500", 1U, 0, 0);
Add(2, 3U, "000", "100", 1U);
Add(2, 4U, "100", "300", 1U, 0, 0);
Add(2, 5U, "305", "450", 1U, 0, 0);
Add(2, 6U, "460", "600", 1U, 0, 0);
Add(2, 7U, "600", "700", 1U, 0, 0);
vstorage_->LevelFiles(1)[0]->marked_for_compaction = true;
vstorage_->LevelFiles(1)[1]->marked_for_compaction = true;
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(3U, compaction->num_input_files(1));
}
TEST_F(CompactionPickerTest, OverlappingUserKeys7) {
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.max_compaction_bytes = 100000000000u;
// Overlapping user keys on same level and output level
Add(1, 1U, "200", "400", 1U, 0, 0);
Add(1, 2U, "401", "500", 1000000000U, 0, 0);
Add(2, 3U, "100", "250", 1U);
Add(2, 4U, "300", "600", 1U, 0, 0);
Add(2, 5U, "600", "800", 1U, 0, 0);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_GE(1U, compaction->num_input_files(0));
ASSERT_GE(2U, compaction->num_input_files(1));
// File 5 has to be included in the compaction
ASSERT_EQ(5U, compaction->inputs(1)->back()->fd.GetNumber());
}
TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) {
NewVersionStorage(6, kCompactionStyleLevel);
mutable_cf_options_.level0_file_num_compaction_trigger = 2;