Merge branch 'master' into performance
This patch merges master's changes on build_tools/format-diff.sh. Conflicts: db/version_edit.cc
This commit is contained in:
commit
d4f65f1683
@ -1,5 +1,4 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
set -e
|
|
||||||
# If clang_format_diff.py command is not specfied, we assume we are able to
|
# If clang_format_diff.py command is not specfied, we assume we are able to
|
||||||
# access directly without any path.
|
# access directly without any path.
|
||||||
if [ -z $CLANG_FORMAT_DIFF ]
|
if [ -z $CLANG_FORMAT_DIFF ]
|
||||||
@ -12,7 +11,7 @@ if ! which $CLANG_FORMAT_DIFF &> /dev/null
|
|||||||
then
|
then
|
||||||
echo "You didn't have clang-format-diff.py available in your computer!"
|
echo "You didn't have clang-format-diff.py available in your computer!"
|
||||||
echo "You can download it by running: "
|
echo "You can download it by running: "
|
||||||
echo " curl https://fburl.com/clang-format-diff"
|
echo " curl http://goo.gl/iUW1u2"
|
||||||
exit 128
|
exit 128
|
||||||
fi
|
fi
|
||||||
|
|
||||||
@ -49,8 +48,22 @@ fi
|
|||||||
# fi
|
# fi
|
||||||
# fi
|
# fi
|
||||||
|
|
||||||
# Check the format of recently changed lines,
|
set -e
|
||||||
diffs=$(git diff -U0 HEAD^ | $CLANG_FORMAT_DIFF -p 1)
|
|
||||||
|
uncommitted_code=`git diff HEAD`
|
||||||
|
|
||||||
|
# If there's no uncommitted changes, we assume user are doing post-commit
|
||||||
|
# format check, in which case we'll check the modified lines from latest commit.
|
||||||
|
# Otherwise, we'll check format of the uncommitted code only.
|
||||||
|
format_last_commit=0
|
||||||
|
if [ -z "$uncommitted_code" ]
|
||||||
|
then
|
||||||
|
# Check the format of last commit
|
||||||
|
diffs=$(git diff -U0 HEAD^ | $CLANG_FORMAT_DIFF -p 1)
|
||||||
|
else
|
||||||
|
# Check the format of uncommitted lines,
|
||||||
|
diffs=$(git diff -U0 HEAD | $CLANG_FORMAT_DIFF -p 1)
|
||||||
|
fi
|
||||||
|
|
||||||
if [ -z "$diffs" ]
|
if [ -z "$diffs" ]
|
||||||
then
|
then
|
||||||
@ -81,3 +94,16 @@ fi
|
|||||||
|
|
||||||
# Do in-place format adjustment.
|
# Do in-place format adjustment.
|
||||||
git diff -U0 HEAD^ | $CLANG_FORMAT_DIFF -i -p 1
|
git diff -U0 HEAD^ | $CLANG_FORMAT_DIFF -i -p 1
|
||||||
|
echo "Files reformatted!"
|
||||||
|
|
||||||
|
# Amend to last commit if user do the post-commit format check
|
||||||
|
if [ -z "$uncommitted_code" ]; then
|
||||||
|
echo -e "Would you like to amend the changes to last commit (`git log HEAD --oneline | head -1`)? (y/n): \c"
|
||||||
|
read to_amend
|
||||||
|
|
||||||
|
if [ "$to_amend" == "y" ]
|
||||||
|
then
|
||||||
|
git commit -a --amend --reuse-message HEAD
|
||||||
|
echo "Amended to last commit"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
@ -76,6 +76,9 @@ class Compaction {
|
|||||||
private:
|
private:
|
||||||
friend class Version;
|
friend class Version;
|
||||||
friend class VersionSet;
|
friend class VersionSet;
|
||||||
|
friend class CompactionPicker;
|
||||||
|
friend class UniversalCompactionPicker;
|
||||||
|
friend class LevelCompactionPicker;
|
||||||
|
|
||||||
Compaction(Version* input_version, int level, int out_level,
|
Compaction(Version* input_version, int level, int out_level,
|
||||||
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
|
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
|
||||||
|
854
db/compaction_picker.cc
Normal file
854
db/compaction_picker.cc
Normal file
@ -0,0 +1,854 @@
|
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#include "db/compaction_picker.h"
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
|
||||||
|
uint64_t sum = 0;
|
||||||
|
for (size_t i = 0; i < files.size() && files[i]; i++) {
|
||||||
|
sum += files[i]->file_size;
|
||||||
|
}
|
||||||
|
return sum;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // anonymous namespace
|
||||||
|
|
||||||
|
CompactionPicker::CompactionPicker(const Options* options,
|
||||||
|
const InternalKeyComparator* icmp)
|
||||||
|
: compactions_in_progress_(options->num_levels),
|
||||||
|
options_(options),
|
||||||
|
num_levels_(options->num_levels),
|
||||||
|
icmp_(icmp) {
|
||||||
|
Init();
|
||||||
|
}
|
||||||
|
|
||||||
|
void CompactionPicker::ReduceNumberOfLevels(int new_levels) {
|
||||||
|
num_levels_ = new_levels;
|
||||||
|
Init();
|
||||||
|
}
|
||||||
|
|
||||||
|
void CompactionPicker::Init() {
|
||||||
|
max_file_size_.reset(new uint64_t[NumberLevels()]);
|
||||||
|
level_max_bytes_.reset(new uint64_t[NumberLevels()]);
|
||||||
|
int target_file_size_multiplier = options_->target_file_size_multiplier;
|
||||||
|
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
|
||||||
|
for (int i = 0; i < NumberLevels(); i++) {
|
||||||
|
if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
|
||||||
|
max_file_size_[i] = ULLONG_MAX;
|
||||||
|
level_max_bytes_[i] = options_->max_bytes_for_level_base;
|
||||||
|
} else if (i > 1) {
|
||||||
|
max_file_size_[i] = max_file_size_[i - 1] * target_file_size_multiplier;
|
||||||
|
level_max_bytes_[i] =
|
||||||
|
level_max_bytes_[i - 1] * max_bytes_multiplier *
|
||||||
|
options_->max_bytes_for_level_multiplier_additional[i - 1];
|
||||||
|
} else {
|
||||||
|
max_file_size_[i] = options_->target_file_size_base;
|
||||||
|
level_max_bytes_[i] = options_->max_bytes_for_level_base;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CompactionPicker::~CompactionPicker() {}
|
||||||
|
|
||||||
|
void CompactionPicker::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
|
||||||
|
for (int level = 0; level < NumberLevels() - 1; level++) {
|
||||||
|
uint64_t total = 0;
|
||||||
|
for (auto c : compactions_in_progress_[level]) {
|
||||||
|
assert(c->level() == level);
|
||||||
|
for (int i = 0; i < c->num_input_files(0); i++) {
|
||||||
|
total += c->input(0,i)->file_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sizes[level] = total;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear all files to indicate that they are not being compacted
|
||||||
|
// Delete this compaction from the list of running compactions.
|
||||||
|
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
|
||||||
|
c->MarkFilesBeingCompacted(false);
|
||||||
|
compactions_in_progress_[c->level()].erase(c);
|
||||||
|
if (!status.ok()) {
|
||||||
|
c->ResetNextCompactionIndex();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t CompactionPicker::MaxFileSizeForLevel(int level) const {
|
||||||
|
assert(level >= 0);
|
||||||
|
assert(level < NumberLevels());
|
||||||
|
return max_file_size_[level];
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t CompactionPicker::MaxGrandParentOverlapBytes(int level) {
|
||||||
|
uint64_t result = MaxFileSizeForLevel(level);
|
||||||
|
result *= options_->max_grandparent_overlap_factor;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
double CompactionPicker::MaxBytesForLevel(int level) {
|
||||||
|
// Note: the result for level zero is not really used since we set
|
||||||
|
// the level-0 compaction threshold based on number of files.
|
||||||
|
assert(level >= 0);
|
||||||
|
assert(level < NumberLevels());
|
||||||
|
return level_max_bytes_[level];
|
||||||
|
}
|
||||||
|
|
||||||
|
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs,
|
||||||
|
InternalKey* smallest, InternalKey* largest) {
|
||||||
|
assert(!inputs.empty());
|
||||||
|
smallest->Clear();
|
||||||
|
largest->Clear();
|
||||||
|
for (size_t i = 0; i < inputs.size(); i++) {
|
||||||
|
FileMetaData* f = inputs[i];
|
||||||
|
if (i == 0) {
|
||||||
|
*smallest = f->smallest;
|
||||||
|
*largest = f->largest;
|
||||||
|
} else {
|
||||||
|
if (icmp_->Compare(f->smallest, *smallest) < 0) {
|
||||||
|
*smallest = f->smallest;
|
||||||
|
}
|
||||||
|
if (icmp_->Compare(f->largest, *largest) > 0) {
|
||||||
|
*largest = f->largest;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs1,
|
||||||
|
const std::vector<FileMetaData*>& inputs2,
|
||||||
|
InternalKey* smallest, InternalKey* largest) {
|
||||||
|
std::vector<FileMetaData*> all = inputs1;
|
||||||
|
all.insert(all.end(), inputs2.begin(), inputs2.end());
|
||||||
|
GetRange(all, smallest, largest);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add more files to the inputs on "level" to make sure that
|
||||||
|
// no newer version of a key is compacted to "level+1" while leaving an older
|
||||||
|
// version in a "level". Otherwise, any Get() will search "level" first,
|
||||||
|
// and will likely return an old/stale value for the key, since it always
|
||||||
|
// searches in increasing order of level to find the value. This could
|
||||||
|
// also scramble the order of merge operands. This function should be
|
||||||
|
// called any time a new Compaction is created, and its inputs_[0] are
|
||||||
|
// populated.
|
||||||
|
//
|
||||||
|
// Will set c to nullptr if it is impossible to apply this compaction.
|
||||||
|
void CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
|
||||||
|
// If inputs are empty then there is nothing to expand.
|
||||||
|
if (!c || c->inputs_[0].empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOverlappingInputs will always do the right thing for level-0.
|
||||||
|
// So we don't need to do any expansion if level == 0.
|
||||||
|
if (c->level() == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const int level = c->level();
|
||||||
|
InternalKey smallest, largest;
|
||||||
|
|
||||||
|
// Keep expanding c->inputs_[0] until we are sure that there is a
|
||||||
|
// "clean cut" boundary between the files in input and the surrounding files.
|
||||||
|
// This will ensure that no parts of a key are lost during compaction.
|
||||||
|
int hint_index = -1;
|
||||||
|
size_t old_size;
|
||||||
|
do {
|
||||||
|
old_size = c->inputs_[0].size();
|
||||||
|
GetRange(c->inputs_[0], &smallest, &largest);
|
||||||
|
c->inputs_[0].clear();
|
||||||
|
c->input_version_->GetOverlappingInputs(
|
||||||
|
level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index);
|
||||||
|
} while(c->inputs_[0].size() > old_size);
|
||||||
|
|
||||||
|
// Get the new range
|
||||||
|
GetRange(c->inputs_[0], &smallest, &largest);
|
||||||
|
|
||||||
|
// If, after the expansion, there are files that are already under
|
||||||
|
// compaction, then we must drop/cancel this compaction.
|
||||||
|
int parent_index = -1;
|
||||||
|
if (FilesInCompaction(c->inputs_[0]) ||
|
||||||
|
(c->level() != c->output_level() &&
|
||||||
|
ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
|
||||||
|
&parent_index))) {
|
||||||
|
c->inputs_[0].clear();
|
||||||
|
c->inputs_[1].clear();
|
||||||
|
delete c;
|
||||||
|
c = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t CompactionPicker::ExpandedCompactionByteSizeLimit(int level) {
|
||||||
|
uint64_t result = MaxFileSizeForLevel(level);
|
||||||
|
result *= options_->expanded_compaction_factor;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true if any one of specified files are being compacted
|
||||||
|
bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
|
||||||
|
for (unsigned int i = 0; i < files.size(); i++) {
|
||||||
|
if (files[i]->being_compacted) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true if any one of the parent files are being compacted
|
||||||
|
bool CompactionPicker::ParentRangeInCompaction(Version* version,
|
||||||
|
const InternalKey* smallest,
|
||||||
|
const InternalKey* largest,
|
||||||
|
int level, int* parent_index) {
|
||||||
|
std::vector<FileMetaData*> inputs;
|
||||||
|
assert(level + 1 < NumberLevels());
|
||||||
|
|
||||||
|
version->GetOverlappingInputs(level + 1, smallest, largest, &inputs,
|
||||||
|
*parent_index, parent_index);
|
||||||
|
return FilesInCompaction(inputs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populates the set of inputs from "level+1" that overlap with "level".
|
||||||
|
// Will also attempt to expand "level" if that doesn't expand "level+1"
|
||||||
|
// or cause "level" to include a file for compaction that has an overlapping
|
||||||
|
// user-key with another file.
|
||||||
|
void CompactionPicker::SetupOtherInputs(Compaction* c) {
|
||||||
|
// If inputs are empty, then there is nothing to expand.
|
||||||
|
// If both input and output levels are the same, no need to consider
|
||||||
|
// files at level "level+1"
|
||||||
|
if (c->inputs_[0].empty() || c->level() == c->output_level()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const int level = c->level();
|
||||||
|
InternalKey smallest, largest;
|
||||||
|
|
||||||
|
// Get the range one last time.
|
||||||
|
GetRange(c->inputs_[0], &smallest, &largest);
|
||||||
|
|
||||||
|
// Populate the set of next-level files (inputs_[1]) to include in compaction
|
||||||
|
c->input_version_->GetOverlappingInputs(level + 1, &smallest, &largest,
|
||||||
|
&c->inputs_[1], c->parent_index_,
|
||||||
|
&c->parent_index_);
|
||||||
|
|
||||||
|
// Get entire range covered by compaction
|
||||||
|
InternalKey all_start, all_limit;
|
||||||
|
GetRange(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
|
||||||
|
|
||||||
|
// See if we can further grow the number of inputs in "level" without
|
||||||
|
// changing the number of "level+1" files we pick up. We also choose NOT
|
||||||
|
// to expand if this would cause "level" to include some entries for some
|
||||||
|
// user key, while excluding other entries for the same user key. This
|
||||||
|
// can happen when one user key spans multiple files.
|
||||||
|
if (!c->inputs_[1].empty()) {
|
||||||
|
std::vector<FileMetaData*> expanded0;
|
||||||
|
c->input_version_->GetOverlappingInputs(
|
||||||
|
level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
|
||||||
|
const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
|
||||||
|
const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
|
||||||
|
const uint64_t expanded0_size = TotalFileSize(expanded0);
|
||||||
|
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
|
||||||
|
if (expanded0.size() > c->inputs_[0].size() &&
|
||||||
|
inputs1_size + expanded0_size < limit &&
|
||||||
|
!FilesInCompaction(expanded0) &&
|
||||||
|
!c->input_version_->HasOverlappingUserKey(&expanded0, level)) {
|
||||||
|
InternalKey new_start, new_limit;
|
||||||
|
GetRange(expanded0, &new_start, &new_limit);
|
||||||
|
std::vector<FileMetaData*> expanded1;
|
||||||
|
c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
|
||||||
|
&expanded1, c->parent_index_,
|
||||||
|
&c->parent_index_);
|
||||||
|
if (expanded1.size() == c->inputs_[1].size() &&
|
||||||
|
!FilesInCompaction(expanded1)) {
|
||||||
|
Log(options_->info_log,
|
||||||
|
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
|
||||||
|
"\n",
|
||||||
|
(unsigned long)level,
|
||||||
|
(unsigned long)(c->inputs_[0].size()),
|
||||||
|
(unsigned long)(c->inputs_[1].size()),
|
||||||
|
(unsigned long)inputs0_size,
|
||||||
|
(unsigned long)inputs1_size,
|
||||||
|
(unsigned long)(expanded0.size()),
|
||||||
|
(unsigned long)(expanded1.size()),
|
||||||
|
(unsigned long)expanded0_size,
|
||||||
|
(unsigned long)inputs1_size);
|
||||||
|
smallest = new_start;
|
||||||
|
largest = new_limit;
|
||||||
|
c->inputs_[0] = expanded0;
|
||||||
|
c->inputs_[1] = expanded1;
|
||||||
|
GetRange(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the set of grandparent files that overlap this compaction
|
||||||
|
// (parent == level+1; grandparent == level+2)
|
||||||
|
if (level + 2 < NumberLevels()) {
|
||||||
|
c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
|
||||||
|
&c->grandparents_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
|
||||||
|
int output_level,
|
||||||
|
const InternalKey* begin,
|
||||||
|
const InternalKey* end,
|
||||||
|
InternalKey** compaction_end) {
|
||||||
|
std::vector<FileMetaData*> inputs;
|
||||||
|
bool covering_the_whole_range = true;
|
||||||
|
|
||||||
|
// All files are 'overlapping' in universal style compaction.
|
||||||
|
// We have to compact the entire range in one shot.
|
||||||
|
if (options_->compaction_style == kCompactionStyleUniversal) {
|
||||||
|
begin = nullptr;
|
||||||
|
end = nullptr;
|
||||||
|
}
|
||||||
|
version->GetOverlappingInputs(input_level, begin, end, &inputs);
|
||||||
|
if (inputs.empty()) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Avoid compacting too much in one shot in case the range is large.
|
||||||
|
// But we cannot do this for level-0 since level-0 files can overlap
|
||||||
|
// and we must not pick one file and drop another older file if the
|
||||||
|
// two files overlap.
|
||||||
|
if (input_level > 0) {
|
||||||
|
const uint64_t limit =
|
||||||
|
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
|
||||||
|
uint64_t total = 0;
|
||||||
|
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
|
||||||
|
uint64_t s = inputs[i]->file_size;
|
||||||
|
total += s;
|
||||||
|
if (total >= limit) {
|
||||||
|
**compaction_end = inputs[i + 1]->smallest;
|
||||||
|
covering_the_whole_range = false;
|
||||||
|
inputs.resize(i + 1);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Compaction* c = new Compaction(version, input_level, output_level,
|
||||||
|
MaxFileSizeForLevel(output_level),
|
||||||
|
MaxGrandParentOverlapBytes(input_level));
|
||||||
|
|
||||||
|
c->inputs_[0] = inputs;
|
||||||
|
ExpandWhileOverlapping(c);
|
||||||
|
if (c == nullptr) {
|
||||||
|
Log(options_->info_log, "Could not compact due to expansion failure.\n");
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
SetupOtherInputs(c);
|
||||||
|
|
||||||
|
if (covering_the_whole_range) {
|
||||||
|
*compaction_end = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// These files that are to be manaully compacted do not trample
|
||||||
|
// upon other files because manual compactions are processed when
|
||||||
|
// the system has a max of 1 background compaction thread.
|
||||||
|
c->MarkFilesBeingCompacted(true);
|
||||||
|
|
||||||
|
// Is this compaction creating a file at the bottommost level
|
||||||
|
c->SetupBottomMostLevel(true);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
|
||||||
|
Compaction* c = nullptr;
|
||||||
|
int level = -1;
|
||||||
|
|
||||||
|
// Compute the compactions needed. It is better to do it here
|
||||||
|
// and also in LogAndApply(), otherwise the values could be stale.
|
||||||
|
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
|
||||||
|
SizeBeingCompacted(size_being_compacted);
|
||||||
|
version->Finalize(size_being_compacted);
|
||||||
|
|
||||||
|
// We prefer compactions triggered by too much data in a level over
|
||||||
|
// the compactions triggered by seeks.
|
||||||
|
//
|
||||||
|
// Find the compactions by size on all levels.
|
||||||
|
for (int i = 0; i < NumberLevels() - 1; i++) {
|
||||||
|
assert(i == 0 ||
|
||||||
|
version->compaction_score_[i] <= version->compaction_score_[i - 1]);
|
||||||
|
level = version->compaction_level_[i];
|
||||||
|
if ((version->compaction_score_[i] >= 1)) {
|
||||||
|
c = PickCompactionBySize(version, level, version->compaction_score_[i]);
|
||||||
|
ExpandWhileOverlapping(c);
|
||||||
|
if (c != nullptr) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find compactions needed by seeks
|
||||||
|
FileMetaData* f = version->file_to_compact_;
|
||||||
|
if (c == nullptr && f != nullptr && !f->being_compacted) {
|
||||||
|
|
||||||
|
level = version->file_to_compact_level_;
|
||||||
|
int parent_index = -1;
|
||||||
|
|
||||||
|
// Only allow one level 0 compaction at a time.
|
||||||
|
// Do not pick this file if its parents at level+1 are being compacted.
|
||||||
|
if (level != 0 || compactions_in_progress_[0].empty()) {
|
||||||
|
if (!ParentRangeInCompaction(version, &f->smallest, &f->largest, level,
|
||||||
|
&parent_index)) {
|
||||||
|
c = new Compaction(version, level, level + 1,
|
||||||
|
MaxFileSizeForLevel(level + 1),
|
||||||
|
MaxGrandParentOverlapBytes(level), true);
|
||||||
|
c->inputs_[0].push_back(f);
|
||||||
|
c->parent_index_ = parent_index;
|
||||||
|
c->input_version_->file_to_compact_ = nullptr;
|
||||||
|
ExpandWhileOverlapping(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c == nullptr) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Two level 0 compaction won't run at the same time, so don't need to worry
|
||||||
|
// about files on level 0 being compacted.
|
||||||
|
if (level == 0) {
|
||||||
|
assert(compactions_in_progress_[0].empty());
|
||||||
|
InternalKey smallest, largest;
|
||||||
|
GetRange(c->inputs_[0], &smallest, &largest);
|
||||||
|
// Note that the next call will discard the file we placed in
|
||||||
|
// c->inputs_[0] earlier and replace it with an overlapping set
|
||||||
|
// which will include the picked file.
|
||||||
|
c->inputs_[0].clear();
|
||||||
|
c->input_version_->GetOverlappingInputs(0, &smallest, &largest,
|
||||||
|
&c->inputs_[0]);
|
||||||
|
|
||||||
|
// If we include more L0 files in the same compaction run it can
|
||||||
|
// cause the 'smallest' and 'largest' key to get extended to a
|
||||||
|
// larger range. So, re-invoke GetRange to get the new key range
|
||||||
|
GetRange(c->inputs_[0], &smallest, &largest);
|
||||||
|
if (ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
|
||||||
|
&c->parent_index_)) {
|
||||||
|
delete c;
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
assert(!c->inputs_[0].empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup "level+1" files (inputs_[1])
|
||||||
|
SetupOtherInputs(c);
|
||||||
|
|
||||||
|
// mark all the files that are being compacted
|
||||||
|
c->MarkFilesBeingCompacted(true);
|
||||||
|
|
||||||
|
// Is this compaction creating a file at the bottommost level
|
||||||
|
c->SetupBottomMostLevel(false);
|
||||||
|
|
||||||
|
// remember this currently undergoing compaction
|
||||||
|
compactions_in_progress_[level].insert(c);
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
|
||||||
|
int level,
|
||||||
|
double score) {
|
||||||
|
Compaction* c = nullptr;
|
||||||
|
|
||||||
|
// level 0 files are overlapping. So we cannot pick more
|
||||||
|
// than one concurrent compactions at this level. This
|
||||||
|
// could be made better by looking at key-ranges that are
|
||||||
|
// being compacted at level 0.
|
||||||
|
if (level == 0 && compactions_in_progress_[level].size() == 1) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(level >= 0);
|
||||||
|
assert(level + 1 < NumberLevels());
|
||||||
|
c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
|
||||||
|
MaxGrandParentOverlapBytes(level));
|
||||||
|
c->score_ = score;
|
||||||
|
|
||||||
|
// Pick the largest file in this level that is not already
|
||||||
|
// being compacted
|
||||||
|
std::vector<int>& file_size = c->input_version_->files_by_size_[level];
|
||||||
|
|
||||||
|
// record the first file that is not yet compacted
|
||||||
|
int nextIndex = -1;
|
||||||
|
|
||||||
|
for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level];
|
||||||
|
i < file_size.size(); i++) {
|
||||||
|
int index = file_size[i];
|
||||||
|
FileMetaData* f = c->input_version_->files_[level][index];
|
||||||
|
|
||||||
|
// check to verify files are arranged in descending size
|
||||||
|
assert((i == file_size.size() - 1) ||
|
||||||
|
(i >= Version::number_of_files_to_sort_ - 1) ||
|
||||||
|
(f->file_size >=
|
||||||
|
c->input_version_->files_[level][file_size[i + 1]]->file_size));
|
||||||
|
|
||||||
|
// do not pick a file to compact if it is being compacted
|
||||||
|
// from n-1 level.
|
||||||
|
if (f->being_compacted) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// remember the startIndex for the next call to PickCompaction
|
||||||
|
if (nextIndex == -1) {
|
||||||
|
nextIndex = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
//if (i > Version::number_of_files_to_sort_) {
|
||||||
|
// Log(options_->info_log, "XXX Looking at index %d", i);
|
||||||
|
//}
|
||||||
|
|
||||||
|
// Do not pick this file if its parents at level+1 are being compacted.
|
||||||
|
// Maybe we can avoid redoing this work in SetupOtherInputs
|
||||||
|
int parent_index = -1;
|
||||||
|
if (ParentRangeInCompaction(c->input_version_, &f->smallest, &f->largest,
|
||||||
|
level, &parent_index)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
c->inputs_[0].push_back(f);
|
||||||
|
c->base_index_ = index;
|
||||||
|
c->parent_index_ = parent_index;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c->inputs_[0].empty()) {
|
||||||
|
delete c;
|
||||||
|
c = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// store where to start the iteration in the next call to PickCompaction
|
||||||
|
c->input_version_->next_file_to_compact_by_size_[level] = nextIndex;
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Universal style of compaction. Pick files that are contiguous in
|
||||||
|
// time-range to compact.
|
||||||
|
//
|
||||||
|
Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
|
||||||
|
int level = 0;
|
||||||
|
double score = version->compaction_score_[0];
|
||||||
|
|
||||||
|
if ((version->files_[level].size() <
|
||||||
|
(unsigned int)options_->level0_file_num_compaction_trigger)) {
|
||||||
|
Log(options_->info_log, "Universal: nothing to do\n");
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
Version::FileSummaryStorage tmp;
|
||||||
|
Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
|
||||||
|
version->files_[level].size(),
|
||||||
|
version->LevelFileSummary(&tmp, 0));
|
||||||
|
|
||||||
|
// Check for size amplification first.
|
||||||
|
Compaction* c = PickCompactionUniversalSizeAmp(version, score);
|
||||||
|
if (c == nullptr) {
|
||||||
|
|
||||||
|
// Size amplification is within limits. Try reducing read
|
||||||
|
// amplification while maintaining file size ratios.
|
||||||
|
unsigned int ratio = options_->compaction_options_universal.size_ratio;
|
||||||
|
c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX);
|
||||||
|
|
||||||
|
// Size amplification and file size ratios are within configured limits.
|
||||||
|
// If max read amplification is exceeding configured limits, then force
|
||||||
|
// compaction without looking at filesize ratios and try to reduce
|
||||||
|
// the number of files to fewer than level0_file_num_compaction_trigger.
|
||||||
|
if (c == nullptr) {
|
||||||
|
unsigned int num_files = version->files_[level].size() -
|
||||||
|
options_->level0_file_num_compaction_trigger;
|
||||||
|
c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (c == nullptr) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
assert(c->inputs_[0].size() > 1);
|
||||||
|
|
||||||
|
// validate that all the chosen files are non overlapping in time
|
||||||
|
FileMetaData* newerfile __attribute__((unused)) = nullptr;
|
||||||
|
for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
|
||||||
|
FileMetaData* f = c->inputs_[0][i];
|
||||||
|
assert (f->smallest_seqno <= f->largest_seqno);
|
||||||
|
assert(newerfile == nullptr ||
|
||||||
|
newerfile->smallest_seqno > f->largest_seqno);
|
||||||
|
newerfile = f;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The files are sorted from newest first to oldest last.
|
||||||
|
std::vector<int>& file_by_time = c->input_version_->files_by_size_[level];
|
||||||
|
|
||||||
|
// Is the earliest file part of this compaction?
|
||||||
|
int last_index = file_by_time[file_by_time.size()-1];
|
||||||
|
FileMetaData* last_file = c->input_version_->files_[level][last_index];
|
||||||
|
if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
|
||||||
|
c->bottommost_level_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// update statistics
|
||||||
|
if (options_->statistics != nullptr) {
|
||||||
|
options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
|
||||||
|
c->inputs_[0].size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark all the files that are being compacted
|
||||||
|
c->MarkFilesBeingCompacted(true);
|
||||||
|
|
||||||
|
// remember this currently undergoing compaction
|
||||||
|
compactions_in_progress_[level].insert(c);
|
||||||
|
|
||||||
|
// Record whether this compaction includes all sst files.
|
||||||
|
// For now, it is only relevant in universal compaction mode.
|
||||||
|
c->is_full_compaction_ =
|
||||||
|
(c->inputs_[0].size() == c->input_version_->files_[0].size());
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Consider compaction files based on their size differences with
|
||||||
|
// the next file in time order.
|
||||||
|
//
|
||||||
|
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||||
|
Version* version, double score, unsigned int ratio,
|
||||||
|
unsigned int max_number_of_files_to_compact) {
|
||||||
|
int level = 0;
|
||||||
|
|
||||||
|
unsigned int min_merge_width =
|
||||||
|
options_->compaction_options_universal.min_merge_width;
|
||||||
|
unsigned int max_merge_width =
|
||||||
|
options_->compaction_options_universal.max_merge_width;
|
||||||
|
|
||||||
|
// The files are sorted from newest first to oldest last.
|
||||||
|
std::vector<int>& file_by_time = version->files_by_size_[level];
|
||||||
|
FileMetaData* f = nullptr;
|
||||||
|
bool done = false;
|
||||||
|
int start_index = 0;
|
||||||
|
unsigned int candidate_count;
|
||||||
|
assert(file_by_time.size() == version->files_[level].size());
|
||||||
|
|
||||||
|
unsigned int max_files_to_compact = std::min(max_merge_width,
|
||||||
|
max_number_of_files_to_compact);
|
||||||
|
min_merge_width = std::max(min_merge_width, 2U);
|
||||||
|
|
||||||
|
// Considers a candidate file only if it is smaller than the
|
||||||
|
// total size accumulated so far.
|
||||||
|
for (unsigned int loop = 0; loop < file_by_time.size(); loop++) {
|
||||||
|
|
||||||
|
candidate_count = 0;
|
||||||
|
|
||||||
|
// Skip files that are already being compacted
|
||||||
|
for (f = nullptr; loop < file_by_time.size(); loop++) {
|
||||||
|
int index = file_by_time[loop];
|
||||||
|
f = version->files_[level][index];
|
||||||
|
|
||||||
|
if (!f->being_compacted) {
|
||||||
|
candidate_count = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Log(options_->info_log,
|
||||||
|
"Universal: file %lu[%d] being compacted, skipping",
|
||||||
|
(unsigned long)f->number, loop);
|
||||||
|
f = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This file is not being compacted. Consider it as the
|
||||||
|
// first candidate to be compacted.
|
||||||
|
uint64_t candidate_size = f != nullptr? f->file_size : 0;
|
||||||
|
if (f != nullptr) {
|
||||||
|
Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
|
||||||
|
(unsigned long)f->number, loop);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the suceeding files need compaction.
|
||||||
|
for (unsigned int i = loop+1;
|
||||||
|
candidate_count < max_files_to_compact && i < file_by_time.size();
|
||||||
|
i++) {
|
||||||
|
int index = file_by_time[i];
|
||||||
|
FileMetaData* f = version->files_[level][index];
|
||||||
|
if (f->being_compacted) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// pick files if the total candidate file size (increased by the
|
||||||
|
// specified ratio) is still larger than the next candidate file.
|
||||||
|
uint64_t sz = (candidate_size * (100L + ratio)) /100;
|
||||||
|
if (sz < f->file_size) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
candidate_count++;
|
||||||
|
candidate_size += f->file_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Found a series of consecutive files that need compaction.
|
||||||
|
if (candidate_count >= (unsigned int)min_merge_width) {
|
||||||
|
start_index = loop;
|
||||||
|
done = true;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
for (unsigned int i = loop;
|
||||||
|
i < loop + candidate_count && i < file_by_time.size(); i++) {
|
||||||
|
int index = file_by_time[i];
|
||||||
|
FileMetaData* f = version->files_[level][index];
|
||||||
|
Log(options_->info_log,
|
||||||
|
"Universal: Skipping file %lu[%d] with size %lu %d\n",
|
||||||
|
(unsigned long)f->number,
|
||||||
|
i,
|
||||||
|
(unsigned long)f->file_size,
|
||||||
|
f->being_compacted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!done || candidate_count <= 1) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
unsigned int first_index_after = start_index + candidate_count;
|
||||||
|
// Compression is enabled if files compacted earlier already reached
|
||||||
|
// size ratio of compression.
|
||||||
|
bool enable_compression = true;
|
||||||
|
int ratio_to_compress =
|
||||||
|
options_->compaction_options_universal.compression_size_percent;
|
||||||
|
if (ratio_to_compress >= 0) {
|
||||||
|
uint64_t total_size = version->NumLevelBytes(level);
|
||||||
|
uint64_t older_file_size = 0;
|
||||||
|
for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
|
||||||
|
i--) {
|
||||||
|
older_file_size += version->files_[level][file_by_time[i]]->file_size;
|
||||||
|
if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
|
||||||
|
enable_compression = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Compaction* c =
|
||||||
|
new Compaction(version, level, level, MaxFileSizeForLevel(level),
|
||||||
|
LLONG_MAX, false, enable_compression);
|
||||||
|
c->score_ = score;
|
||||||
|
|
||||||
|
for (unsigned int i = start_index; i < first_index_after; i++) {
|
||||||
|
int index = file_by_time[i];
|
||||||
|
FileMetaData* f = c->input_version_->files_[level][index];
|
||||||
|
c->inputs_[0].push_back(f);
|
||||||
|
Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
|
||||||
|
(unsigned long)f->number,
|
||||||
|
i,
|
||||||
|
(unsigned long)f->file_size);
|
||||||
|
}
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look at overall size amplification. If size amplification
|
||||||
|
// exceeeds the configured value, then do a compaction
|
||||||
|
// of the candidate files all the way upto the earliest
|
||||||
|
// base file (overrides configured values of file-size ratios,
|
||||||
|
// min_merge_width and max_merge_width).
|
||||||
|
//
|
||||||
|
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||||
|
Version* version, double score) {
|
||||||
|
int level = 0;
|
||||||
|
|
||||||
|
// percentage flexibilty while reducing size amplification
|
||||||
|
uint64_t ratio = options_->compaction_options_universal.
|
||||||
|
max_size_amplification_percent;
|
||||||
|
|
||||||
|
// The files are sorted from newest first to oldest last.
|
||||||
|
std::vector<int>& file_by_time = version->files_by_size_[level];
|
||||||
|
assert(file_by_time.size() == version->files_[level].size());
|
||||||
|
|
||||||
|
unsigned int candidate_count = 0;
|
||||||
|
uint64_t candidate_size = 0;
|
||||||
|
unsigned int start_index = 0;
|
||||||
|
FileMetaData* f = nullptr;
|
||||||
|
|
||||||
|
// Skip files that are already being compacted
|
||||||
|
for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
|
||||||
|
int index = file_by_time[loop];
|
||||||
|
f = version->files_[level][index];
|
||||||
|
if (!f->being_compacted) {
|
||||||
|
start_index = loop; // Consider this as the first candidate.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
|
||||||
|
(unsigned long)f->number,
|
||||||
|
loop,
|
||||||
|
" cannot be a candidate to reduce size amp.\n");
|
||||||
|
f = nullptr;
|
||||||
|
}
|
||||||
|
if (f == nullptr) {
|
||||||
|
return nullptr; // no candidate files
|
||||||
|
}
|
||||||
|
|
||||||
|
Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
|
||||||
|
(unsigned long)f->number,
|
||||||
|
start_index,
|
||||||
|
" to reduce size amp.\n");
|
||||||
|
|
||||||
|
// keep adding up all the remaining files
|
||||||
|
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
|
||||||
|
loop++) {
|
||||||
|
int index = file_by_time[loop];
|
||||||
|
f = version->files_[level][index];
|
||||||
|
if (f->being_compacted) {
|
||||||
|
Log(options_->info_log,
|
||||||
|
"Universal: Possible candidate file %lu[%d] %s.",
|
||||||
|
(unsigned long)f->number,
|
||||||
|
loop,
|
||||||
|
" is already being compacted. No size amp reduction possible.\n");
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
candidate_size += f->file_size;
|
||||||
|
candidate_count++;
|
||||||
|
}
|
||||||
|
if (candidate_count == 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// size of earliest file
|
||||||
|
int index = file_by_time[file_by_time.size() - 1];
|
||||||
|
uint64_t earliest_file_size = version->files_[level][index]->file_size;
|
||||||
|
|
||||||
|
// size amplification = percentage of additional size
|
||||||
|
if (candidate_size * 100 < ratio * earliest_file_size) {
|
||||||
|
Log(options_->info_log,
|
||||||
|
"Universal: size amp not needed. newer-files-total-size %lu "
|
||||||
|
"earliest-file-size %lu",
|
||||||
|
(unsigned long)candidate_size,
|
||||||
|
(unsigned long)earliest_file_size);
|
||||||
|
return nullptr;
|
||||||
|
} else {
|
||||||
|
Log(options_->info_log,
|
||||||
|
"Universal: size amp needed. newer-files-total-size %lu "
|
||||||
|
"earliest-file-size %lu",
|
||||||
|
(unsigned long)candidate_size,
|
||||||
|
(unsigned long)earliest_file_size);
|
||||||
|
}
|
||||||
|
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
|
||||||
|
|
||||||
|
// create a compaction request
|
||||||
|
// We always compact all the files, so always compress.
|
||||||
|
Compaction* c =
|
||||||
|
new Compaction(version, level, level, MaxFileSizeForLevel(level),
|
||||||
|
LLONG_MAX, false, true);
|
||||||
|
c->score_ = score;
|
||||||
|
for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
|
||||||
|
int index = file_by_time[loop];
|
||||||
|
f = c->input_version_->files_[level][index];
|
||||||
|
c->inputs_[0].push_back(f);
|
||||||
|
Log(options_->info_log,
|
||||||
|
"Universal: size amp picking file %lu[%d] with size %lu",
|
||||||
|
(unsigned long)f->number,
|
||||||
|
index,
|
||||||
|
(unsigned long)f->file_size);
|
||||||
|
}
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
152
db/compaction_picker.h
Normal file
152
db/compaction_picker.h
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
#include "db/version_set.h"
|
||||||
|
#include "db/compaction.h"
|
||||||
|
#include "rocksdb/status.h"
|
||||||
|
#include "rocksdb/options.h"
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
#include <memory>
|
||||||
|
#include <set>
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
class Compaction;
|
||||||
|
class Version;
|
||||||
|
|
||||||
|
class CompactionPicker {
|
||||||
|
public:
|
||||||
|
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
|
||||||
|
virtual ~CompactionPicker();
|
||||||
|
|
||||||
|
// See VersionSet::ReduceNumberOfLevels()
|
||||||
|
void ReduceNumberOfLevels(int new_levels);
|
||||||
|
|
||||||
|
// Pick level and inputs for a new compaction.
|
||||||
|
// Returns nullptr if there is no compaction to be done.
|
||||||
|
// Otherwise returns a pointer to a heap-allocated object that
|
||||||
|
// describes the compaction. Caller should delete the result.
|
||||||
|
virtual Compaction* PickCompaction(Version* version) = 0;
|
||||||
|
|
||||||
|
// Return a compaction object for compacting the range [begin,end] in
|
||||||
|
// the specified level. Returns nullptr if there is nothing in that
|
||||||
|
// level that overlaps the specified range. Caller should delete
|
||||||
|
// the result.
|
||||||
|
//
|
||||||
|
// The returned Compaction might not include the whole requested range.
|
||||||
|
// In that case, compaction_end will be set to the next key that needs
|
||||||
|
// compacting. In case the compaction will compact the whole range,
|
||||||
|
// compaction_end will be set to nullptr.
|
||||||
|
// Client is responsible for compaction_end storage -- when called,
|
||||||
|
// *compaction_end should point to valid InternalKey!
|
||||||
|
Compaction* CompactRange(Version* version, int input_level, int output_level,
|
||||||
|
const InternalKey* begin, const InternalKey* end,
|
||||||
|
InternalKey** compaction_end);
|
||||||
|
|
||||||
|
// Free up the files that participated in a compaction
|
||||||
|
void ReleaseCompactionFiles(Compaction* c, Status status);
|
||||||
|
|
||||||
|
// Return the total amount of data that is undergoing
|
||||||
|
// compactions per level
|
||||||
|
void SizeBeingCompacted(std::vector<uint64_t>& sizes);
|
||||||
|
|
||||||
|
// Returns maximum total overlap bytes with grandparent
|
||||||
|
// level (i.e., level+2) before we stop building a single
|
||||||
|
// file in level->level+1 compaction.
|
||||||
|
uint64_t MaxGrandParentOverlapBytes(int level);
|
||||||
|
|
||||||
|
// Returns maximum total bytes of data on a given level.
|
||||||
|
double MaxBytesForLevel(int level);
|
||||||
|
|
||||||
|
// Get the max file size in a given level.
|
||||||
|
uint64_t MaxFileSizeForLevel(int level) const;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
int NumberLevels() const { return num_levels_; }
|
||||||
|
|
||||||
|
// Stores the minimal range that covers all entries in inputs in
|
||||||
|
// *smallest, *largest.
|
||||||
|
// REQUIRES: inputs is not empty
|
||||||
|
void GetRange(const std::vector<FileMetaData*>& inputs, InternalKey* smallest,
|
||||||
|
InternalKey* largest);
|
||||||
|
|
||||||
|
// Stores the minimal range that covers all entries in inputs1 and inputs2
|
||||||
|
// in *smallest, *largest.
|
||||||
|
// REQUIRES: inputs is not empty
|
||||||
|
void GetRange(const std::vector<FileMetaData*>& inputs1,
|
||||||
|
const std::vector<FileMetaData*>& inputs2,
|
||||||
|
InternalKey* smallest, InternalKey* largest);
|
||||||
|
|
||||||
|
void ExpandWhileOverlapping(Compaction* c);
|
||||||
|
|
||||||
|
uint64_t ExpandedCompactionByteSizeLimit(int level);
|
||||||
|
|
||||||
|
// Returns true if any one of the specified files are being compacted
|
||||||
|
bool FilesInCompaction(std::vector<FileMetaData*>& files);
|
||||||
|
|
||||||
|
// Returns true if any one of the parent files are being compacted
|
||||||
|
bool ParentRangeInCompaction(Version* version, const InternalKey* smallest,
|
||||||
|
const InternalKey* largest, int level,
|
||||||
|
int* index);
|
||||||
|
|
||||||
|
void SetupOtherInputs(Compaction* c);
|
||||||
|
|
||||||
|
// record all the ongoing compactions for all levels
|
||||||
|
std::vector<std::set<Compaction*>> compactions_in_progress_;
|
||||||
|
|
||||||
|
// Per-level target file size.
|
||||||
|
std::unique_ptr<uint64_t[]> max_file_size_;
|
||||||
|
|
||||||
|
// Per-level max bytes
|
||||||
|
std::unique_ptr<uint64_t[]> level_max_bytes_;
|
||||||
|
|
||||||
|
const Options* const options_;
|
||||||
|
private:
|
||||||
|
void Init();
|
||||||
|
|
||||||
|
int num_levels_;
|
||||||
|
|
||||||
|
const InternalKeyComparator* const icmp_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class UniversalCompactionPicker : public CompactionPicker {
|
||||||
|
public:
|
||||||
|
UniversalCompactionPicker(const Options* options,
|
||||||
|
const InternalKeyComparator* icmp)
|
||||||
|
: CompactionPicker(options, icmp) {}
|
||||||
|
virtual Compaction* PickCompaction(Version* version) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Pick Universal compaction to limit read amplification
|
||||||
|
Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
|
||||||
|
unsigned int ratio,
|
||||||
|
unsigned int num_files);
|
||||||
|
|
||||||
|
// Pick Universal compaction to limit space amplification.
|
||||||
|
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
|
||||||
|
};
|
||||||
|
|
||||||
|
class LevelCompactionPicker : public CompactionPicker {
|
||||||
|
public:
|
||||||
|
LevelCompactionPicker(const Options* options,
|
||||||
|
const InternalKeyComparator* icmp)
|
||||||
|
: CompactionPicker(options, icmp) {}
|
||||||
|
virtual Compaction* PickCompaction(Version* version) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// For the specfied level, pick a compaction.
|
||||||
|
// Returns nullptr if there is no compaction to be done.
|
||||||
|
// If level is 0 and there is already a compaction on that level, this
|
||||||
|
// function will return nullptr.
|
||||||
|
Compaction* PickCompactionBySize(Version* version, int level, double score);
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
@ -69,12 +69,6 @@ void VersionEdit::EncodeTo(std::string* dst) const {
|
|||||||
PutVarint64(dst, last_sequence_);
|
PutVarint64(dst, last_sequence_);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < compact_pointers_.size(); i++) {
|
|
||||||
PutVarint32(dst, kCompactPointer);
|
|
||||||
PutVarint32(dst, compact_pointers_[i].first); // level
|
|
||||||
PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const auto& deleted : deleted_files_) {
|
for (const auto& deleted : deleted_files_) {
|
||||||
PutVarint32(dst, kDeletedFile);
|
PutVarint32(dst, kDeletedFile);
|
||||||
PutVarint32(dst, deleted.first /* level */);
|
PutVarint32(dst, deleted.first /* level */);
|
||||||
@ -176,7 +170,9 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
case kCompactPointer:
|
case kCompactPointer:
|
||||||
if (GetLevel(&input, &level, &msg) &&
|
if (GetLevel(&input, &level, &msg) &&
|
||||||
GetInternalKey(&input, &key)) {
|
GetInternalKey(&input, &key)) {
|
||||||
compact_pointers_.push_back(std::make_pair(level, key));
|
// we don't use compact pointers anymore,
|
||||||
|
// but we should not fail if they are still
|
||||||
|
// in manifest
|
||||||
} else {
|
} else {
|
||||||
if (!msg) {
|
if (!msg) {
|
||||||
msg = "compaction pointer";
|
msg = "compaction pointer";
|
||||||
@ -265,12 +261,6 @@ std::string VersionEdit::DebugString(bool hex_key) const {
|
|||||||
r.append("\n LastSeq: ");
|
r.append("\n LastSeq: ");
|
||||||
AppendNumberTo(&r, last_sequence_);
|
AppendNumberTo(&r, last_sequence_);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < compact_pointers_.size(); i++) {
|
|
||||||
r.append("\n CompactPointer: ");
|
|
||||||
AppendNumberTo(&r, compact_pointers_[i].first);
|
|
||||||
r.append(" ");
|
|
||||||
r.append(compact_pointers_[i].second.DebugString(hex_key));
|
|
||||||
}
|
|
||||||
for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
|
for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
|
||||||
iter != deleted_files_.end();
|
iter != deleted_files_.end();
|
||||||
++iter) {
|
++iter) {
|
||||||
|
@ -66,9 +66,6 @@ class VersionEdit {
|
|||||||
has_last_sequence_ = true;
|
has_last_sequence_ = true;
|
||||||
last_sequence_ = seq;
|
last_sequence_ = seq;
|
||||||
}
|
}
|
||||||
void SetCompactPointer(int level, const InternalKey& key) {
|
|
||||||
compact_pointers_.push_back(std::make_pair(level, key));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the specified file at the specified number.
|
// Add the specified file at the specified number.
|
||||||
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
|
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
|
||||||
@ -124,7 +121,6 @@ class VersionEdit {
|
|||||||
bool has_next_file_number_;
|
bool has_next_file_number_;
|
||||||
bool has_last_sequence_;
|
bool has_last_sequence_;
|
||||||
|
|
||||||
std::vector<std::pair<int, InternalKey> > compact_pointers_;
|
|
||||||
DeletedFileSet deleted_files_;
|
DeletedFileSet deleted_files_;
|
||||||
std::vector<std::pair<int, FileMetaData> > new_files_;
|
std::vector<std::pair<int, FileMetaData> > new_files_;
|
||||||
};
|
};
|
||||||
|
@ -36,7 +36,6 @@ TEST(VersionEditTest, EncodeDecode) {
|
|||||||
kBig + 500 + i,
|
kBig + 500 + i,
|
||||||
kBig + 600 + i);
|
kBig + 600 + i);
|
||||||
edit.DeleteFile(4, kBig + 700 + i);
|
edit.DeleteFile(4, kBig + 700 + i);
|
||||||
edit.SetCompactPointer(i, InternalKey("x", kBig + 900 + i, kTypeValue));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
edit.SetComparatorName("foo");
|
edit.SetComparatorName("foo");
|
||||||
|
@ -789,7 +789,7 @@ int Version::PickLevelForMemTableOutput(
|
|||||||
}
|
}
|
||||||
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
|
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
|
||||||
const uint64_t sum = TotalFileSize(overlaps);
|
const uint64_t sum = TotalFileSize(overlaps);
|
||||||
if (sum > vset_->MaxGrandParentOverlapBytes(level)) {
|
if (sum > vset_->compaction_picker_->MaxGrandParentOverlapBytes(level)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
level++;
|
level++;
|
||||||
@ -1256,13 +1256,6 @@ class VersionSet::Builder {
|
|||||||
void Apply(VersionEdit* edit) {
|
void Apply(VersionEdit* edit) {
|
||||||
CheckConsistency(base_);
|
CheckConsistency(base_);
|
||||||
|
|
||||||
// Update compaction pointers
|
|
||||||
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
|
|
||||||
const int level = edit->compact_pointers_[i].first;
|
|
||||||
vset_->compact_pointer_[level] =
|
|
||||||
edit->compact_pointers_[i].second.Encode().ToString();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete files
|
// Delete files
|
||||||
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
|
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
|
||||||
for (const auto& del_file : del) {
|
for (const auto& del_file : del) {
|
||||||
@ -1382,13 +1375,15 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
|
|||||||
dummy_versions_(this),
|
dummy_versions_(this),
|
||||||
current_(nullptr),
|
current_(nullptr),
|
||||||
need_slowdown_for_num_level0_files_(false),
|
need_slowdown_for_num_level0_files_(false),
|
||||||
compactions_in_progress_(options_->num_levels),
|
|
||||||
current_version_number_(0),
|
current_version_number_(0),
|
||||||
manifest_file_size_(0),
|
manifest_file_size_(0),
|
||||||
storage_options_(storage_options),
|
storage_options_(storage_options),
|
||||||
storage_options_compactions_(storage_options_) {
|
storage_options_compactions_(storage_options_) {
|
||||||
compact_pointer_ = new std::string[options_->num_levels];
|
if (options_->compaction_style == kCompactionStyleUniversal) {
|
||||||
Init(options_->num_levels);
|
compaction_picker_.reset(new UniversalCompactionPicker(options_, &icmp_));
|
||||||
|
} else {
|
||||||
|
compaction_picker_.reset(new LevelCompactionPicker(options_, &icmp_));
|
||||||
|
}
|
||||||
AppendVersion(new Version(this, current_version_number_++));
|
AppendVersion(new Version(this, current_version_number_++));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1399,29 +1394,6 @@ VersionSet::~VersionSet() {
|
|||||||
delete file;
|
delete file;
|
||||||
}
|
}
|
||||||
obsolete_files_.clear();
|
obsolete_files_.clear();
|
||||||
delete[] compact_pointer_;
|
|
||||||
delete[] max_file_size_;
|
|
||||||
delete[] level_max_bytes_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void VersionSet::Init(int num_levels) {
|
|
||||||
max_file_size_ = new uint64_t[num_levels];
|
|
||||||
level_max_bytes_ = new uint64_t[num_levels];
|
|
||||||
int target_file_size_multiplier = options_->target_file_size_multiplier;
|
|
||||||
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
|
|
||||||
for (int i = 0; i < num_levels; i++) {
|
|
||||||
if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
|
|
||||||
max_file_size_[i] = ULLONG_MAX;
|
|
||||||
level_max_bytes_[i] = options_->max_bytes_for_level_base;
|
|
||||||
} else if (i > 1) {
|
|
||||||
max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
|
|
||||||
level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier *
|
|
||||||
options_->max_bytes_for_level_multiplier_additional[i-1];
|
|
||||||
} else {
|
|
||||||
max_file_size_[i] = options_->target_file_size_base;
|
|
||||||
level_max_bytes_[i] = options_->max_bytes_for_level_base;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void VersionSet::AppendVersion(Version* v) {
|
void VersionSet::AppendVersion(Version* v) {
|
||||||
@ -1499,7 +1471,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
|
|||||||
{
|
{
|
||||||
// calculate the amount of data being compacted at every level
|
// calculate the amount of data being compacted at every level
|
||||||
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
|
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
|
||||||
SizeBeingCompacted(size_being_compacted);
|
compaction_picker_->SizeBeingCompacted(size_being_compacted);
|
||||||
|
|
||||||
mu->Unlock();
|
mu->Unlock();
|
||||||
|
|
||||||
@ -1764,7 +1736,7 @@ Status VersionSet::Recover() {
|
|||||||
|
|
||||||
// Install recovered version
|
// Install recovered version
|
||||||
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
|
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
|
||||||
SizeBeingCompacted(size_being_compacted);
|
compaction_picker_->SizeBeingCompacted(size_being_compacted);
|
||||||
v->Finalize(size_being_compacted);
|
v->Finalize(size_being_compacted);
|
||||||
|
|
||||||
manifest_file_size_ = manifest_file_size;
|
manifest_file_size_ = manifest_file_size;
|
||||||
@ -1896,7 +1868,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
|
|||||||
|
|
||||||
// Install recovered version
|
// Install recovered version
|
||||||
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
|
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
|
||||||
SizeBeingCompacted(size_being_compacted);
|
compaction_picker_->SizeBeingCompacted(size_being_compacted);
|
||||||
v->Finalize(size_being_compacted);
|
v->Finalize(size_being_compacted);
|
||||||
|
|
||||||
AppendVersion(v);
|
AppendVersion(v);
|
||||||
@ -1932,15 +1904,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
|
|||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
edit.SetComparatorName(icmp_.user_comparator()->Name());
|
edit.SetComparatorName(icmp_.user_comparator()->Name());
|
||||||
|
|
||||||
// Save compaction pointers
|
|
||||||
for (int level = 0; level < NumberLevels(); level++) {
|
|
||||||
if (!compact_pointer_[level].empty()) {
|
|
||||||
InternalKey key;
|
|
||||||
key.DecodeFrom(compact_pointer_[level]);
|
|
||||||
edit.SetCompactPointer(level, key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save files
|
// Save files
|
||||||
for (int level = 0; level < current_->NumberLevels(); level++) {
|
for (int level = 0; level < current_->NumberLevels(); level++) {
|
||||||
const auto& files = current_->files_[level];
|
const auto& files = current_->files_[level];
|
||||||
@ -2042,41 +2005,16 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stores the minimal range that covers all entries in inputs in
|
Compaction* VersionSet::PickCompaction() {
|
||||||
// *smallest, *largest.
|
return compaction_picker_->PickCompaction(current_);
|
||||||
// REQUIRES: inputs is not empty
|
|
||||||
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
|
|
||||||
InternalKey* smallest,
|
|
||||||
InternalKey* largest) {
|
|
||||||
assert(!inputs.empty());
|
|
||||||
smallest->Clear();
|
|
||||||
largest->Clear();
|
|
||||||
for (size_t i = 0; i < inputs.size(); i++) {
|
|
||||||
FileMetaData* f = inputs[i];
|
|
||||||
if (i == 0) {
|
|
||||||
*smallest = f->smallest;
|
|
||||||
*largest = f->largest;
|
|
||||||
} else {
|
|
||||||
if (icmp_.Compare(f->smallest, *smallest) < 0) {
|
|
||||||
*smallest = f->smallest;
|
|
||||||
}
|
|
||||||
if (icmp_.Compare(f->largest, *largest) > 0) {
|
|
||||||
*largest = f->largest;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stores the minimal range that covers all entries in inputs1 and inputs2
|
Compaction* VersionSet::CompactRange(int input_level, int output_level,
|
||||||
// in *smallest, *largest.
|
const InternalKey* begin,
|
||||||
// REQUIRES: inputs is not empty
|
const InternalKey* end,
|
||||||
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
|
InternalKey** compaction_end) {
|
||||||
const std::vector<FileMetaData*>& inputs2,
|
return compaction_picker_->CompactRange(current_, input_level, output_level,
|
||||||
InternalKey* smallest,
|
begin, end, compaction_end);
|
||||||
InternalKey* largest) {
|
|
||||||
std::vector<FileMetaData*> all = inputs1;
|
|
||||||
all.insert(all.end(), inputs2.begin(), inputs2.end());
|
|
||||||
GetRange(all, smallest, largest);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
|
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
|
||||||
@ -2115,29 +2053,11 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
double VersionSet::MaxBytesForLevel(int level) {
|
double VersionSet::MaxBytesForLevel(int level) {
|
||||||
// Note: the result for level zero is not really used since we set
|
return compaction_picker_->MaxBytesForLevel(level);
|
||||||
// the level-0 compaction threshold based on number of files.
|
|
||||||
assert(level >= 0);
|
|
||||||
assert(level < NumberLevels());
|
|
||||||
return level_max_bytes_[level];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t VersionSet::MaxFileSizeForLevel(int level) {
|
uint64_t VersionSet::MaxFileSizeForLevel(int level) {
|
||||||
assert(level >= 0);
|
return compaction_picker_->MaxFileSizeForLevel(level);
|
||||||
assert(level < NumberLevels());
|
|
||||||
return max_file_size_[level];
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) {
|
|
||||||
uint64_t result = MaxFileSizeForLevel(level);
|
|
||||||
result *= options_->expanded_compaction_factor;
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t VersionSet::MaxGrandParentOverlapBytes(int level) {
|
|
||||||
uint64_t result = MaxFileSizeForLevel(level);
|
|
||||||
result *= options_->max_grandparent_overlap_factor;
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify that the files listed in this compaction are present
|
// verify that the files listed in this compaction are present
|
||||||
@ -2188,697 +2108,8 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
|
|||||||
return true; // everything good
|
return true; // everything good
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear all files to indicate that they are not being compacted
|
|
||||||
// Delete this compaction from the list of running compactions.
|
|
||||||
void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
|
void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
|
||||||
c->MarkFilesBeingCompacted(false);
|
compaction_picker_->ReleaseCompactionFiles(c, status);
|
||||||
compactions_in_progress_[c->level()].erase(c);
|
|
||||||
if (!status.ok()) {
|
|
||||||
c->ResetNextCompactionIndex();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The total size of files that are currently being compacted
|
|
||||||
// at at every level upto the penultimate level.
|
|
||||||
void VersionSet::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
|
|
||||||
for (int level = 0; level < NumberLevels() - 1; level++) {
|
|
||||||
uint64_t total = 0;
|
|
||||||
for (std::set<Compaction*>::iterator it =
|
|
||||||
compactions_in_progress_[level].begin();
|
|
||||||
it != compactions_in_progress_[level].end();
|
|
||||||
++it) {
|
|
||||||
Compaction* c = (*it);
|
|
||||||
assert(c->level() == level);
|
|
||||||
for (int i = 0; i < c->num_input_files(0); i++) {
|
|
||||||
total += c->input(0,i)->file_size;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sizes[level] = total;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Look at overall size amplification. If size amplification
|
|
||||||
// exceeeds the configured value, then do a compaction
|
|
||||||
// of the candidate files all the way upto the earliest
|
|
||||||
// base file (overrides configured values of file-size ratios,
|
|
||||||
// min_merge_width and max_merge_width).
|
|
||||||
//
|
|
||||||
Compaction* VersionSet::PickCompactionUniversalSizeAmp(int level,
|
|
||||||
double score) {
|
|
||||||
assert (level == 0);
|
|
||||||
|
|
||||||
// percentage flexibilty while reducing size amplification
|
|
||||||
uint64_t ratio = options_->compaction_options_universal.
|
|
||||||
max_size_amplification_percent;
|
|
||||||
|
|
||||||
// The files are sorted from newest first to oldest last.
|
|
||||||
std::vector<int>& file_by_time = current_->files_by_size_[level];
|
|
||||||
assert(file_by_time.size() == current_->files_[level].size());
|
|
||||||
|
|
||||||
unsigned int candidate_count = 0;
|
|
||||||
uint64_t candidate_size = 0;
|
|
||||||
unsigned int start_index = 0;
|
|
||||||
FileMetaData* f = nullptr;
|
|
||||||
|
|
||||||
// Skip files that are already being compacted
|
|
||||||
for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
|
|
||||||
int index = file_by_time[loop];
|
|
||||||
f = current_->files_[level][index];
|
|
||||||
if (!f->being_compacted) {
|
|
||||||
start_index = loop; // Consider this as the first candidate.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
|
|
||||||
(unsigned long)f->number,
|
|
||||||
loop,
|
|
||||||
" cannot be a candidate to reduce size amp.\n");
|
|
||||||
f = nullptr;
|
|
||||||
}
|
|
||||||
if (f == nullptr) {
|
|
||||||
return nullptr; // no candidate files
|
|
||||||
}
|
|
||||||
|
|
||||||
Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
|
|
||||||
(unsigned long)f->number,
|
|
||||||
start_index,
|
|
||||||
" to reduce size amp.\n");
|
|
||||||
|
|
||||||
// keep adding up all the remaining files
|
|
||||||
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
|
|
||||||
loop++) {
|
|
||||||
int index = file_by_time[loop];
|
|
||||||
f = current_->files_[level][index];
|
|
||||||
if (f->being_compacted) {
|
|
||||||
Log(options_->info_log,
|
|
||||||
"Universal: Possible candidate file %lu[%d] %s.",
|
|
||||||
(unsigned long)f->number,
|
|
||||||
loop,
|
|
||||||
" is already being compacted. No size amp reduction possible.\n");
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
candidate_size += f->file_size;
|
|
||||||
candidate_count++;
|
|
||||||
}
|
|
||||||
if (candidate_count == 0) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
// size of earliest file
|
|
||||||
int index = file_by_time[file_by_time.size() - 1];
|
|
||||||
uint64_t earliest_file_size = current_->files_[level][index]->file_size;
|
|
||||||
|
|
||||||
// size amplification = percentage of additional size
|
|
||||||
if (candidate_size * 100 < ratio * earliest_file_size) {
|
|
||||||
Log(options_->info_log,
|
|
||||||
"Universal: size amp not needed. newer-files-total-size %lu "
|
|
||||||
"earliest-file-size %lu",
|
|
||||||
(unsigned long)candidate_size,
|
|
||||||
(unsigned long)earliest_file_size);
|
|
||||||
return nullptr;
|
|
||||||
} else {
|
|
||||||
Log(options_->info_log,
|
|
||||||
"Universal: size amp needed. newer-files-total-size %lu "
|
|
||||||
"earliest-file-size %lu",
|
|
||||||
(unsigned long)candidate_size,
|
|
||||||
(unsigned long)earliest_file_size);
|
|
||||||
}
|
|
||||||
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
|
|
||||||
|
|
||||||
// create a compaction request
|
|
||||||
// We always compact all the files, so always compress.
|
|
||||||
Compaction* c =
|
|
||||||
new Compaction(current_, level, level, MaxFileSizeForLevel(level),
|
|
||||||
LLONG_MAX, false, true);
|
|
||||||
c->score_ = score;
|
|
||||||
for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
|
|
||||||
int index = file_by_time[loop];
|
|
||||||
f = c->input_version_->files_[level][index];
|
|
||||||
c->inputs_[0].push_back(f);
|
|
||||||
Log(options_->info_log,
|
|
||||||
"Universal: size amp picking file %lu[%d] with size %lu",
|
|
||||||
(unsigned long)f->number,
|
|
||||||
index,
|
|
||||||
(unsigned long)f->file_size);
|
|
||||||
}
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Consider compaction files based on their size differences with
|
|
||||||
// the next file in time order.
|
|
||||||
//
|
|
||||||
Compaction* VersionSet::PickCompactionUniversalReadAmp(
|
|
||||||
int level, double score, unsigned int ratio,
|
|
||||||
unsigned int max_number_of_files_to_compact) {
|
|
||||||
|
|
||||||
unsigned int min_merge_width =
|
|
||||||
options_->compaction_options_universal.min_merge_width;
|
|
||||||
unsigned int max_merge_width =
|
|
||||||
options_->compaction_options_universal.max_merge_width;
|
|
||||||
|
|
||||||
// The files are sorted from newest first to oldest last.
|
|
||||||
std::vector<int>& file_by_time = current_->files_by_size_[level];
|
|
||||||
FileMetaData* f = nullptr;
|
|
||||||
bool done = false;
|
|
||||||
int start_index = 0;
|
|
||||||
unsigned int candidate_count;
|
|
||||||
assert(file_by_time.size() == current_->files_[level].size());
|
|
||||||
|
|
||||||
unsigned int max_files_to_compact = std::min(max_merge_width,
|
|
||||||
max_number_of_files_to_compact);
|
|
||||||
min_merge_width = std::max(min_merge_width, 2U);
|
|
||||||
|
|
||||||
// Considers a candidate file only if it is smaller than the
|
|
||||||
// total size accumulated so far.
|
|
||||||
for (unsigned int loop = 0; loop < file_by_time.size(); loop++) {
|
|
||||||
|
|
||||||
candidate_count = 0;
|
|
||||||
|
|
||||||
// Skip files that are already being compacted
|
|
||||||
for (f = nullptr; loop < file_by_time.size(); loop++) {
|
|
||||||
int index = file_by_time[loop];
|
|
||||||
f = current_->files_[level][index];
|
|
||||||
|
|
||||||
if (!f->being_compacted) {
|
|
||||||
candidate_count = 1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Log(options_->info_log,
|
|
||||||
"Universal: file %lu[%d] being compacted, skipping",
|
|
||||||
(unsigned long)f->number, loop);
|
|
||||||
f = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
// This file is not being compacted. Consider it as the
|
|
||||||
// first candidate to be compacted.
|
|
||||||
uint64_t candidate_size = f != nullptr? f->file_size : 0;
|
|
||||||
if (f != nullptr) {
|
|
||||||
Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
|
|
||||||
(unsigned long)f->number, loop);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the suceeding files need compaction.
|
|
||||||
for (unsigned int i = loop+1;
|
|
||||||
candidate_count < max_files_to_compact && i < file_by_time.size();
|
|
||||||
i++) {
|
|
||||||
int index = file_by_time[i];
|
|
||||||
FileMetaData* f = current_->files_[level][index];
|
|
||||||
if (f->being_compacted) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// pick files if the total candidate file size (increased by the
|
|
||||||
// specified ratio) is still larger than the next candidate file.
|
|
||||||
uint64_t sz = (candidate_size * (100L + ratio)) /100;
|
|
||||||
if (sz < f->file_size) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
candidate_count++;
|
|
||||||
candidate_size += f->file_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Found a series of consecutive files that need compaction.
|
|
||||||
if (candidate_count >= (unsigned int)min_merge_width) {
|
|
||||||
start_index = loop;
|
|
||||||
done = true;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
for (unsigned int i = loop;
|
|
||||||
i < loop + candidate_count && i < file_by_time.size(); i++) {
|
|
||||||
int index = file_by_time[i];
|
|
||||||
FileMetaData* f = current_->files_[level][index];
|
|
||||||
Log(options_->info_log,
|
|
||||||
"Universal: Skipping file %lu[%d] with size %lu %d\n",
|
|
||||||
(unsigned long)f->number,
|
|
||||||
i,
|
|
||||||
(unsigned long)f->file_size,
|
|
||||||
f->being_compacted);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!done || candidate_count <= 1) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
unsigned int first_index_after = start_index + candidate_count;
|
|
||||||
// Compression is enabled if files compacted earlier already reached
|
|
||||||
// size ratio of compression.
|
|
||||||
bool enable_compression = true;
|
|
||||||
int ratio_to_compress =
|
|
||||||
options_->compaction_options_universal.compression_size_percent;
|
|
||||||
if (ratio_to_compress >= 0) {
|
|
||||||
uint64_t total_size = TotalFileSize(current_->files_[level]);
|
|
||||||
uint64_t older_file_size = 0;
|
|
||||||
for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
|
|
||||||
i--) {
|
|
||||||
older_file_size += current_->files_[level][file_by_time[i]]->file_size;
|
|
||||||
if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
|
|
||||||
enable_compression = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Compaction* c =
|
|
||||||
new Compaction(current_, level, level, MaxFileSizeForLevel(level),
|
|
||||||
LLONG_MAX, false, enable_compression);
|
|
||||||
c->score_ = score;
|
|
||||||
|
|
||||||
for (unsigned int i = start_index; i < first_index_after; i++) {
|
|
||||||
int index = file_by_time[i];
|
|
||||||
FileMetaData* f = c->input_version_->files_[level][index];
|
|
||||||
c->inputs_[0].push_back(f);
|
|
||||||
Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
|
|
||||||
(unsigned long)f->number,
|
|
||||||
i,
|
|
||||||
(unsigned long)f->file_size);
|
|
||||||
}
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Universal style of compaction. Pick files that are contiguous in
|
|
||||||
// time-range to compact.
|
|
||||||
//
|
|
||||||
Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
|
|
||||||
assert (level == 0);
|
|
||||||
|
|
||||||
if ((current_->files_[level].size() <
|
|
||||||
(unsigned int)options_->level0_file_num_compaction_trigger)) {
|
|
||||||
Log(options_->info_log, "Universal: nothing to do\n");
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
Version::FileSummaryStorage tmp;
|
|
||||||
Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
|
|
||||||
current_->files_[level].size(),
|
|
||||||
current_->LevelFileSummary(&tmp, 0));
|
|
||||||
|
|
||||||
// Check for size amplification first.
|
|
||||||
Compaction* c = PickCompactionUniversalSizeAmp(level, score);
|
|
||||||
if (c == nullptr) {
|
|
||||||
|
|
||||||
// Size amplification is within limits. Try reducing read
|
|
||||||
// amplification while maintaining file size ratios.
|
|
||||||
unsigned int ratio = options_->compaction_options_universal.size_ratio;
|
|
||||||
c = PickCompactionUniversalReadAmp(level, score, ratio, UINT_MAX);
|
|
||||||
|
|
||||||
// Size amplification and file size ratios are within configured limits.
|
|
||||||
// If max read amplification is exceeding configured limits, then force
|
|
||||||
// compaction without looking at filesize ratios and try to reduce
|
|
||||||
// the number of files to fewer than level0_file_num_compaction_trigger.
|
|
||||||
if (c == nullptr) {
|
|
||||||
unsigned int num_files = current_->files_[level].size() -
|
|
||||||
options_->level0_file_num_compaction_trigger;
|
|
||||||
c = PickCompactionUniversalReadAmp(level, score, UINT_MAX, num_files);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (c == nullptr) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
assert(c->inputs_[0].size() > 1);
|
|
||||||
|
|
||||||
// validate that all the chosen files are non overlapping in time
|
|
||||||
FileMetaData* newerfile __attribute__((unused)) = nullptr;
|
|
||||||
for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
|
|
||||||
FileMetaData* f = c->inputs_[0][i];
|
|
||||||
assert (f->smallest_seqno <= f->largest_seqno);
|
|
||||||
assert(newerfile == nullptr ||
|
|
||||||
newerfile->smallest_seqno > f->largest_seqno);
|
|
||||||
newerfile = f;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The files are sorted from newest first to oldest last.
|
|
||||||
std::vector<int>& file_by_time = c->input_version_->files_by_size_[level];
|
|
||||||
|
|
||||||
// Is the earliest file part of this compaction?
|
|
||||||
int last_index = file_by_time[file_by_time.size()-1];
|
|
||||||
FileMetaData* last_file = c->input_version_->files_[level][last_index];
|
|
||||||
if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
|
|
||||||
c->bottommost_level_ = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// update statistics
|
|
||||||
if (options_->statistics != nullptr) {
|
|
||||||
options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
|
|
||||||
c->inputs_[0].size());
|
|
||||||
}
|
|
||||||
|
|
||||||
// mark all the files that are being compacted
|
|
||||||
c->MarkFilesBeingCompacted(true);
|
|
||||||
|
|
||||||
// remember this currently undergoing compaction
|
|
||||||
compactions_in_progress_[level].insert(c);
|
|
||||||
|
|
||||||
// Record whether this compaction includes all sst files.
|
|
||||||
// For now, it is only relevant in universal compaction mode.
|
|
||||||
c->is_full_compaction_ =
|
|
||||||
(c->inputs_[0].size() == c->input_version_->files_[0].size());
|
|
||||||
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
Compaction* VersionSet::PickCompactionBySize(int level, double score) {
|
|
||||||
Compaction* c = nullptr;
|
|
||||||
|
|
||||||
// level 0 files are overlapping. So we cannot pick more
|
|
||||||
// than one concurrent compactions at this level. This
|
|
||||||
// could be made better by looking at key-ranges that are
|
|
||||||
// being compacted at level 0.
|
|
||||||
if (level == 0 && compactions_in_progress_[level].size() == 1) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(level >= 0);
|
|
||||||
assert(level + 1 < current_->NumberLevels());
|
|
||||||
c = new Compaction(current_, level, level + 1, MaxFileSizeForLevel(level + 1),
|
|
||||||
MaxGrandParentOverlapBytes(level));
|
|
||||||
c->score_ = score;
|
|
||||||
|
|
||||||
// Pick the largest file in this level that is not already
|
|
||||||
// being compacted
|
|
||||||
std::vector<int>& file_size = c->input_version_->files_by_size_[level];
|
|
||||||
|
|
||||||
// record the first file that is not yet compacted
|
|
||||||
int nextIndex = -1;
|
|
||||||
|
|
||||||
for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level];
|
|
||||||
i < file_size.size(); i++) {
|
|
||||||
int index = file_size[i];
|
|
||||||
FileMetaData* f = c->input_version_->files_[level][index];
|
|
||||||
|
|
||||||
// check to verify files are arranged in descending size
|
|
||||||
assert((i == file_size.size() - 1) ||
|
|
||||||
(i >= Version::number_of_files_to_sort_ - 1) ||
|
|
||||||
(f->file_size >=
|
|
||||||
c->input_version_->files_[level][file_size[i + 1]]->file_size));
|
|
||||||
|
|
||||||
// do not pick a file to compact if it is being compacted
|
|
||||||
// from n-1 level.
|
|
||||||
if (f->being_compacted) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// remember the startIndex for the next call to PickCompaction
|
|
||||||
if (nextIndex == -1) {
|
|
||||||
nextIndex = i;
|
|
||||||
}
|
|
||||||
|
|
||||||
//if (i > Version::number_of_files_to_sort_) {
|
|
||||||
// Log(options_->info_log, "XXX Looking at index %d", i);
|
|
||||||
//}
|
|
||||||
|
|
||||||
// Do not pick this file if its parents at level+1 are being compacted.
|
|
||||||
// Maybe we can avoid redoing this work in SetupOtherInputs
|
|
||||||
int parent_index = -1;
|
|
||||||
if (ParentRangeInCompaction(&f->smallest, &f->largest, level,
|
|
||||||
&parent_index)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
c->inputs_[0].push_back(f);
|
|
||||||
c->base_index_ = index;
|
|
||||||
c->parent_index_ = parent_index;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c->inputs_[0].empty()) {
|
|
||||||
delete c;
|
|
||||||
c = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
// store where to start the iteration in the next call to PickCompaction
|
|
||||||
c->input_version_->next_file_to_compact_by_size_[level] = nextIndex;
|
|
||||||
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
Compaction* VersionSet::PickCompaction() {
|
|
||||||
Compaction* c = nullptr;
|
|
||||||
int level = -1;
|
|
||||||
|
|
||||||
// Compute the compactions needed. It is better to do it here
|
|
||||||
// and also in LogAndApply(), otherwise the values could be stale.
|
|
||||||
std::vector<uint64_t> size_being_compacted(NumberLevels()-1);
|
|
||||||
current_->vset_->SizeBeingCompacted(size_being_compacted);
|
|
||||||
current_->Finalize(size_being_compacted);
|
|
||||||
|
|
||||||
// In universal style of compaction, compact L0 files back into L0.
|
|
||||||
if (options_->compaction_style == kCompactionStyleUniversal) {
|
|
||||||
int level = 0;
|
|
||||||
c = PickCompactionUniversal(level, current_->compaction_score_[level]);
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
// We prefer compactions triggered by too much data in a level over
|
|
||||||
// the compactions triggered by seeks.
|
|
||||||
//
|
|
||||||
// Find the compactions by size on all levels.
|
|
||||||
for (int i = 0; i < NumberLevels()-1; i++) {
|
|
||||||
assert(i == 0 || current_->compaction_score_[i] <=
|
|
||||||
current_->compaction_score_[i-1]);
|
|
||||||
level = current_->compaction_level_[i];
|
|
||||||
if ((current_->compaction_score_[i] >= 1)) {
|
|
||||||
c = PickCompactionBySize(level, current_->compaction_score_[i]);
|
|
||||||
ExpandWhileOverlapping(c);
|
|
||||||
if (c != nullptr) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find compactions needed by seeks
|
|
||||||
FileMetaData* f = current_->file_to_compact_;
|
|
||||||
if (c == nullptr && f != nullptr && !f->being_compacted) {
|
|
||||||
|
|
||||||
level = current_->file_to_compact_level_;
|
|
||||||
int parent_index = -1;
|
|
||||||
|
|
||||||
// Only allow one level 0 compaction at a time.
|
|
||||||
// Do not pick this file if its parents at level+1 are being compacted.
|
|
||||||
if (level != 0 || compactions_in_progress_[0].empty()) {
|
|
||||||
if(!ParentRangeInCompaction(&f->smallest, &f->largest, level,
|
|
||||||
&parent_index)) {
|
|
||||||
c = new Compaction(current_, level, level + 1,
|
|
||||||
MaxFileSizeForLevel(level + 1),
|
|
||||||
MaxGrandParentOverlapBytes(level), true);
|
|
||||||
c->inputs_[0].push_back(f);
|
|
||||||
c->parent_index_ = parent_index;
|
|
||||||
c->input_version_->file_to_compact_ = nullptr;
|
|
||||||
ExpandWhileOverlapping(c);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c == nullptr) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Two level 0 compaction won't run at the same time, so don't need to worry
|
|
||||||
// about files on level 0 being compacted.
|
|
||||||
if (level == 0) {
|
|
||||||
assert(compactions_in_progress_[0].empty());
|
|
||||||
InternalKey smallest, largest;
|
|
||||||
GetRange(c->inputs_[0], &smallest, &largest);
|
|
||||||
// Note that the next call will discard the file we placed in
|
|
||||||
// c->inputs_[0] earlier and replace it with an overlapping set
|
|
||||||
// which will include the picked file.
|
|
||||||
c->inputs_[0].clear();
|
|
||||||
c->input_version_->GetOverlappingInputs(0, &smallest, &largest,
|
|
||||||
&c->inputs_[0]);
|
|
||||||
|
|
||||||
// If we include more L0 files in the same compaction run it can
|
|
||||||
// cause the 'smallest' and 'largest' key to get extended to a
|
|
||||||
// larger range. So, re-invoke GetRange to get the new key range
|
|
||||||
GetRange(c->inputs_[0], &smallest, &largest);
|
|
||||||
if (ParentRangeInCompaction(&smallest, &largest,
|
|
||||||
level, &c->parent_index_)) {
|
|
||||||
delete c;
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
assert(!c->inputs_[0].empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup "level+1" files (inputs_[1])
|
|
||||||
SetupOtherInputs(c);
|
|
||||||
|
|
||||||
// mark all the files that are being compacted
|
|
||||||
c->MarkFilesBeingCompacted(true);
|
|
||||||
|
|
||||||
// Is this compaction creating a file at the bottommost level
|
|
||||||
c->SetupBottomMostLevel(false);
|
|
||||||
|
|
||||||
// remember this currently undergoing compaction
|
|
||||||
compactions_in_progress_[level].insert(c);
|
|
||||||
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns true if any one of the parent files are being compacted
|
|
||||||
bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest,
|
|
||||||
const InternalKey* largest, int level,
|
|
||||||
int* parent_index) {
|
|
||||||
std::vector<FileMetaData*> inputs;
|
|
||||||
assert(level + 1 < current_->NumberLevels());
|
|
||||||
|
|
||||||
current_->GetOverlappingInputs(level + 1, smallest, largest, &inputs,
|
|
||||||
*parent_index, parent_index);
|
|
||||||
return FilesInCompaction(inputs);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns true if any one of specified files are being compacted
|
|
||||||
bool VersionSet::FilesInCompaction(std::vector<FileMetaData*>& files) {
|
|
||||||
for (unsigned int i = 0; i < files.size(); i++) {
|
|
||||||
if (files[i]->being_compacted) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add more files to the inputs on "level" to make sure that
|
|
||||||
// no newer version of a key is compacted to "level+1" while leaving an older
|
|
||||||
// version in a "level". Otherwise, any Get() will search "level" first,
|
|
||||||
// and will likely return an old/stale value for the key, since it always
|
|
||||||
// searches in increasing order of level to find the value. This could
|
|
||||||
// also scramble the order of merge operands. This function should be
|
|
||||||
// called any time a new Compaction is created, and its inputs_[0] are
|
|
||||||
// populated.
|
|
||||||
//
|
|
||||||
// Will set c to nullptr if it is impossible to apply this compaction.
|
|
||||||
void VersionSet::ExpandWhileOverlapping(Compaction* c) {
|
|
||||||
// If inputs are empty then there is nothing to expand.
|
|
||||||
if (!c || c->inputs_[0].empty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetOverlappingInputs will always do the right thing for level-0.
|
|
||||||
// So we don't need to do any expansion if level == 0.
|
|
||||||
if (c->level() == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const int level = c->level();
|
|
||||||
InternalKey smallest, largest;
|
|
||||||
|
|
||||||
// Keep expanding c->inputs_[0] until we are sure that there is a
|
|
||||||
// "clean cut" boundary between the files in input and the surrounding files.
|
|
||||||
// This will ensure that no parts of a key are lost during compaction.
|
|
||||||
int hint_index = -1;
|
|
||||||
size_t old_size;
|
|
||||||
do {
|
|
||||||
old_size = c->inputs_[0].size();
|
|
||||||
GetRange(c->inputs_[0], &smallest, &largest);
|
|
||||||
c->inputs_[0].clear();
|
|
||||||
c->input_version_->GetOverlappingInputs(
|
|
||||||
level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index);
|
|
||||||
} while(c->inputs_[0].size() > old_size);
|
|
||||||
|
|
||||||
// Get the new range
|
|
||||||
GetRange(c->inputs_[0], &smallest, &largest);
|
|
||||||
|
|
||||||
// If, after the expansion, there are files that are already under
|
|
||||||
// compaction, then we must drop/cancel this compaction.
|
|
||||||
int parent_index = -1;
|
|
||||||
if (FilesInCompaction(c->inputs_[0]) ||
|
|
||||||
(c->level() != c->output_level() &&
|
|
||||||
ParentRangeInCompaction(&smallest, &largest, level, &parent_index))) {
|
|
||||||
c->inputs_[0].clear();
|
|
||||||
c->inputs_[1].clear();
|
|
||||||
delete c;
|
|
||||||
c = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Populates the set of inputs from "level+1" that overlap with "level".
|
|
||||||
// Will also attempt to expand "level" if that doesn't expand "level+1"
|
|
||||||
// or cause "level" to include a file for compaction that has an overlapping
|
|
||||||
// user-key with another file.
|
|
||||||
void VersionSet::SetupOtherInputs(Compaction* c) {
|
|
||||||
// If inputs are empty, then there is nothing to expand.
|
|
||||||
// If both input and output levels are the same, no need to consider
|
|
||||||
// files at level "level+1"
|
|
||||||
if (c->inputs_[0].empty() || c->level() == c->output_level()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const int level = c->level();
|
|
||||||
InternalKey smallest, largest;
|
|
||||||
|
|
||||||
// Get the range one last time.
|
|
||||||
GetRange(c->inputs_[0], &smallest, &largest);
|
|
||||||
|
|
||||||
// Populate the set of next-level files (inputs_[1]) to include in compaction
|
|
||||||
c->input_version_->GetOverlappingInputs(level + 1, &smallest, &largest,
|
|
||||||
&c->inputs_[1], c->parent_index_,
|
|
||||||
&c->parent_index_);
|
|
||||||
|
|
||||||
// Get entire range covered by compaction
|
|
||||||
InternalKey all_start, all_limit;
|
|
||||||
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
|
|
||||||
|
|
||||||
// See if we can further grow the number of inputs in "level" without
|
|
||||||
// changing the number of "level+1" files we pick up. We also choose NOT
|
|
||||||
// to expand if this would cause "level" to include some entries for some
|
|
||||||
// user key, while excluding other entries for the same user key. This
|
|
||||||
// can happen when one user key spans multiple files.
|
|
||||||
if (!c->inputs_[1].empty()) {
|
|
||||||
std::vector<FileMetaData*> expanded0;
|
|
||||||
c->input_version_->GetOverlappingInputs(
|
|
||||||
level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
|
|
||||||
const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
|
|
||||||
const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
|
|
||||||
const uint64_t expanded0_size = TotalFileSize(expanded0);
|
|
||||||
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
|
|
||||||
if (expanded0.size() > c->inputs_[0].size() &&
|
|
||||||
inputs1_size + expanded0_size < limit &&
|
|
||||||
!FilesInCompaction(expanded0) &&
|
|
||||||
!c->input_version_->HasOverlappingUserKey(&expanded0, level)) {
|
|
||||||
InternalKey new_start, new_limit;
|
|
||||||
GetRange(expanded0, &new_start, &new_limit);
|
|
||||||
std::vector<FileMetaData*> expanded1;
|
|
||||||
c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
|
|
||||||
&expanded1, c->parent_index_,
|
|
||||||
&c->parent_index_);
|
|
||||||
if (expanded1.size() == c->inputs_[1].size() &&
|
|
||||||
!FilesInCompaction(expanded1)) {
|
|
||||||
Log(options_->info_log,
|
|
||||||
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
|
|
||||||
"\n",
|
|
||||||
(unsigned long)level,
|
|
||||||
(unsigned long)(c->inputs_[0].size()),
|
|
||||||
(unsigned long)(c->inputs_[1].size()),
|
|
||||||
(unsigned long)inputs0_size,
|
|
||||||
(unsigned long)inputs1_size,
|
|
||||||
(unsigned long)(expanded0.size()),
|
|
||||||
(unsigned long)(expanded1.size()),
|
|
||||||
(unsigned long)expanded0_size,
|
|
||||||
(unsigned long)inputs1_size);
|
|
||||||
smallest = new_start;
|
|
||||||
largest = new_limit;
|
|
||||||
c->inputs_[0] = expanded0;
|
|
||||||
c->inputs_[1] = expanded1;
|
|
||||||
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compute the set of grandparent files that overlap this compaction
|
|
||||||
// (parent == level+1; grandparent == level+2)
|
|
||||||
if (level + 2 < NumberLevels()) {
|
|
||||||
c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
|
|
||||||
&c->grandparents_);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (false) {
|
|
||||||
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
|
|
||||||
level,
|
|
||||||
smallest.DebugString().c_str(),
|
|
||||||
largest.DebugString().c_str());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the place where we will do the next compaction for this level.
|
|
||||||
// We update this immediately instead of waiting for the VersionEdit
|
|
||||||
// to be applied so that if the compaction fails, we will try a different
|
|
||||||
// key range next time.
|
|
||||||
compact_pointer_[level] = largest.Encode().ToString();
|
|
||||||
c->edit_->SetCompactPointer(level, largest);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
|
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
|
||||||
@ -2920,69 +2151,4 @@ void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
|
|||||||
obsolete_files_.clear();
|
obsolete_files_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
Compaction* VersionSet::CompactRange(int input_level,
|
|
||||||
int output_level,
|
|
||||||
const InternalKey* begin,
|
|
||||||
const InternalKey* end,
|
|
||||||
InternalKey** compaction_end) {
|
|
||||||
std::vector<FileMetaData*> inputs;
|
|
||||||
bool covering_the_whole_range = true;
|
|
||||||
|
|
||||||
// All files are 'overlapping' in universal style compaction.
|
|
||||||
// We have to compact the entire range in one shot.
|
|
||||||
if (options_->compaction_style == kCompactionStyleUniversal) {
|
|
||||||
begin = nullptr;
|
|
||||||
end = nullptr;
|
|
||||||
}
|
|
||||||
current_->GetOverlappingInputs(input_level, begin, end, &inputs);
|
|
||||||
if (inputs.empty()) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Avoid compacting too much in one shot in case the range is large.
|
|
||||||
// But we cannot do this for level-0 since level-0 files can overlap
|
|
||||||
// and we must not pick one file and drop another older file if the
|
|
||||||
// two files overlap.
|
|
||||||
if (input_level > 0) {
|
|
||||||
const uint64_t limit =
|
|
||||||
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
|
|
||||||
uint64_t total = 0;
|
|
||||||
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
|
|
||||||
uint64_t s = inputs[i]->file_size;
|
|
||||||
total += s;
|
|
||||||
if (total >= limit) {
|
|
||||||
**compaction_end = inputs[i + 1]->smallest;
|
|
||||||
covering_the_whole_range = false;
|
|
||||||
inputs.resize(i + 1);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Compaction* c = new Compaction(current_, input_level, output_level,
|
|
||||||
MaxFileSizeForLevel(output_level),
|
|
||||||
MaxGrandParentOverlapBytes(input_level));
|
|
||||||
|
|
||||||
c->inputs_[0] = inputs;
|
|
||||||
ExpandWhileOverlapping(c);
|
|
||||||
if (c == nullptr) {
|
|
||||||
Log(options_->info_log, "Could not compact due to expansion failure.\n");
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
SetupOtherInputs(c);
|
|
||||||
|
|
||||||
if (covering_the_whole_range) {
|
|
||||||
*compaction_end = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
// These files that are to be manaully compacted do not trample
|
|
||||||
// upon other files because manual compactions are processed when
|
|
||||||
// the system has a max of 1 background compaction thread.
|
|
||||||
c->MarkFilesBeingCompacted(true);
|
|
||||||
|
|
||||||
// Is this compaction creating a file at the bottommost level
|
|
||||||
c->SetupBottomMostLevel(true);
|
|
||||||
return c;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -28,12 +28,14 @@
|
|||||||
#include "port/port.h"
|
#include "port/port.h"
|
||||||
#include "db/table_cache.h"
|
#include "db/table_cache.h"
|
||||||
#include "db/compaction.h"
|
#include "db/compaction.h"
|
||||||
|
#include "db/compaction_picker.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
namespace log { class Writer; }
|
namespace log { class Writer; }
|
||||||
|
|
||||||
class Compaction;
|
class Compaction;
|
||||||
|
class CompactionPicker;
|
||||||
class Iterator;
|
class Iterator;
|
||||||
class MemTable;
|
class MemTable;
|
||||||
class TableCache;
|
class TableCache;
|
||||||
@ -187,6 +189,9 @@ class Version {
|
|||||||
friend class Compaction;
|
friend class Compaction;
|
||||||
friend class VersionSet;
|
friend class VersionSet;
|
||||||
friend class DBImpl;
|
friend class DBImpl;
|
||||||
|
friend class CompactionPicker;
|
||||||
|
friend class LevelCompactionPicker;
|
||||||
|
friend class UniversalCompactionPicker;
|
||||||
|
|
||||||
class LevelFileNumIterator;
|
class LevelFileNumIterator;
|
||||||
Iterator* NewConcatenatingIterator(const ReadOptions&,
|
Iterator* NewConcatenatingIterator(const ReadOptions&,
|
||||||
@ -409,35 +414,18 @@ class VersionSet {
|
|||||||
// Return the size of the current manifest file
|
// Return the size of the current manifest file
|
||||||
uint64_t ManifestFileSize() const { return manifest_file_size_; }
|
uint64_t ManifestFileSize() const { return manifest_file_size_; }
|
||||||
|
|
||||||
// For the specfied level, pick a compaction.
|
|
||||||
// Returns nullptr if there is no compaction to be done.
|
|
||||||
// If level is 0 and there is already a compaction on that level, this
|
|
||||||
// function will return nullptr.
|
|
||||||
Compaction* PickCompactionBySize(int level, double score);
|
|
||||||
|
|
||||||
// Pick files to compact in Universal mode
|
|
||||||
Compaction* PickCompactionUniversal(int level, double score);
|
|
||||||
|
|
||||||
// Pick Universal compaction to limit read amplification
|
|
||||||
Compaction* PickCompactionUniversalReadAmp(int level, double score,
|
|
||||||
unsigned int ratio, unsigned int num_files);
|
|
||||||
|
|
||||||
// Pick Universal compaction to limit space amplification.
|
|
||||||
Compaction* PickCompactionUniversalSizeAmp(int level, double score);
|
|
||||||
|
|
||||||
// Free up the files that were participated in a compaction
|
|
||||||
void ReleaseCompactionFiles(Compaction* c, Status status);
|
|
||||||
|
|
||||||
// verify that the files that we started with for a compaction
|
// verify that the files that we started with for a compaction
|
||||||
// still exist in the current version and in the same original level.
|
// still exist in the current version and in the same original level.
|
||||||
// This ensures that a concurrent compaction did not erroneously
|
// This ensures that a concurrent compaction did not erroneously
|
||||||
// pick the same files to compact.
|
// pick the same files to compact.
|
||||||
bool VerifyCompactionFileConsistency(Compaction* c);
|
bool VerifyCompactionFileConsistency(Compaction* c);
|
||||||
|
|
||||||
|
double MaxBytesForLevel(int level);
|
||||||
|
|
||||||
// Get the max file size in a given level.
|
// Get the max file size in a given level.
|
||||||
uint64_t MaxFileSizeForLevel(int level);
|
uint64_t MaxFileSizeForLevel(int level);
|
||||||
|
|
||||||
double MaxBytesForLevel(int level);
|
void ReleaseCompactionFiles(Compaction* c, Status status);
|
||||||
|
|
||||||
Status GetMetadataForFile(
|
Status GetMetadataForFile(
|
||||||
uint64_t number, int *filelevel, FileMetaData **metadata);
|
uint64_t number, int *filelevel, FileMetaData **metadata);
|
||||||
@ -454,21 +442,6 @@ class VersionSet {
|
|||||||
friend class Compaction;
|
friend class Compaction;
|
||||||
friend class Version;
|
friend class Version;
|
||||||
|
|
||||||
void Init(int num_levels);
|
|
||||||
|
|
||||||
void GetRange(const std::vector<FileMetaData*>& inputs,
|
|
||||||
InternalKey* smallest,
|
|
||||||
InternalKey* largest);
|
|
||||||
|
|
||||||
void GetRange2(const std::vector<FileMetaData*>& inputs1,
|
|
||||||
const std::vector<FileMetaData*>& inputs2,
|
|
||||||
InternalKey* smallest,
|
|
||||||
InternalKey* largest);
|
|
||||||
|
|
||||||
void ExpandWhileOverlapping(Compaction* c);
|
|
||||||
|
|
||||||
void SetupOtherInputs(Compaction* c);
|
|
||||||
|
|
||||||
// Save current contents to *log
|
// Save current contents to *log
|
||||||
Status WriteSnapshot(log::Writer* log);
|
Status WriteSnapshot(log::Writer* log);
|
||||||
|
|
||||||
@ -476,10 +449,6 @@ class VersionSet {
|
|||||||
|
|
||||||
bool ManifestContains(const std::string& record) const;
|
bool ManifestContains(const std::string& record) const;
|
||||||
|
|
||||||
uint64_t ExpandedCompactionByteSizeLimit(int level);
|
|
||||||
|
|
||||||
uint64_t MaxGrandParentOverlapBytes(int level);
|
|
||||||
|
|
||||||
Env* const env_;
|
Env* const env_;
|
||||||
const std::string dbname_;
|
const std::string dbname_;
|
||||||
const Options* const options_;
|
const Options* const options_;
|
||||||
@ -502,18 +471,9 @@ class VersionSet {
|
|||||||
// we have too many level 0 files
|
// we have too many level 0 files
|
||||||
bool need_slowdown_for_num_level0_files_;
|
bool need_slowdown_for_num_level0_files_;
|
||||||
|
|
||||||
// Per-level key at which the next compaction at that level should start.
|
// An object that keeps all the compaction stats
|
||||||
// Either an empty string, or a valid InternalKey.
|
// and picks the next compaction
|
||||||
std::string* compact_pointer_;
|
std::unique_ptr<CompactionPicker> compaction_picker_;
|
||||||
|
|
||||||
// Per-level target file size.
|
|
||||||
uint64_t* max_file_size_;
|
|
||||||
|
|
||||||
// Per-level max bytes
|
|
||||||
uint64_t* level_max_bytes_;
|
|
||||||
|
|
||||||
// record all the ongoing compactions for all levels
|
|
||||||
std::vector<std::set<Compaction*> > compactions_in_progress_;
|
|
||||||
|
|
||||||
// generates a increasing version number for every new version
|
// generates a increasing version number for every new version
|
||||||
uint64_t current_version_number_;
|
uint64_t current_version_number_;
|
||||||
@ -537,17 +497,6 @@ class VersionSet {
|
|||||||
VersionSet(const VersionSet&);
|
VersionSet(const VersionSet&);
|
||||||
void operator=(const VersionSet&);
|
void operator=(const VersionSet&);
|
||||||
|
|
||||||
// Return the total amount of data that is undergoing
|
|
||||||
// compactions per level
|
|
||||||
void SizeBeingCompacted(std::vector<uint64_t>&);
|
|
||||||
|
|
||||||
// Returns true if any one of the parent files are being compacted
|
|
||||||
bool ParentRangeInCompaction(const InternalKey* smallest,
|
|
||||||
const InternalKey* largest, int level, int* index);
|
|
||||||
|
|
||||||
// Returns true if any one of the specified files are being compacted
|
|
||||||
bool FilesInCompaction(std::vector<FileMetaData*>& files);
|
|
||||||
|
|
||||||
void LogAndApplyHelper(Builder*b, Version* v,
|
void LogAndApplyHelper(Builder*b, Version* v,
|
||||||
VersionEdit* edit, port::Mutex* mu);
|
VersionEdit* edit, port::Mutex* mu);
|
||||||
};
|
};
|
||||||
|
@ -67,12 +67,8 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
|
|||||||
current_version->files_ = new_files_list;
|
current_version->files_ = new_files_list;
|
||||||
current_version->num_levels_ = new_levels;
|
current_version->num_levels_ = new_levels;
|
||||||
|
|
||||||
delete[] compact_pointer_;
|
|
||||||
delete[] max_file_size_;
|
|
||||||
delete[] level_max_bytes_;
|
|
||||||
num_levels_ = new_levels;
|
num_levels_ = new_levels;
|
||||||
compact_pointer_ = new std::string[new_levels];
|
compaction_picker_->ReduceNumberOfLevels(new_levels);
|
||||||
Init(new_levels);
|
|
||||||
VersionEdit ve;
|
VersionEdit ve;
|
||||||
st = LogAndApply(&ve, mu, true);
|
st = LogAndApply(&ve, mu, true);
|
||||||
return st;
|
return st;
|
||||||
|
Loading…
Reference in New Issue
Block a user