Optimize GetRange Function

Summary: Optimize GetRange Function by checking the level of the files

Test Plan: pass make all check

Reviewers: rven, yhchiang, igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D37977
This commit is contained in:
Liangjun Feng 2015-05-05 09:57:47 -07:00
parent 36a7408896
commit 9aa011fa36
2 changed files with 50 additions and 31 deletions

View File

@ -82,11 +82,14 @@ void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
}
}
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs,
void CompactionPicker::GetRange(const CompactionInputFiles& inputs,
InternalKey* smallest, InternalKey* largest) {
const int level = inputs.level;
assert(!inputs.empty());
smallest->Clear();
largest->Clear();
if (level == 0) {
for (size_t i = 0; i < inputs.size(); i++) {
FileMetaData* f = inputs[i];
if (i == 0) {
@ -101,14 +104,29 @@ void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs,
}
}
}
} else {
*smallest = inputs[0]->smallest;
*largest = inputs[inputs.size() - 1]->largest;
}
}
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
const CompactionInputFiles& inputs2,
InternalKey* smallest, InternalKey* largest) {
std::vector<FileMetaData*> all = inputs1;
all.insert(all.end(), inputs2.begin(), inputs2.end());
GetRange(all, smallest, largest);
assert(!inputs1.empty() || !inputs2.empty());
if (inputs1.empty()) {
GetRange(inputs2, smallest, largest);
} else if (inputs2.empty()) {
GetRange(inputs1, smallest, largest);
} else {
InternalKey smallest1, smallest2, largest1, largest2;
GetRange(inputs1, &smallest1, &largest1);
GetRange(inputs2, &smallest2, &largest2);
*smallest = icmp_->Compare(smallest1, smallest2) < 0 ?
smallest1 : smallest2;
*largest = icmp_->Compare(largest1, largest2) < 0 ?
largest2 : largest1;
}
}
bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
@ -133,7 +151,7 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
size_t old_size;
do {
old_size = inputs->size();
GetRange(inputs->files, &smallest, &largest);
GetRange(*inputs, &smallest, &largest);
inputs->clear();
vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
hint_index, &hint_index);
@ -277,7 +295,7 @@ bool CompactionPicker::SetupOtherInputs(
InternalKey smallest, largest;
// Get the range one last time.
GetRange(inputs->files, &smallest, &largest);
GetRange(*inputs, &smallest, &largest);
// Populate the set of next-level files (inputs_GetOutputLevelInputs()) to
// include in compaction
@ -295,23 +313,24 @@ bool CompactionPicker::SetupOtherInputs(
// user key, while excluding other entries for the same user key. This
// can happen when one user key spans multiple files.
if (!output_level_inputs->empty()) {
std::vector<FileMetaData*> expanded0;
CompactionInputFiles expanded0;
expanded0.level = input_level;
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange(inputs->files, output_level_inputs->files, &all_start, &all_limit);
GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
&expanded0, base_index, nullptr);
&expanded0.files, base_index, nullptr);
const uint64_t inputs0_size = TotalCompensatedFileSize(inputs->files);
const uint64_t inputs1_size =
TotalCompensatedFileSize(output_level_inputs->files);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0.files);
uint64_t limit =
mutable_cf_options.ExpandedCompactionByteSizeLimit(input_level);
if (expanded0.size() > inputs->size() &&
inputs1_size + expanded0_size < limit &&
!FilesInCompaction(expanded0) &&
!vstorage->HasOverlappingUserKey(&expanded0, input_level)) {
!FilesInCompaction(expanded0.files) &&
!vstorage->HasOverlappingUserKey(&expanded0.files, input_level)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
@ -327,7 +346,7 @@ bool CompactionPicker::SetupOtherInputs(
expanded0.size(), expanded1.size(), expanded0_size, inputs1_size);
smallest = new_start;
largest = new_limit;
inputs->files = expanded0;
inputs->files = expanded0.files;
output_level_inputs->files = expanded1;
}
}
@ -341,7 +360,7 @@ void CompactionPicker::GetGrandparents(
const CompactionInputFiles& output_level_inputs,
std::vector<FileMetaData*>* grandparents) {
InternalKey start, limit;
GetRange(inputs.files, output_level_inputs.files, &start, &limit);
GetRange(inputs, output_level_inputs, &start, &limit);
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (output_level_inputs.level + 1 < NumberLevels()) {
@ -788,7 +807,7 @@ Compaction* LevelCompactionPicker::PickCompaction(
if (level == 0) {
assert(level0_compactions_in_progress_.empty());
InternalKey smallest, largest;
GetRange(inputs.files, &smallest, &largest);
GetRange(inputs, &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
@ -798,7 +817,7 @@ Compaction* LevelCompactionPicker::PickCompaction(
// If we include more L0 files in the same compaction run it can
// cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range
GetRange(inputs.files, &smallest, &largest);
GetRange(inputs, &smallest, &largest);
if (RangeInCompaction(vstorage, &smallest, &largest, output_level,
&parent_index)) {
return nullptr;

View File

@ -111,14 +111,14 @@ class CompactionPicker {
// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
void GetRange(const std::vector<FileMetaData*>& inputs, InternalKey* smallest,
InternalKey* largest);
void GetRange(const CompactionInputFiles& inputs,
InternalKey* smallest, InternalKey* largest);
// Stores the minimal range that covers all entries in inputs1 and inputs2
// in *smallest, *largest.
// REQUIRES: inputs is not empty
void GetRange(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
void GetRange(const CompactionInputFiles& inputs1,
const CompactionInputFiles& inputs2,
InternalKey* smallest, InternalKey* largest);
// Add more files to the inputs on "level" to make sure that