diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index dfa0081e9..f005d979f 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -82,33 +82,51 @@ void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { } } -void CompactionPicker::GetRange(const std::vector& inputs, +void CompactionPicker::GetRange(const CompactionInputFiles& inputs, InternalKey* smallest, InternalKey* largest) { + const int level = inputs.level; assert(!inputs.empty()); smallest->Clear(); largest->Clear(); - for (size_t i = 0; i < inputs.size(); i++) { - FileMetaData* f = inputs[i]; - if (i == 0) { - *smallest = f->smallest; - *largest = f->largest; - } else { - if (icmp_->Compare(f->smallest, *smallest) < 0) { + + if (level == 0) { + for (size_t i = 0; i < inputs.size(); i++) { + FileMetaData* f = inputs[i]; + if (i == 0) { *smallest = f->smallest; - } - if (icmp_->Compare(f->largest, *largest) > 0) { *largest = f->largest; + } else { + if (icmp_->Compare(f->smallest, *smallest) < 0) { + *smallest = f->smallest; + } + if (icmp_->Compare(f->largest, *largest) > 0) { + *largest = f->largest; + } } } + } else { + *smallest = inputs[0]->smallest; + *largest = inputs[inputs.size() - 1]->largest; } } -void CompactionPicker::GetRange(const std::vector& inputs1, - const std::vector& inputs2, +void CompactionPicker::GetRange(const CompactionInputFiles& inputs1, + const CompactionInputFiles& inputs2, InternalKey* smallest, InternalKey* largest) { - std::vector all = inputs1; - all.insert(all.end(), inputs2.begin(), inputs2.end()); - GetRange(all, smallest, largest); + assert(!inputs1.empty() || !inputs2.empty()); + if (inputs1.empty()) { + GetRange(inputs2, smallest, largest); + } else if (inputs2.empty()) { + GetRange(inputs1, smallest, largest); + } else { + InternalKey smallest1, smallest2, largest1, largest2; + GetRange(inputs1, &smallest1, &largest1); + GetRange(inputs2, &smallest2, &largest2); + *smallest = icmp_->Compare(smallest1, smallest2) < 0 ? + smallest1 : smallest2; + *largest = icmp_->Compare(largest1, largest2) < 0 ? + largest2 : largest1; + } } bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, @@ -133,7 +151,7 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, size_t old_size; do { old_size = inputs->size(); - GetRange(inputs->files, &smallest, &largest); + GetRange(*inputs, &smallest, &largest); inputs->clear(); vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files, hint_index, &hint_index); @@ -277,7 +295,7 @@ bool CompactionPicker::SetupOtherInputs( InternalKey smallest, largest; // Get the range one last time. - GetRange(inputs->files, &smallest, &largest); + GetRange(*inputs, &smallest, &largest); // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to // include in compaction @@ -295,23 +313,24 @@ bool CompactionPicker::SetupOtherInputs( // user key, while excluding other entries for the same user key. This // can happen when one user key spans multiple files. if (!output_level_inputs->empty()) { - std::vector expanded0; + CompactionInputFiles expanded0; + expanded0.level = input_level; // Get entire range covered by compaction InternalKey all_start, all_limit; - GetRange(inputs->files, output_level_inputs->files, &all_start, &all_limit); + GetRange(*inputs, *output_level_inputs, &all_start, &all_limit); vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit, - &expanded0, base_index, nullptr); + &expanded0.files, base_index, nullptr); const uint64_t inputs0_size = TotalCompensatedFileSize(inputs->files); const uint64_t inputs1_size = TotalCompensatedFileSize(output_level_inputs->files); - const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); + const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0.files); uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(input_level); if (expanded0.size() > inputs->size() && inputs1_size + expanded0_size < limit && - !FilesInCompaction(expanded0) && - !vstorage->HasOverlappingUserKey(&expanded0, input_level)) { + !FilesInCompaction(expanded0.files) && + !vstorage->HasOverlappingUserKey(&expanded0.files, input_level)) { InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector expanded1; @@ -327,7 +346,7 @@ bool CompactionPicker::SetupOtherInputs( expanded0.size(), expanded1.size(), expanded0_size, inputs1_size); smallest = new_start; largest = new_limit; - inputs->files = expanded0; + inputs->files = expanded0.files; output_level_inputs->files = expanded1; } } @@ -341,7 +360,7 @@ void CompactionPicker::GetGrandparents( const CompactionInputFiles& output_level_inputs, std::vector* grandparents) { InternalKey start, limit; - GetRange(inputs.files, output_level_inputs.files, &start, &limit); + GetRange(inputs, output_level_inputs, &start, &limit); // Compute the set of grandparent files that overlap this compaction // (parent == level+1; grandparent == level+2) if (output_level_inputs.level + 1 < NumberLevels()) { @@ -788,7 +807,7 @@ Compaction* LevelCompactionPicker::PickCompaction( if (level == 0) { assert(level0_compactions_in_progress_.empty()); InternalKey smallest, largest; - GetRange(inputs.files, &smallest, &largest); + GetRange(inputs, &smallest, &largest); // Note that the next call will discard the file we placed in // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. @@ -798,7 +817,7 @@ Compaction* LevelCompactionPicker::PickCompaction( // If we include more L0 files in the same compaction run it can // cause the 'smallest' and 'largest' key to get extended to a // larger range. So, re-invoke GetRange to get the new key range - GetRange(inputs.files, &smallest, &largest); + GetRange(inputs, &smallest, &largest); if (RangeInCompaction(vstorage, &smallest, &largest, output_level, &parent_index)) { return nullptr; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c1e9e712a..403410196 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -111,14 +111,14 @@ class CompactionPicker { // Stores the minimal range that covers all entries in inputs in // *smallest, *largest. // REQUIRES: inputs is not empty - void GetRange(const std::vector& inputs, InternalKey* smallest, - InternalKey* largest); + void GetRange(const CompactionInputFiles& inputs, + InternalKey* smallest, InternalKey* largest); // Stores the minimal range that covers all entries in inputs1 and inputs2 // in *smallest, *largest. // REQUIRES: inputs is not empty - void GetRange(const std::vector& inputs1, - const std::vector& inputs2, + void GetRange(const CompactionInputFiles& inputs1, + const CompactionInputFiles& inputs2, InternalKey* smallest, InternalKey* largest); // Add more files to the inputs on "level" to make sure that