options.level_compaction_dynamic_level_bytes to allow RocksDB to pick size bases of levels dynamically.

Summary:
When having fixed max_bytes_for_level_base, the ratio of size of largest level and the second one can range from 0 to the multiplier. This makes LSM tree frequently irregular and unpredictable. It can also cause poor space amplification in some cases.

In this improvement (proposed by Igor Kabiljo), we introduce a parameter option.level_compaction_use_dynamic_max_bytes. When turning it on, RocksDB is free to pick a level base in the range of (options.max_bytes_for_level_base/options.max_bytes_for_level_multiplier, options.max_bytes_for_level_base] so that real level ratios are close to options.max_bytes_for_level_multiplier.

Test Plan: New unit tests and pass tests suites including valgrind.

Reviewers: MarkCallaghan, rven, yhchiang, igor, ikabiljo

Reviewed By: ikabiljo

Subscribers: yoshinorim, ikabiljo, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D31437
This commit is contained in:
Igor Canadi 2015-02-05 11:44:17 -08:00 committed by sdong
parent f29b33c73b
commit db03739340
28 changed files with 1187 additions and 253 deletions

View File

@ -16,6 +16,7 @@
* Block based table now makes use of prefix bloom filter if it is a full fulter. * Block based table now makes use of prefix bloom filter if it is a full fulter.
* Block based table remembers whether a whole key or prefix based bloom filter is supported in SST files. Do a sanity check when reading the file with users' configuration. * Block based table remembers whether a whole key or prefix based bloom filter is supported in SST files. Do a sanity check when reading the file with users' configuration.
* Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly * Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly
* options.level_compaction_dynamic_level_bytes, a feature to allow RocksDB to pick dynamic base of bytes for levels. With this feature turned on, we will automatically adjust max bytes for each level. The goal of this feature is to have lower bound on size amplification. For more details, see comments in options.h.
### Public API changes ### Public API changes
* Deprecated skip_log_error_on_recovery option * Deprecated skip_log_error_on_recovery option

View File

@ -100,9 +100,9 @@ const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
return cfd()->user_comparator(); return cfd()->user_comparator();
} }
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const ColumnFamilyOptions& src, const InternalKeyComparator* icmp,
Logger* info_log) { const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src; ColumnFamilyOptions result = src;
result.comparator = icmp; result.comparator = icmp;
#ifdef OS_MACOSX #ifdef OS_MACOSX
@ -168,7 +168,7 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
result.level0_slowdown_writes_trigger || result.level0_slowdown_writes_trigger ||
result.level0_slowdown_writes_trigger < result.level0_slowdown_writes_trigger <
result.level0_file_num_compaction_trigger) { result.level0_file_num_compaction_trigger) {
Warn(info_log, Warn(db_options.info_log.get(),
"This condition must be satisfied: " "This condition must be satisfied: "
"level0_stop_writes_trigger(%d) >= " "level0_stop_writes_trigger(%d) >= "
"level0_slowdown_writes_trigger(%d) >= " "level0_slowdown_writes_trigger(%d) >= "
@ -185,7 +185,7 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
result.level0_slowdown_writes_trigger) { result.level0_slowdown_writes_trigger) {
result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger; result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
} }
Warn(info_log, Warn(db_options.info_log.get(),
"Adjust the value to " "Adjust the value to "
"level0_stop_writes_trigger(%d)" "level0_stop_writes_trigger(%d)"
"level0_slowdown_writes_trigger(%d)" "level0_slowdown_writes_trigger(%d)"
@ -194,6 +194,16 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
result.level0_slowdown_writes_trigger, result.level0_slowdown_writes_trigger,
result.level0_file_num_compaction_trigger); result.level0_file_num_compaction_trigger);
} }
if (result.level_compaction_dynamic_level_bytes) {
if (result.compaction_style != kCompactionStyleLevel ||
db_options.db_paths.size() > 1U) {
// 1. level_compaction_dynamic_level_bytes only makes sense for
// level-based compaction.
// 2. we don't yet know how to make both of this feature and multiple
// DB path work.
result.level_compaction_dynamic_level_bytes = false;
}
}
return result; return result;
} }
@ -269,8 +279,8 @@ ColumnFamilyData::ColumnFamilyData(
refs_(0), refs_(0),
dropped_(false), dropped_(false),
internal_comparator_(cf_options.comparator), internal_comparator_(cf_options.comparator),
options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options, options_(*db_options,
db_options->info_log.get())), SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
ioptions_(options_), ioptions_(options_),
mutable_cf_options_(options_, ioptions_), mutable_cf_options_(options_, ioptions_),
write_buffer_(write_buffer), write_buffer_(write_buffer),

View File

@ -122,9 +122,9 @@ struct SuperVersion {
static void* const kSVObsolete; static void* const kSVObsolete;
}; };
extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const ColumnFamilyOptions& src, const InternalKeyComparator* icmp,
Logger* info_log); const ColumnFamilyOptions& src);
class ColumnFamilySet; class ColumnFamilySet;

View File

@ -1020,7 +1020,7 @@ TEST(ColumnFamilyTest, CreateMissingColumnFamilies) {
} }
TEST(ColumnFamilyTest, SanitizeOptions) { TEST(ColumnFamilyTest, SanitizeOptions) {
DumbLogger logger; DBOptions db_options;
for (int i = 1; i <= 3; i++) { for (int i = 1; i <= 3; i++) {
for (int j = 1; j <= 3; j++) { for (int j = 1; j <= 3; j++) {
for (int k = 1; k <= 3; k++) { for (int k = 1; k <= 3; k++) {
@ -1028,7 +1028,8 @@ TEST(ColumnFamilyTest, SanitizeOptions) {
original.level0_stop_writes_trigger = i; original.level0_stop_writes_trigger = i;
original.level0_slowdown_writes_trigger = j; original.level0_slowdown_writes_trigger = j;
original.level0_file_num_compaction_trigger = k; original.level0_file_num_compaction_trigger = k;
ColumnFamilyOptions result = SanitizeOptions(NULL, original, &logger); ColumnFamilyOptions result =
SanitizeOptions(db_options, nullptr, original);
ASSERT_TRUE(result.level0_stop_writes_trigger >= ASSERT_TRUE(result.level0_stop_writes_trigger >=
result.level0_slowdown_writes_trigger); result.level0_slowdown_writes_trigger);
ASSERT_TRUE(result.level0_slowdown_writes_trigger >= ASSERT_TRUE(result.level0_slowdown_writes_trigger >=

View File

@ -39,13 +39,13 @@ void Compaction::SetInputVersion(Version* _input_version) {
edit_->SetColumnFamily(cfd_->GetID()); edit_->SetColumnFamily(cfd_->GetID());
} }
Compaction::Compaction(int number_levels, int start_level, int out_level, Compaction::Compaction(int number_levels, int _start_level, int out_level,
uint64_t target_file_size, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, uint64_t max_grandparent_overlap_bytes,
uint32_t output_path_id, uint32_t output_path_id,
CompressionType output_compression, bool seek_compaction, CompressionType output_compression, bool seek_compaction,
bool deletion_compaction) bool deletion_compaction)
: start_level_(start_level), : start_level_(_start_level),
output_level_(out_level), output_level_(out_level),
max_output_file_size_(target_file_size), max_output_file_size_(target_file_size),
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes), max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
@ -242,6 +242,32 @@ void Compaction::SetupBottomMostLevel(VersionStorageInfo* vstorage,
} }
} }
// Sample output:
// If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
// print: "3@0 + 2@3 + 1@4 files to L5"
const char* Compaction::InputLevelSummary(
InputLevelSummaryBuffer* scratch) const {
int len = 0;
bool is_first = true;
for (auto& input_level : inputs_) {
if (input_level.empty()) {
continue;
}
if (!is_first) {
len +=
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
} else {
is_first = false;
}
len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
"%zu@%d", input_level.size(), input_level.level);
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
" files to L%d", output_level());
return scratch->buffer;
}
void Compaction::ReleaseCompactionFiles(Status status) { void Compaction::ReleaseCompactionFiles(Status status) {
cfd_->compaction_picker()->ReleaseCompactionFiles(this, status); cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
} }

View File

@ -52,6 +52,8 @@ class Compaction {
return inputs_[compaction_input_level].level; return inputs_[compaction_input_level].level;
} }
int start_level() const { return start_level_; }
// Outputs will go to this level // Outputs will go to this level
int output_level() const { return output_level_; } int output_level() const { return output_level_; }
@ -189,6 +191,12 @@ class Compaction {
return &inputs_[l]; return &inputs_[l];
} }
struct InputLevelSummaryBuffer {
char buffer[128];
};
const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const;
private: private:
friend class CompactionPicker; friend class CompactionPicker;
friend class UniversalCompactionPicker; friend class UniversalCompactionPicker;

View File

@ -231,15 +231,19 @@ void CompactionJob::Prepare() {
compact_->CleanupBatchBuffer(); compact_->CleanupBatchBuffer();
compact_->CleanupMergedBuffer(); compact_->CleanupMergedBuffer();
auto* compaction = compact_->compaction;
// Generate file_levels_ for compaction berfore making Iterator // Generate file_levels_ for compaction berfore making Iterator
compact_->compaction->GenerateFileLevels(); compaction->GenerateFileLevels();
ColumnFamilyData* cfd = compact_->compaction->column_family_data(); ColumnFamilyData* cfd = compact_->compaction->column_family_data();
assert(cfd != nullptr); assert(cfd != nullptr);
LogToBuffer( {
log_buffer_, "[%s] [JOB %d] Compacting %d@%d + %d@%d files, score %.2f", Compaction::InputLevelSummaryBuffer inputs_summary;
cfd->GetName().c_str(), job_id_, compact_->compaction->num_input_files(0), LogToBuffer(log_buffer_, "[%s] [JOB %d] Compacting %s, score %.2f",
compact_->compaction->level(), compact_->compaction->num_input_files(1), cfd->GetName().c_str(), job_id_,
compact_->compaction->output_level(), compact_->compaction->score()); compaction->InputLevelSummary(&inputs_summary),
compaction->score());
}
char scratch[2345]; char scratch[2345];
compact_->compaction->Summary(scratch, sizeof(scratch)); compact_->compaction->Summary(scratch, sizeof(scratch));
LogToBuffer(log_buffer_, "[%s] Compaction start summary: %s\n", LogToBuffer(log_buffer_, "[%s] Compaction start summary: %s\n",
@ -959,39 +963,40 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) { Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld(); db_mutex->AssertHeld();
auto* compaction = compact_->compaction;
// paranoia: verify that the files that we started with // paranoia: verify that the files that we started with
// still exist in the current version and in the same original level. // still exist in the current version and in the same original level.
// This ensures that a concurrent compaction did not erroneously // This ensures that a concurrent compaction did not erroneously
// pick the same files to compact_. // pick the same files to compact_.
if (!versions_->VerifyCompactionFileConsistency(compact_->compaction)) { if (!versions_->VerifyCompactionFileConsistency(compaction)) {
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compaction %d@%d + %d@%d files aborted", "[%s] [JOB %d] Compaction %s aborted",
compact_->compaction->column_family_data()->GetName().c_str(), job_id_, compaction->column_family_data()->GetName().c_str(), job_id_,
compact_->compaction->num_input_files(0), compact_->compaction->level(), compaction->InputLevelSummary(&inputs_summary));
compact_->compaction->num_input_files(1),
compact_->compaction->output_level());
return Status::Corruption("Compaction input files inconsistent"); return Status::Corruption("Compaction input files inconsistent");
} }
{
Compaction::InputLevelSummaryBuffer inputs_summary;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compacted %d@%d + %d@%d files => %" PRIu64 " bytes", "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
compact_->compaction->column_family_data()->GetName().c_str(), job_id_, compaction->column_family_data()->GetName().c_str(), job_id_,
compact_->compaction->num_input_files(0), compact_->compaction->level(), compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
compact_->compaction->num_input_files(1), }
compact_->compaction->output_level(), compact_->total_bytes);
// Add compaction outputs // Add compaction outputs
compact_->compaction->AddInputDeletions(compact_->compaction->edit()); compaction->AddInputDeletions(compact_->compaction->edit());
for (size_t i = 0; i < compact_->outputs.size(); i++) { for (size_t i = 0; i < compact_->outputs.size(); i++) {
const CompactionState::Output& out = compact_->outputs[i]; const CompactionState::Output& out = compact_->outputs[i];
compact_->compaction->edit()->AddFile( compaction->edit()->AddFile(
compact_->compaction->output_level(), out.number, out.path_id, compaction->output_level(), out.number, out.path_id, out.file_size,
out.file_size, out.smallest, out.largest, out.smallest_seqno, out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
out.largest_seqno);
} }
return versions_->LogAndApply( return versions_->LogAndApply(compaction->column_family_data(),
compact_->compaction->column_family_data(), mutable_cf_options_, mutable_cf_options_, compaction->edit(),
compact_->compaction->edit(), db_mutex, db_directory_); db_mutex, db_directory_);
} }
// Given a sequence number, return the sequence number of the // Given a sequence number, return the sequence number of the

View File

@ -116,7 +116,7 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
assert(c != nullptr); assert(c != nullptr);
// If inputs are empty then there is nothing to expand. // If inputs are empty then there is nothing to expand.
if (c->inputs_[0].empty()) { if (c->inputs_[0].empty()) {
assert(c->inputs_[1].empty()); assert(c->inputs(c->num_input_levels() - 1)->empty());
// This isn't good compaction // This isn't good compaction
return false; return false;
} }
@ -157,10 +157,10 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
} }
if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0].files) || if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0].files) ||
(c->level() != c->output_level() && (c->level() != c->output_level() &&
ParentRangeInCompaction(vstorage, &smallest, &largest, level, RangeInCompaction(vstorage, &smallest, &largest, c->output_level(),
&parent_index))) { &parent_index))) {
c->inputs_[0].clear(); c->inputs_[0].clear();
c->inputs_[1].clear(); c->inputs_[c->num_input_levels() - 1].clear();
if (!c->inputs_[0].empty()) { if (!c->inputs_[0].empty()) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] ExpandWhileOverlapping() failure because some of the necessary" "[%s] ExpandWhileOverlapping() failure because some of the necessary"
@ -267,22 +267,24 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers(
// Returns true if any one of the parent files are being compacted // Returns true if any one of the parent files are being compacted
bool CompactionPicker::ParentRangeInCompaction(VersionStorageInfo* vstorage, bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest, const InternalKey* smallest,
const InternalKey* largest, const InternalKey* largest, int level,
int level, int* parent_index) { int* level_index) {
std::vector<FileMetaData*> inputs; std::vector<FileMetaData*> inputs;
assert(level + 1 < NumberLevels()); assert(level < NumberLevels());
vstorage->GetOverlappingInputs(level + 1, smallest, largest, &inputs, vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
*parent_index, parent_index); *level_index, level_index);
return FilesInCompaction(inputs); return FilesInCompaction(inputs);
} }
// Populates the set of inputs from "level+1" that overlap with "level". // Populates the set of inputs of all other levels that overlap with the
// Will also attempt to expand "level" if that doesn't expand "level+1" // start level.
// or cause "level" to include a file for compaction that has an overlapping // Now we assume all levels except start level and output level are empty.
// user-key with another file. // Will also attempt to expand "start level" if that doesn't expand
// "output level" or cause "level" to include a file for compaction that has an
// overlapping user-key with another file.
void CompactionPicker::SetupOtherInputs( void CompactionPicker::SetupOtherInputs(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, Compaction* c) { VersionStorageInfo* vstorage, Compaction* c) {
@ -293,32 +295,41 @@ void CompactionPicker::SetupOtherInputs(
return; return;
} }
// For now, we only support merging two levels, start level and output level.
// We need to assert other levels are empty.
for (int l = c->start_level() + 1; l < c->output_level(); l++) {
assert(vstorage->NumLevelFiles(l) == 0);
}
const int level = c->level(); const int level = c->level();
InternalKey smallest, largest; InternalKey smallest, largest;
// Get the range one last time. // Get the range one last time.
GetRange(c->inputs_[0].files, &smallest, &largest); GetRange(c->inputs_[0].files, &smallest, &largest);
// Populate the set of next-level files (inputs_[1]) to include in compaction // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to
vstorage->GetOverlappingInputs(level + 1, &smallest, &largest, // include in compaction
&c->inputs_[1].files, c->parent_index_, vstorage->GetOverlappingInputs(c->output_level(), &smallest, &largest,
&c->parent_index_); &c->inputs_[c->num_input_levels() - 1].files,
c->parent_index_, &c->parent_index_);
// Get entire range covered by compaction // Get entire range covered by compaction
InternalKey all_start, all_limit; InternalKey all_start, all_limit;
GetRange(c->inputs_[0].files, c->inputs_[1].files, &all_start, &all_limit); GetRange(c->inputs_[0].files, c->inputs_[c->num_input_levels() - 1].files,
&all_start, &all_limit);
// See if we can further grow the number of inputs in "level" without // See if we can further grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up. We also choose NOT // changing the number of "level+1" files we pick up. We also choose NOT
// to expand if this would cause "level" to include some entries for some // to expand if this would cause "level" to include some entries for some
// user key, while excluding other entries for the same user key. This // user key, while excluding other entries for the same user key. This
// can happen when one user key spans multiple files. // can happen when one user key spans multiple files.
if (!c->inputs_[1].empty()) { if (!c->inputs(c->num_input_levels() - 1)->empty()) {
std::vector<FileMetaData*> expanded0; std::vector<FileMetaData*> expanded0;
vstorage->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, vstorage->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0,
c->base_index_, nullptr); c->base_index_, nullptr);
const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files); const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files);
const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1].files); const uint64_t inputs1_size =
TotalCompensatedFileSize(c->inputs_[c->num_input_levels() - 1].files);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level); uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() && if (expanded0.size() > c->inputs_[0].size() &&
@ -328,32 +339,34 @@ void CompactionPicker::SetupOtherInputs(
InternalKey new_start, new_limit; InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit); GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1; std::vector<FileMetaData*> expanded1;
vstorage->GetOverlappingInputs(level + 1, &new_start, &new_limit, vstorage->GetOverlappingInputs(c->output_level(), &new_start, &new_limit,
&expanded1, c->parent_index_, &expanded1, c->parent_index_,
&c->parent_index_); &c->parent_index_);
if (expanded1.size() == c->inputs_[1].size() && if (expanded1.size() == c->inputs(c->num_input_levels() - 1)->size() &&
!FilesInCompaction(expanded1)) { !FilesInCompaction(expanded1)) {
Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log, Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
"[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64 "[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64
" bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n", " bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n",
cf_name.c_str(), level, c->inputs_[0].size(), c->inputs_[1].size(), cf_name.c_str(), level, c->inputs_[0].size(),
inputs0_size, inputs1_size, expanded0.size(), expanded1.size(), c->inputs(c->num_input_levels() - 1)->size(), inputs0_size,
expanded0_size, inputs1_size); inputs1_size, expanded0.size(), expanded1.size(), expanded0_size,
inputs1_size);
smallest = new_start; smallest = new_start;
largest = new_limit; largest = new_limit;
c->inputs_[0].files = expanded0; c->inputs_[0].files = expanded0;
c->inputs_[1].files = expanded1; c->inputs_[c->num_input_levels() - 1].files = expanded1;
GetRange(c->inputs_[0].files, c->inputs_[1].files, GetRange(c->inputs_[0].files,
&all_start, &all_limit); c->inputs_[c->num_input_levels() - 1].files, &all_start,
&all_limit);
} }
} }
} }
// Compute the set of grandparent files that overlap this compaction // Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2) // (parent == level+1; grandparent == level+2)
if (level + 2 < NumberLevels()) { if (c->output_level() + 1 < NumberLevels()) {
vstorage->GetOverlappingInputs(level + 2, &all_start, &all_limit, vstorage->GetOverlappingInputs(c->output_level() + 1, &all_start,
&c->grandparents_); &all_limit, &c->grandparents_);
} }
} }
@ -682,9 +695,6 @@ Compaction* LevelCompactionPicker::PickCompaction(
Compaction* c = nullptr; Compaction* c = nullptr;
int level = -1; int level = -1;
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
//
// Find the compactions by size on all levels. // Find the compactions by size on all levels.
for (int i = 0; i < NumberLevels() - 1; i++) { for (int i = 0; i < NumberLevels() - 1; i++) {
double score = vstorage->CompactionScore(i); double score = vstorage->CompactionScore(i);
@ -723,7 +733,7 @@ Compaction* LevelCompactionPicker::PickCompaction(
// cause the 'smallest' and 'largest' key to get extended to a // cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range // larger range. So, re-invoke GetRange to get the new key range
GetRange(c->inputs_[0].files, &smallest, &largest); GetRange(c->inputs_[0].files, &smallest, &largest);
if (ParentRangeInCompaction(vstorage, &smallest, &largest, level, if (RangeInCompaction(vstorage, &smallest, &largest, c->output_level(),
&c->parent_index_)) { &c->parent_index_)) {
delete c; delete c;
return nullptr; return nullptr;
@ -731,7 +741,7 @@ Compaction* LevelCompactionPicker::PickCompaction(
assert(!c->inputs_[0].empty()); assert(!c->inputs_[0].empty());
} }
// Setup "level+1" files (inputs_[1]) // Setup input files from output level
SetupOtherInputs(cf_name, mutable_cf_options, vstorage, c); SetupOtherInputs(cf_name, mutable_cf_options, vstorage, c);
// mark all the files that are being compacted // mark all the files that are being compacted
@ -810,12 +820,19 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
} }
assert(level >= 0); assert(level >= 0);
assert(level + 1 < NumberLevels()); int output_level;
c = new Compaction(vstorage->num_levels(), level, level + 1, if (level == 0) {
mutable_cf_options.MaxFileSizeForLevel(level + 1), output_level = vstorage->base_level();
} else {
output_level = level + 1;
}
assert(output_level < NumberLevels());
c = new Compaction(vstorage->num_levels(), level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(level), mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, level + 1), GetPathId(ioptions_, mutable_cf_options, output_level),
GetCompressionType(ioptions_, level + 1)); GetCompressionType(ioptions_, output_level));
c->score_ = score; c->score_ = score;
// Pick the largest file in this level that is not already // Pick the largest file in this level that is not already
@ -850,8 +867,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
// Do not pick this file if its parents at level+1 are being compacted. // Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs // Maybe we can avoid redoing this work in SetupOtherInputs
int parent_index = -1; int parent_index = -1;
if (ParentRangeInCompaction(vstorage, &f->smallest, &f->largest, level, if (RangeInCompaction(vstorage, &f->smallest, &f->largest,
&parent_index)) { c->output_level(), &parent_index)) {
continue; continue;
} }
c->inputs_[0].files.push_back(f); c->inputs_[0].files.push_back(f);

View File

@ -139,10 +139,9 @@ class CompactionPicker {
VersionStorageInfo* vstorage, Compaction* c); VersionStorageInfo* vstorage, Compaction* c);
// Returns true if any one of the parent files are being compacted // Returns true if any one of the parent files are being compacted
bool ParentRangeInCompaction(VersionStorageInfo* vstorage, bool RangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest, const InternalKey* smallest,
const InternalKey* largest, int level, const InternalKey* largest, int level, int* index);
int* index);
void SetupOtherInputs(const std::string& cf_name, void SetupOtherInputs(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,

View File

@ -59,6 +59,7 @@ class CompactionPickerTest {
options_.num_levels = num_levels; options_.num_levels = num_levels;
vstorage_.reset(new VersionStorageInfo( vstorage_.reset(new VersionStorageInfo(
&icmp_, ucmp_, options_.num_levels, style, nullptr)); &icmp_, ucmp_, options_.num_levels, style, nullptr));
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
} }
void DeleteVersionStorage() { void DeleteVersionStorage() {
@ -82,6 +83,7 @@ class CompactionPickerTest {
} }
void UpdateVersionStorageInfo() { void UpdateVersionStorageInfo() {
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
vstorage_->UpdateFilesBySize(); vstorage_->UpdateFilesBySize();
vstorage_->UpdateNumNonEmptyLevels(); vstorage_->UpdateNumNonEmptyLevels();
vstorage_->GenerateFileIndexer(); vstorage_->GenerateFileIndexer();
@ -186,9 +188,10 @@ TEST(CompactionPickerTest, LevelMaxScore) {
TEST(CompactionPickerTest, NeedsCompactionLevel) { TEST(CompactionPickerTest, NeedsCompactionLevel) {
const int kLevels = 6; const int kLevels = 6;
const int kFileCount = 20; const int kFileCount = 20;
for (int level = 0; level < kLevels - 1; ++level) { for (int level = 0; level < kLevels - 1; ++level) {
uint64_t file_size = NewVersionStorage(kLevels, kCompactionStyleLevel);
mutable_cf_options_.MaxBytesForLevel(level) * 2 / kFileCount; uint64_t file_size = vstorage_->MaxBytesForLevel(level) * 2 / kFileCount;
for (int file_count = 1; file_count <= kFileCount; ++file_count) { for (int file_count = 1; file_count <= kFileCount; ++file_count) {
// start a brand new version in each test. // start a brand new version in each test.
NewVersionStorage(kLevels, kCompactionStyleLevel); NewVersionStorage(kLevels, kCompactionStyleLevel);
@ -207,6 +210,137 @@ TEST(CompactionPickerTest, NeedsCompactionLevel) {
} }
} }
TEST(CompactionPickerTest, Level0TriggerDynamic) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
mutable_cf_options_.max_bytes_for_level_base = 200;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 1U, "150", "200");
Add(0, 2U, "200", "250");
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(num_levels, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 1, compaction->output_level());
}
TEST(CompactionPickerTest, Level0TriggerDynamic2) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
mutable_cf_options_.max_bytes_for_level_base = 200;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 1U, "150", "200");
Add(0, 2U, "200", "250");
Add(num_levels - 1, 3U, "200", "250", 300U);
UpdateVersionStorageInfo();
ASSERT_EQ(vstorage_->base_level(), num_levels - 2);
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(num_levels - 1, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 2, compaction->output_level());
}
TEST(CompactionPickerTest, Level0TriggerDynamic3) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
mutable_cf_options_.max_bytes_for_level_base = 200;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 1U, "150", "200");
Add(0, 2U, "200", "250");
Add(num_levels - 1, 3U, "200", "250", 300U);
Add(num_levels - 1, 4U, "300", "350", 3000U);
UpdateVersionStorageInfo();
ASSERT_EQ(vstorage_->base_level(), num_levels - 3);
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(num_levels - 2, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 3, compaction->output_level());
}
TEST(CompactionPickerTest, Level0TriggerDynamic4) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
mutable_cf_options_.max_bytes_for_level_base = 200;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 1U, "150", "200");
Add(0, 2U, "200", "250");
Add(num_levels - 1, 3U, "200", "250", 300U);
Add(num_levels - 1, 4U, "300", "350", 3000U);
Add(num_levels - 3, 5U, "150", "180", 3U);
Add(num_levels - 3, 6U, "181", "300", 3U);
Add(num_levels - 3, 7U, "400", "450", 3U);
UpdateVersionStorageInfo();
ASSERT_EQ(vstorage_->base_level(), num_levels - 3);
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(2U, compaction->num_input_files(num_levels - 3));
ASSERT_EQ(5U, compaction->input(num_levels - 3, 0)->fd.GetNumber());
ASSERT_EQ(6U, compaction->input(num_levels - 3, 1)->fd.GetNumber());
ASSERT_EQ(num_levels - 2, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 3, compaction->output_level());
}
TEST(CompactionPickerTest, LevelTriggerDynamic4) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.level0_file_num_compaction_trigger = 2;
mutable_cf_options_.max_bytes_for_level_base = 200;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 1U, "150", "200");
Add(num_levels - 1, 3U, "200", "250", 300U);
Add(num_levels - 1, 4U, "300", "350", 3000U);
Add(num_levels - 1, 4U, "400", "450", 3U);
Add(num_levels - 2, 5U, "150", "180", 300U);
Add(num_levels - 2, 6U, "181", "350", 500U);
Add(num_levels - 2, 7U, "400", "450", 200U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->num_input_files(1));
ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(4U, compaction->input(1, 1)->fd.GetNumber());
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(num_levels - 1, compaction->output_level());
}
TEST(CompactionPickerTest, NeedsCompactionUniversal) { TEST(CompactionPickerTest, NeedsCompactionUniversal) {
NewVersionStorage(1, kCompactionStyleUniversal); NewVersionStorage(1, kCompactionStyleUniversal);
UniversalCompactionPicker universal_compaction_picker( UniversalCompactionPicker universal_compaction_picker(

View File

@ -335,6 +335,9 @@ DEFINE_int32(target_file_size_multiplier, 1,
DEFINE_uint64(max_bytes_for_level_base, 10 * 1048576, "Max bytes for level-1"); DEFINE_uint64(max_bytes_for_level_base, 10 * 1048576, "Max bytes for level-1");
DEFINE_bool(level_compaction_dynamic_level_bytes, false,
"Whether level size base is dynamic");
DEFINE_int32(max_bytes_for_level_multiplier, 10, DEFINE_int32(max_bytes_for_level_multiplier, 10,
"A multiplier to compute max bytes for level-N (N >= 2)"); "A multiplier to compute max bytes for level-N (N >= 2)");
@ -1933,6 +1936,8 @@ class Benchmark {
options.target_file_size_base = FLAGS_target_file_size_base; options.target_file_size_base = FLAGS_target_file_size_base;
options.target_file_size_multiplier = FLAGS_target_file_size_multiplier; options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
options.level_compaction_dynamic_level_bytes =
FLAGS_level_compaction_dynamic_level_bytes;
options.max_bytes_for_level_multiplier = options.max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier; FLAGS_max_bytes_for_level_multiplier;
options.filter_deletes = FLAGS_filter_deletes; options.filter_deletes = FLAGS_filter_deletes;

View File

@ -108,8 +108,7 @@ Options SanitizeOptions(const std::string& dbname,
const InternalKeyComparator* icmp, const InternalKeyComparator* icmp,
const Options& src) { const Options& src) {
auto db_options = SanitizeOptions(dbname, DBOptions(src)); auto db_options = SanitizeOptions(dbname, DBOptions(src));
auto cf_options = SanitizeOptions(icmp, ColumnFamilyOptions(src), auto cf_options = SanitizeOptions(db_options, icmp, ColumnFamilyOptions(src));
db_options.info_log.get());
return Options(db_options, cf_options); return Options(db_options, cf_options);
} }
@ -1514,8 +1513,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
// stop if level i is not empty // stop if level i is not empty
if (vstorage->NumLevelFiles(i) > 0) break; if (vstorage->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files) // stop if level i is too small (cannot fit the level files)
if (mutable_cf_options.MaxBytesForLevel(i) < if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
vstorage->NumLevelBytes(level)) {
break; break;
} }

View File

@ -2825,9 +2825,8 @@ TEST(DBTest, IgnoreRecoveredLog) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateUInt64AddOperator(); options.merge_operator = MergeOperators::CreateUInt64AddOperator();
options.wal_dir = dbname_ + "/wal"; options.wal_dir = dbname_ + "/logs";
Destroy(options); DestroyAndReopen(options);
Reopen(options);
// fill up the DB // fill up the DB
std::string one, two; std::string one, two;
@ -10184,6 +10183,239 @@ TEST(DBTest, ThreadStatusSingleCompaction) {
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
TEST(DBTest, DynamicLevelMaxBytesBase) {
// Use InMemoryEnv, or it would be too slow.
unique_ptr<Env> env(new MockEnv(env_));
const int kNKeys = 1000;
int keys[kNKeys];
auto verify_func = [&]() {
for (int i = 0; i < kNKeys; i++) {
ASSERT_NE("NOT_FOUND", Get(Key(i)));
ASSERT_NE("NOT_FOUND", Get(Key(kNKeys * 2 + i)));
if (i < kNKeys / 10) {
ASSERT_EQ("NOT_FOUND", Get(Key(kNKeys + keys[i])));
} else {
ASSERT_NE("NOT_FOUND", Get(Key(kNKeys + keys[i])));
}
}
};
Random rnd(301);
for (int ordered_insert = 0; ordered_insert <= 1; ordered_insert++) {
for (int i = 0; i < kNKeys; i++) {
keys[i] = i;
}
if (ordered_insert == 0) {
std::random_shuffle(std::begin(keys), std::end(keys));
}
for (int max_background_compactions = 1; max_background_compactions < 4;
max_background_compactions += 2) {
Options options;
options.env = env.get();
options.create_if_missing = true;
options.db_write_buffer_size = 2048;
options.write_buffer_size = 2048;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 2;
options.target_file_size_base = 2048;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 10240;
options.max_bytes_for_level_multiplier = 4;
options.hard_rate_limit = 1.1;
options.max_background_compactions = max_background_compactions;
options.num_levels = 5;
DestroyAndReopen(options);
for (int i = 0; i < kNKeys; i++) {
int key = keys[i];
ASSERT_OK(Put(Key(kNKeys + key), RandomString(&rnd, 102)));
ASSERT_OK(Put(Key(key), RandomString(&rnd, 102)));
ASSERT_OK(Put(Key(kNKeys * 2 + key), RandomString(&rnd, 102)));
ASSERT_OK(Delete(Key(kNKeys + keys[i / 10])));
env_->SleepForMicroseconds(5000);
}
uint64_t int_prop;
ASSERT_TRUE(db_->GetIntProperty("rocksdb.background-errors", &int_prop));
ASSERT_EQ(0U, int_prop);
// Verify DB
for (int j = 0; j < 2; j++) {
verify_func();
if (j == 0) {
Reopen(options);
}
}
// Test compact range works
dbfull()->CompactRange(nullptr, nullptr);
// All data should be in the last level.
ColumnFamilyMetaData cf_meta;
db_->GetColumnFamilyMetaData(&cf_meta);
ASSERT_EQ(5U, cf_meta.levels.size());
for (int i = 0; i < 4; i++) {
ASSERT_EQ(0U, cf_meta.levels[i].files.size());
}
ASSERT_GT(cf_meta.levels[4U].files.size(), 0U);
verify_func();
Close();
}
}
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
}
// Test specific cases in dynamic max bytes
TEST(DBTest, DynamicLevelMaxBytesBase2) {
Random rnd(301);
int kMaxKey = 1000000;
Options options = CurrentOptions();
options.create_if_missing = true;
options.db_write_buffer_size = 2048;
options.write_buffer_size = 2048;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 9999;
options.level0_stop_writes_trigger = 9999;
options.target_file_size_base = 2048;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 10240;
options.max_bytes_for_level_multiplier = 4;
options.max_background_compactions = 2;
options.num_levels = 5;
options.expanded_compaction_factor = 0; // Force not expanding in compactions
BlockBasedTableOptions table_options;
table_options.block_size = 1024;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
}));
uint64_t int_prop;
std::string str_prop;
// Initial base level is the last level
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
ASSERT_EQ(4U, int_prop);
// Put about 7K to L0
for (int i = 0; i < 70; i++) {
ASSERT_OK(Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))),
RandomString(&rnd, 80)));
}
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "false"},
}));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
ASSERT_EQ(4U, int_prop);
// Insert extra about 3.5K to L0. After they are compacted to L4, base level
// should be changed to L3.
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
}));
for (int i = 0; i < 70; i++) {
ASSERT_OK(Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))),
RandomString(&rnd, 80)));
}
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "false"},
}));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
ASSERT_EQ(3U, int_prop);
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level3", &str_prop));
ASSERT_EQ("0", str_prop);
// Trigger parallel compaction, and the first one would change the base
// level.
// Hold compaction jobs to make sure
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run:Start",
[&]() { env_->SleepForMicroseconds(100000); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
}));
// Write about 10K more
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))),
RandomString(&rnd, 80)));
}
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "false"},
}));
Flush();
// Wait for 200 milliseconds before proceeding compactions to make sure two
// parallel ones are executed.
env_->SleepForMicroseconds(200000);
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
ASSERT_EQ(3U, int_prop);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// Trigger a condition that the compaction changes base level and L0->Lbase
// happens at the same time.
// We try to make last levels' targets to be 10K, 40K, 160K, add triggers
// another compaction from 40K->160K.
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
}));
// Write about 150K more
for (int i = 0; i < 1350; i++) {
ASSERT_OK(Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))),
RandomString(&rnd, 80)));
}
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "false"},
}));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
ASSERT_EQ(2U, int_prop);
// Keep Writing data until base level changed 2->1. There will be L0->L2
// compaction going on at the same time.
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (int attempt = 0; attempt <= 20; attempt++) {
// Write about 5K more data with two flushes. It should be flush to level 2
// but when it is applied, base level is already 1.
for (int i = 0; i < 50; i++) {
ASSERT_OK(Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))),
RandomString(&rnd, 80)));
}
Flush();
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
if (int_prop == 2U) {
env_->SleepForMicroseconds(50000);
} else {
break;
}
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
env_->SleepForMicroseconds(200000);
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
ASSERT_EQ(1U, int_prop);
}
TEST(DBTest, DynamicCompactionOptions) { TEST(DBTest, DynamicCompactionOptions) {
// minimum write buffer size is enforced at 64KB // minimum write buffer size is enforced at 64KB
const uint64_t k32KB = 1 << 15; const uint64_t k32KB = 1 << 15;

View File

@ -142,6 +142,8 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
return kOldestSnapshotTime; return kOldestSnapshotTime;
} else if (in == "num-live-versions") { } else if (in == "num-live-versions") {
return kNumLiveVersions; return kNumLiveVersions;
} else if (in == "base-level") {
return kBaseLevel;
} }
return kUnknown; return kUnknown;
} }
@ -284,6 +286,9 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
*value = db->IsFileDeletionsEnabled(); *value = db->IsFileDeletionsEnabled();
return true; return true;
#endif #endif
case kBaseLevel:
*value = vstorage->base_level();
return true;
default: default:
return false; return false;
} }

View File

@ -51,6 +51,7 @@ enum DBPropertyType : uint32_t {
kNumSnapshots, // Number of snapshots in the system kNumSnapshots, // Number of snapshots in the system
kOldestSnapshotTime, // Unix timestamp of the first snapshot kOldestSnapshotTime, // Unix timestamp of the first snapshot
kNumLiveVersions, kNumLiveVersions,
kBaseLevel, // The level that L0 data is compacted to
}; };
extern DBPropertyType GetPropertyType(const Slice& property, extern DBPropertyType GetPropertyType(const Slice& property,

View File

@ -53,15 +53,17 @@ class VersionBuilderTest {
void Add(int level, uint32_t file_number, const char* smallest, void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 0, uint32_t path_id = 0, const char* largest, uint64_t file_size = 0, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100, SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100,
SequenceNumber largest_seq = 100,
uint64_t num_entries = 0, uint64_t num_deletions = 0, uint64_t num_entries = 0, uint64_t num_deletions = 0,
bool sampled = false) { bool sampled = false, SequenceNumber smallest_seqno = 0,
SequenceNumber largest_seqno = 0) {
assert(level < vstorage_.num_levels()); assert(level < vstorage_.num_levels());
FileMetaData* f = new FileMetaData; FileMetaData* f = new FileMetaData;
f->fd = FileDescriptor(file_number, path_id, file_size); f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = GetInternalKey(smallest, smallest_seq); f->smallest = GetInternalKey(smallest, smallest_seq);
f->largest = GetInternalKey(largest, largest_seq); f->largest = GetInternalKey(largest, largest_seq);
f->smallest_seqno = smallest_seqno;
f->largest_seqno = largest_seqno;
f->compensated_file_size = file_size; f->compensated_file_size = file_size;
f->refs = 0; f->refs = 0;
f->num_entries = num_entries; f->num_entries = num_entries;
@ -78,20 +80,31 @@ class VersionBuilderTest {
vstorage_.UpdateNumNonEmptyLevels(); vstorage_.UpdateNumNonEmptyLevels();
vstorage_.GenerateFileIndexer(); vstorage_.GenerateFileIndexer();
vstorage_.GenerateLevelFilesBrief(); vstorage_.GenerateLevelFilesBrief();
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
vstorage_.SetFinalized(); vstorage_.SetFinalized();
} }
}; };
void UnrefFilesInVersion(VersionStorageInfo* new_vstorage) {
for (int i = 0; i < new_vstorage->num_levels(); i++) {
for (auto* f : new_vstorage->LevelFiles(i)) {
if (--f->refs == 0) {
delete f;
}
}
}
}
TEST(VersionBuilderTest, ApplyAndSaveTo) { TEST(VersionBuilderTest, ApplyAndSaveTo) {
Add(0, 1U, "150", "200", 100U); Add(0, 1U, "150", "200", 100U);
// Level 1 score 1.2
Add(1, 66U, "150", "200", 100U); Add(1, 66U, "150", "200", 100U);
Add(1, 88U, "201", "300", 100U); Add(1, 88U, "201", "300", 100U);
// Level 2 score 1.8. File 7 is the largest. Should be picked
Add(2, 6U, "150", "179", 100U); Add(2, 6U, "150", "179", 100U);
Add(2, 7U, "180", "220", 100U); Add(2, 7U, "180", "220", 100U);
Add(2, 8U, "221", "300", 100U); Add(2, 8U, "221", "300", 100U);
// Level 3 score slightly larger than 1
Add(3, 26U, "150", "170", 100U); Add(3, 26U, "150", "170", 100U);
Add(3, 27U, "171", "179", 100U); Add(3, 27U, "171", "179", 100U);
Add(3, 28U, "191", "220", 100U); Add(3, 28U, "191", "220", 100U);
@ -115,13 +128,83 @@ TEST(VersionBuilderTest, ApplyAndSaveTo) {
ASSERT_EQ(400U, new_vstorage.NumLevelBytes(2)); ASSERT_EQ(400U, new_vstorage.NumLevelBytes(2));
ASSERT_EQ(300U, new_vstorage.NumLevelBytes(3)); ASSERT_EQ(300U, new_vstorage.NumLevelBytes(3));
for (int i = 0; i < new_vstorage.num_levels(); i++) { UnrefFilesInVersion(&new_vstorage);
for (auto* f : new_vstorage.LevelFiles(i)) { }
if (--f->refs == 0) {
delete f; TEST(VersionBuilderTest, ApplyAndSaveToDynamic) {
} ioptions_.level_compaction_dynamic_level_bytes = true;
}
} Add(0, 1U, "150", "200", 100U, 0, 200U, 200U, 0, 0, false, 200U, 200U);
Add(0, 88U, "201", "300", 100U, 0, 100U, 100U, 0, 0, false, 100U, 100U);
Add(4, 6U, "150", "179", 100U);
Add(4, 7U, "180", "220", 100U);
Add(4, 8U, "221", "300", 100U);
Add(5, 26U, "150", "170", 100U);
Add(5, 27U, "171", "179", 100U);
UpdateVersionStorageInfo();
VersionEdit version_edit;
version_edit.AddFile(3, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
EnvOptions env_options;
VersionBuilder version_builder(env_options, nullptr, &vstorage_);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr);
version_builder.Apply(&version_edit);
version_builder.SaveTo(&new_vstorage);
ASSERT_EQ(0U, new_vstorage.NumLevelBytes(0));
ASSERT_EQ(100U, new_vstorage.NumLevelBytes(3));
ASSERT_EQ(300U, new_vstorage.NumLevelBytes(4));
ASSERT_EQ(200U, new_vstorage.NumLevelBytes(5));
UnrefFilesInVersion(&new_vstorage);
}
TEST(VersionBuilderTest, ApplyAndSaveToDynamic2) {
ioptions_.level_compaction_dynamic_level_bytes = true;
Add(0, 1U, "150", "200", 100U, 0, 200U, 200U, 0, 0, false, 200U, 200U);
Add(0, 88U, "201", "300", 100U, 0, 100U, 100U, 0, 0, false, 100U, 100U);
Add(4, 6U, "150", "179", 100U);
Add(4, 7U, "180", "220", 100U);
Add(4, 8U, "221", "300", 100U);
Add(5, 26U, "150", "170", 100U);
Add(5, 27U, "171", "179", 100U);
UpdateVersionStorageInfo();
VersionEdit version_edit;
version_edit.AddFile(4, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
version_edit.DeleteFile(4, 6U);
version_edit.DeleteFile(4, 7U);
version_edit.DeleteFile(4, 8U);
EnvOptions env_options;
VersionBuilder version_builder(env_options, nullptr, &vstorage_);
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, nullptr);
version_builder.Apply(&version_edit);
version_builder.SaveTo(&new_vstorage);
ASSERT_EQ(0U, new_vstorage.NumLevelBytes(0));
ASSERT_EQ(100U, new_vstorage.NumLevelBytes(4));
ASSERT_EQ(200U, new_vstorage.NumLevelBytes(5));
UnrefFilesInVersion(&new_vstorage);
} }
TEST(VersionBuilderTest, ApplyMultipleAndSaveTo) { TEST(VersionBuilderTest, ApplyMultipleAndSaveTo) {
@ -150,13 +233,7 @@ TEST(VersionBuilderTest, ApplyMultipleAndSaveTo) {
ASSERT_EQ(500U, new_vstorage.NumLevelBytes(2)); ASSERT_EQ(500U, new_vstorage.NumLevelBytes(2));
for (int i = 0; i < new_vstorage.num_levels(); i++) { UnrefFilesInVersion(&new_vstorage);
for (auto* f : new_vstorage.LevelFiles(i)) {
if (--f->refs == 0) {
delete f;
}
}
}
} }
TEST(VersionBuilderTest, ApplyDeleteAndSaveTo) { TEST(VersionBuilderTest, ApplyDeleteAndSaveTo) {
@ -193,13 +270,7 @@ TEST(VersionBuilderTest, ApplyDeleteAndSaveTo) {
ASSERT_EQ(300U, new_vstorage.NumLevelBytes(2)); ASSERT_EQ(300U, new_vstorage.NumLevelBytes(2));
for (int i = 0; i < new_vstorage.num_levels(); i++) { UnrefFilesInVersion(&new_vstorage);
for (auto* f : new_vstorage.LevelFiles(i)) {
if (--f->refs == 0) {
delete f;
}
}
}
} }
TEST(VersionBuilderTest, EstimatedActiveKeys) { TEST(VersionBuilderTest, EstimatedActiveKeys) {

View File

@ -726,6 +726,7 @@ VersionStorageInfo::VersionStorageInfo(
file_indexer_(user_comparator), file_indexer_(user_comparator),
compaction_style_(compaction_style), compaction_style_(compaction_style),
files_(new std::vector<FileMetaData*>[num_levels_]), files_(new std::vector<FileMetaData*>[num_levels_]),
base_level_(1),
files_by_size_(num_levels_), files_by_size_(num_levels_),
next_file_to_compact_by_size_(num_levels_), next_file_to_compact_by_size_(num_levels_),
compaction_score_(num_levels_), compaction_score_(num_levels_),
@ -856,10 +857,11 @@ void VersionStorageInfo::GenerateLevelFilesBrief() {
} }
} }
void Version::PrepareApply() { void Version::PrepareApply(const MutableCFOptions& mutable_cf_options) {
UpdateAccumulatedStats(); UpdateAccumulatedStats();
storage_info_.UpdateFilesBySize();
storage_info_.UpdateNumNonEmptyLevels(); storage_info_.UpdateNumNonEmptyLevels();
storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
storage_info_.UpdateFilesBySize();
storage_info_.GenerateFileIndexer(); storage_info_.GenerateFileIndexer();
storage_info_.GenerateLevelFilesBrief(); storage_info_.GenerateLevelFilesBrief();
} }
@ -1018,7 +1020,7 @@ void VersionStorageInfo::ComputeCompactionScore(
} }
} }
score = static_cast<double>(level_bytes_no_compacting) / score = static_cast<double>(level_bytes_no_compacting) /
mutable_cf_options.MaxBytesForLevel(level); MaxBytesForLevel(level);
if (max_score < score) { if (max_score < score) {
max_score = score; max_score = score;
max_score_level = level; max_score_level = level;
@ -1077,6 +1079,45 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
level_files->push_back(f); level_files->push_back(f);
} }
// Version::PrepareApply() need to be called before calling the function, or
// following functions called:
// 1. UpdateNumNonEmptyLevels();
// 2. CalculateBaseBytes();
// 3. UpdateFilesBySize();
// 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief();
void VersionStorageInfo::SetFinalized() {
finalized_ = true;
#ifndef NDEBUG
assert(base_level_ >= 1 && (num_levels() <= 1 || base_level_ < num_levels()));
// Verify all levels newer than base_level are empty except L0
for (int level = 1; level < base_level(); level++) {
assert(NumLevelBytes(level) == 0);
}
uint64_t max_bytes_prev_level = 0;
for (int level = base_level(); level < num_levels() - 1; level++) {
if (LevelFiles(level).size() == 0) {
continue;
}
assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
max_bytes_prev_level = MaxBytesForLevel(level);
}
int num_empty_non_l0_level = 0;
for (int level = 0; level < num_levels(); level++) {
assert(LevelFiles(level).size() == 0 ||
LevelFiles(level).size() == LevelFilesBrief(level).num_files);
if (level > 0 && NumLevelBytes(level) > 0) {
num_empty_non_l0_level++;
}
if (LevelFiles(level).size() > 0) {
assert(level < num_non_empty_levels());
}
}
assert(compaction_level_.size() > 0);
assert(compaction_level_.size() == compaction_score_.size());
#endif
}
void VersionStorageInfo::UpdateNumNonEmptyLevels() { void VersionStorageInfo::UpdateNumNonEmptyLevels() {
num_non_empty_levels_ = num_levels_; num_non_empty_levels_ = num_levels_;
for (int i = num_levels_ - 1; i >= 0; i--) { for (int i = num_levels_ - 1; i >= 0; i--) {
@ -1399,7 +1440,15 @@ uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
const char* VersionStorageInfo::LevelSummary( const char* VersionStorageInfo::LevelSummary(
LevelSummaryStorage* scratch) const { LevelSummaryStorage* scratch) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files["); int len = 0;
if (num_levels() > 1) {
assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
len = snprintf(scratch->buffer, sizeof(scratch->buffer),
"base level %d max bytes base %" PRIu64, base_level_,
level_max_bytes_[base_level_]);
}
len +=
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " files[");
for (int i = 0; i < num_levels(); i++) { for (int i = 0; i < num_levels(); i++) {
int sz = sizeof(scratch->buffer) - len; int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size())); int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
@ -1452,6 +1501,113 @@ int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
return result; return result;
} }
uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < static_cast<int>(level_max_bytes_.size()));
return level_max_bytes_[level];
}
void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
const MutableCFOptions& options) {
level_max_bytes_.resize(ioptions.num_levels);
if (!ioptions.level_compaction_dynamic_level_bytes) {
base_level_ = 1;
// Calculate for static bytes base case
for (int i = 0; i < ioptions.num_levels; ++i) {
if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
level_max_bytes_[i] = options.max_bytes_for_level_base;
} else if (i > 1) {
level_max_bytes_[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes_[i - 1],
options.max_bytes_for_level_multiplier),
options.max_bytes_for_level_multiplier_additional[i - 1]);
} else {
level_max_bytes_[i] = options.max_bytes_for_level_base;
}
}
} else {
uint64_t max_level_size = 0;
int first_non_empty_level = -1;
// Find size of non-L0 level of most data.
// Cannot use the size of the last level because it can be empty or less
// than previous levels after compaction.
for (int i = 1; i < num_levels_; i++) {
uint64_t total_size = 0;
for (const auto& f : files_[i]) {
total_size += f->fd.GetFileSize();
}
if (total_size > 0 && first_non_empty_level == -1) {
first_non_empty_level = i;
}
if (total_size > max_level_size) {
max_level_size = total_size;
}
}
// Prefill every level's max bytes to disallow compaction from there.
for (int i = 0; i < num_levels_; i++) {
level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
}
if (max_level_size == 0) {
// No data for L1 and up. L0 compacts to last level directly.
// No compaction from L1+ needs to be scheduled.
base_level_ = num_levels_ - 1;
} else {
uint64_t base_bytes_max = options.max_bytes_for_level_base;
uint64_t base_bytes_min =
base_bytes_max / options.max_bytes_for_level_multiplier;
// Try whether we can make last level's target size to be max_level_size
uint64_t cur_level_size = max_level_size;
for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
// Round up after dividing
cur_level_size /= options.max_bytes_for_level_multiplier;
}
// Calculate base level and its size.
int base_level_size;
if (cur_level_size <= base_bytes_min) {
// Case 1. If we make target size of last level to be max_level_size,
// target size of the first non-empty level would be smaller than
// base_bytes_min. We set it be base_bytes_min.
base_level_size = static_cast<int>(base_bytes_min + 1);
base_level_ = first_non_empty_level;
Warn(ioptions.info_log,
"More existing levels in DB than needed. "
"max_bytes_for_level_multiplier may not be guaranteed.");
} else {
// Find base level (where L0 data is compacted to).
base_level_ = first_non_empty_level;
while (base_level_ > 1 && cur_level_size > base_bytes_max) {
--base_level_;
cur_level_size =
cur_level_size / options.max_bytes_for_level_multiplier;
}
if (cur_level_size > base_bytes_max) {
// Even L1 will be too large
assert(base_level_ == 1);
base_level_size = static_cast<int>(base_bytes_max);
} else {
base_level_size = static_cast<int>(cur_level_size);
}
}
int level_size = base_level_size;
for (int i = base_level_; i < num_levels_; i++) {
if (i > base_level_) {
level_size = level_size * options.max_bytes_for_level_multiplier;
}
level_max_bytes_[i] = level_size;
}
}
}
}
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) { void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
for (int level = 0; level < storage_info_.num_levels(); level++) { for (int level = 0; level < storage_info_.num_levels(); level++) {
const std::vector<FileMetaData*>& files = storage_info_.files_[level]; const std::vector<FileMetaData*>& files = storage_info_.files_[level];
@ -1679,7 +1835,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!edit->IsColumnFamilyManipulation()) { if (!edit->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex. // This is cpu-heavy operations, which should be called outside mutex.
v->PrepareApply(); v->PrepareApply(mutable_cf_options);
} }
// Write new record to MANIFEST log // Write new record to MANIFEST log
@ -2102,7 +2258,7 @@ Status VersionSet::Recover(
builder->SaveTo(v->storage_info()); builder->SaveTo(v->storage_info());
// Install recovered version // Install recovered version
v->PrepareApply(); v->PrepareApply(*cfd->GetLatestMutableCFOptions());
AppendVersion(cfd, v); AppendVersion(cfd, v);
} }
@ -2436,7 +2592,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Version* v = new Version(cfd, this, current_version_number_++); Version* v = new Version(cfd, this, current_version_number_++);
builder->SaveTo(v->storage_info()); builder->SaveTo(v->storage_info());
v->PrepareApply(); v->PrepareApply(*cfd->GetLatestMutableCFOptions());
printf("--------------- Column family \"%s\" (ID %u) --------------\n", printf("--------------- Column family \"%s\" (ID %u) --------------\n",
cfd->GetName().c_str(), (unsigned int)cfd->GetID()); cfd->GetName().c_str(), (unsigned int)cfd->GetID());
@ -2700,6 +2856,18 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
"[%s] compaction output being applied to a different base version from" "[%s] compaction output being applied to a different base version from"
" input version", " input version",
c->column_family_data()->GetName().c_str()); c->column_family_data()->GetName().c_str());
if (c->start_level() == 0 && c->num_input_levels() > 2U) {
// We are doing a L0->base_level compaction. The assumption is if
// base level is not L1, levels from L1 to base_level - 1 is empty.
// This is ensured by having one compaction from L0 going on at the
// same time in level-based compaction. So that during the time, no
// compaction/flush can put files to those levels.
for (int l = c->start_level() + 1; l < c->output_level(); l++) {
if (vstorage->NumLevelFiles(l) != 0) {
return false;
}
}
}
} }
for (size_t input = 0; input < c->num_input_levels(); ++input) { for (size_t input = 0; input < c->num_input_levels(); ++input) {
@ -2797,6 +2965,9 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
Version* v = new Version(new_cfd, this, current_version_number_++); Version* v = new Version(new_cfd, this, current_version_number_++);
// Fill level target base information.
v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
*new_cfd->GetLatestMutableCFOptions());
AppendVersion(new_cfd, v); AppendVersion(new_cfd, v);
// GetLatestMutableCFOptions() is safe here without mutex since the // GetLatestMutableCFOptions() is safe here without mutex since the
// cfd is not available to client // cfd is not available to client

View File

@ -63,8 +63,7 @@ class MergeIteratorBuilder;
// REQUIRES: "file_level.files" contains a sorted list of // REQUIRES: "file_level.files" contains a sorted list of
// non-overlapping files. // non-overlapping files.
extern int FindFile(const InternalKeyComparator& icmp, extern int FindFile(const InternalKeyComparator& icmp,
const LevelFilesBrief& file_level, const LevelFilesBrief& file_level, const Slice& key);
const Slice& key);
// Returns true iff some file in "files" overlaps the user key range // Returns true iff some file in "files" overlaps the user key range
// [*smallest,*largest]. // [*smallest,*largest].
@ -72,8 +71,7 @@ extern int FindFile(const InternalKeyComparator& icmp,
// largest==nullptr represents a key largest than all keys in the DB. // largest==nullptr represents a key largest than all keys in the DB.
// REQUIRES: If disjoint_sorted_files, file_level.files[] // REQUIRES: If disjoint_sorted_files, file_level.files[]
// contains disjoint ranges in sorted order. // contains disjoint ranges in sorted order.
extern bool SomeFileOverlapsRange( extern bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
const InternalKeyComparator& icmp,
bool disjoint_sorted_files, bool disjoint_sorted_files,
const LevelFilesBrief& file_level, const LevelFilesBrief& file_level,
const Slice* smallest_user_key, const Slice* smallest_user_key,
@ -98,7 +96,7 @@ class VersionStorageInfo {
void AddFile(int level, FileMetaData* f); void AddFile(int level, FileMetaData* f);
void SetFinalized() { finalized_ = true; } void SetFinalized();
// Update num_non_empty_levels_. // Update num_non_empty_levels_.
void UpdateNumNonEmptyLevels(); void UpdateNumNonEmptyLevels();
@ -148,14 +146,16 @@ class VersionStorageInfo {
int* file_index = nullptr); // return index of overlap file int* file_index = nullptr); // return index of overlap file
void GetOverlappingInputsBinarySearch( void GetOverlappingInputsBinarySearch(
int level, const Slice& begin, // nullptr means before all keys int level,
const Slice& begin, // nullptr means before all keys
const Slice& end, // nullptr means after all keys const Slice& end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs, std::vector<FileMetaData*>* inputs,
int hint_index, // index of overlap file int hint_index, // index of overlap file
int* file_index); // return index of overlap file int* file_index); // return index of overlap file
void ExtendOverlappingInputs( void ExtendOverlappingInputs(
int level, const Slice& begin, // nullptr means before all keys int level,
const Slice& begin, // nullptr means before all keys
const Slice& end, // nullptr means after all keys const Slice& end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs, std::vector<FileMetaData*>* inputs,
unsigned int index); // start extending from this index unsigned int index); // start extending from this index
@ -213,6 +213,8 @@ class VersionStorageInfo {
return files_by_size_[level]; return files_by_size_[level];
} }
int base_level() const { return base_level_; }
// REQUIRES: lock is held // REQUIRES: lock is held
// Set the index that is used to offset into files_by_size_ to find // Set the index that is used to offset into files_by_size_ to find
// the next compaction candidate file. // the next compaction candidate file.
@ -281,12 +283,22 @@ class VersionStorageInfo {
return internal_comparator_; return internal_comparator_;
} }
// Returns maximum total bytes of data on a given level.
uint64_t MaxBytesForLevel(int level) const;
// Must be called after any change to MutableCFOptions.
void CalculateBaseBytes(const ImmutableCFOptions& ioptions,
const MutableCFOptions& options);
private: private:
const InternalKeyComparator* internal_comparator_; const InternalKeyComparator* internal_comparator_;
const Comparator* user_comparator_; const Comparator* user_comparator_;
int num_levels_; // Number of levels int num_levels_; // Number of levels
int num_non_empty_levels_; // Number of levels. Any level larger than it int num_non_empty_levels_; // Number of levels. Any level larger than it
// is guaranteed to be empty. // is guaranteed to be empty.
// Per-level max bytes
std::vector<uint64_t> level_max_bytes_;
// A short brief metadata of files per level // A short brief metadata of files per level
autovector<rocksdb::LevelFilesBrief> level_files_brief_; autovector<rocksdb::LevelFilesBrief> level_files_brief_;
FileIndexer file_indexer_; FileIndexer file_indexer_;
@ -298,6 +310,10 @@ class VersionStorageInfo {
// in increasing order of keys // in increasing order of keys
std::vector<FileMetaData*>* files_; std::vector<FileMetaData*>* files_;
// Level that L0 data should be compacted to. All levels < base_level_ should
// be empty.
int base_level_;
// A list for the same set of files that are stored in files_, // A list for the same set of files that are stored in files_,
// but files in each level are now sorted based on file // but files in each level are now sorted based on file
// size. The file with the largest size is at the front. // size. The file with the largest size is at the front.
@ -366,7 +382,7 @@ class Version {
// Loads some stats information from files. Call without mutex held. It needs // Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set. // to be called before applying the version to the version set.
void PrepareApply(); void PrepareApply(const MutableCFOptions& mutable_cf_options);
// Reference count management (so Versions do not disappear out from // Reference count management (so Versions do not disappear out from
// under live iterators) // under live iterators)
@ -407,7 +423,6 @@ class Version {
ColumnFamilyData* cfd() const { return cfd_; } ColumnFamilyData* cfd() const { return cfd_; }
// Return the next Version in the linked list. Used for debug only // Return the next Version in the linked list. Used for debug only
Version* TEST_Next() const { Version* TEST_Next() const {
return next_; return next_;

View File

@ -73,6 +73,149 @@ TEST(GenerateLevelFilesBriefTest, Multiple) {
ASSERT_EQ(0, Compare()); ASSERT_EQ(0, Compare());
} }
class CountingLogger : public Logger {
public:
CountingLogger() : log_count(0) {}
using Logger::Logv;
virtual void Logv(const char* format, va_list ap) override { log_count++; }
int log_count;
};
Options GetOptionsWithNumLevels(int num_levels,
std::shared_ptr<CountingLogger> logger) {
Options opt;
opt.num_levels = num_levels;
opt.info_log = logger;
return opt;
}
class VersionStorageInfoTest {
public:
const Comparator* ucmp_;
InternalKeyComparator icmp_;
std::shared_ptr<CountingLogger> logger_;
Options options_;
ImmutableCFOptions ioptions_;
MutableCFOptions mutable_cf_options_;
VersionStorageInfo vstorage_;
InternalKey GetInternalKey(const char* ukey,
SequenceNumber smallest_seq = 100) {
return InternalKey(ukey, smallest_seq, kTypeValue);
}
VersionStorageInfoTest()
: ucmp_(BytewiseComparator()),
icmp_(ucmp_),
logger_(new CountingLogger()),
options_(GetOptionsWithNumLevels(6, logger_)),
ioptions_(options_),
mutable_cf_options_(options_, ioptions_),
vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr) {}
~VersionStorageInfoTest() {
for (int i = 0; i < vstorage_.num_levels(); i++) {
for (auto* f : vstorage_.LevelFiles(i)) {
if (--f->refs == 0) {
delete f;
}
}
}
}
void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 0) {
assert(level < vstorage_.num_levels());
FileMetaData* f = new FileMetaData;
f->fd = FileDescriptor(file_number, 0, file_size);
f->smallest = GetInternalKey(smallest, 0);
f->largest = GetInternalKey(largest, 0);
f->compensated_file_size = file_size;
f->refs = 0;
f->num_entries = 0;
f->num_deletions = 0;
vstorage_.AddFile(level, f);
}
};
TEST(VersionStorageInfoTest, MaxBytesForLevelStatic) {
ioptions_.level_compaction_dynamic_level_bytes = false;
mutable_cf_options_.max_bytes_for_level_base = 10;
mutable_cf_options_.max_bytes_for_level_multiplier = 5;
Add(4, 100U, "1", "2");
Add(5, 101U, "1", "2");
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U);
ASSERT_EQ(0, logger_->log_count);
}
TEST(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.max_bytes_for_level_base = 1000;
mutable_cf_options_.max_bytes_for_level_multiplier = 5;
Add(5, 1U, "1", "2", 500U);
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
ASSERT_EQ(0, logger_->log_count);
ASSERT_EQ(vstorage_.base_level(), 5);
Add(5, 2U, "3", "4", 550U);
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
ASSERT_EQ(0, logger_->log_count);
ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 210U);
ASSERT_EQ(vstorage_.base_level(), 4);
Add(4, 3U, "3", "4", 550U);
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
ASSERT_EQ(0, logger_->log_count);
ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 210U);
ASSERT_EQ(vstorage_.base_level(), 4);
Add(3, 4U, "3", "4", 250U);
Add(3, 5U, "5", "7", 300U);
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
ASSERT_EQ(1, logger_->log_count);
ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 201U);
ASSERT_EQ(vstorage_.base_level(), 3);
Add(1, 6U, "3", "4", 5U);
Add(1, 7U, "8", "9", 5U);
logger_->log_count = 0;
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
ASSERT_EQ(1, logger_->log_count);
ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 201U);
ASSERT_EQ(vstorage_.base_level(), 1);
}
TEST(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.max_bytes_for_level_base = 100;
mutable_cf_options_.max_bytes_for_level_multiplier = 2;
Add(0, 1U, "1", "2", 50U);
Add(1, 2U, "1", "2", 50U);
Add(2, 3U, "1", "2", 500U);
Add(3, 4U, "1", "2", 500U);
Add(4, 5U, "1", "2", 1700U);
Add(5, 6U, "1", "2", 500U);
vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);
ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);
ASSERT_EQ(vstorage_.base_level(), 1);
ASSERT_EQ(0, logger_->log_count);
}
class FindLevelFileTest { class FindLevelFileTest {
public: public:
LevelFilesBrief file_level_; LevelFilesBrief file_level_;

View File

@ -87,6 +87,8 @@ struct ImmutableCFOptions {
CompressionOptions compression_opts; CompressionOptions compression_opts;
bool level_compaction_dynamic_level_bytes;
Options::AccessHint access_hint_on_compaction_start; Options::AccessHint access_hint_on_compaction_start;
int num_levels; int num_levels;

View File

@ -343,6 +343,65 @@ struct ColumnFamilyOptions {
// Dynamically changeable through SetOptions() API // Dynamically changeable through SetOptions() API
uint64_t max_bytes_for_level_base; uint64_t max_bytes_for_level_base;
// If true, RocksDB will pick target size of each level dynamically.
// We will pick a base level b >= 1. L0 will be directly merged into level b,
// instead of always into level 1. Level 1 to b-1 need to be empty.
// We try to pick b and its target size so that
// 1. target size is in the range of
// (max_bytes_for_level_base / max_bytes_for_level_multiplier,
// max_bytes_for_level_base]
// 2. target size of the last level (level num_levels-1) equals to extra size
// of the level.
// At the same time max_bytes_for_level_multiplier and
// max_bytes_for_level_multiplier_additional are still satisfied.
//
// With this option on, from an empty DB, we make last level the base level,
// which means merging L0 data into the last level, until it exceeds
// max_bytes_for_level_base. And then we make the second last level to be
// base level, to start to merge L0 data to second last level, with its
// target size to be 1/max_bytes_for_level_multiplier of the last level's
// extra size. After the data accumulates more so that we need to move the
// base level to the third last one, and so on.
//
// For example, assume max_bytes_for_level_multiplier=10, num_levels=6,
// and max_bytes_for_level_base=10MB.
// Target sizes of level 1 to 5 starts with:
// [- - - - 10MB]
// with base level is level. Target sizes of level 1 to 4 are not applicable
// because they will not be used.
// Until the size of Level 5 grows to more than 10MB, say 11MB, we make
// base target to level 4 and now the targets looks like:
// [- - - 1.1MB 11MB]
// While data are accumulated, size targets are tuned based on actual data
// of level 5. When level 5 has 50MB of data, the target is like:
// [- - - 5MB 50MB]
// Until level 5's actual size is more than 100MB, say 101MB. Now if we keep
// level 4 to be the base level, its target size needs to be 10.1MB, which
// doesn't satisfy the target size range. So now we make level 3 the target
// size and the target sizes of the levels look like:
// [- - 1.01MB 10.1MB 101MB]
// In the same way, while level 5 further grows, all levels' targets grow,
// like
// [- - 5MB 50MB 500MB]
// Until level 5 exceeds 1000MB and becomes 1001MB, we make level 2 the
// base level and make levels' target sizes like this:
// [- 1.001MB 10.01MB 100.1MB 1001MB]
// and go on...
//
// By doing it, we give max_bytes_for_level_multiplier a priority against
// max_bytes_for_level_base, for a more predictable LSM tree shape. It is
// useful to limit worse case space amplification.
//
// max_bytes_for_level_multiplier_additional is ignored with this flag on.
//
// Turning this feature on or off for an existing DB can cause unexpected
// LSM tree structure so it's not recommended.
//
// NOTE: this option is experimental
//
// Default: false
bool level_compaction_dynamic_level_bytes;
// Default: 10. // Default: 10.
// //
// Dynamically changeable through SetOptions() API // Dynamically changeable through SetOptions() API

View File

@ -18,7 +18,6 @@
namespace rocksdb { namespace rocksdb {
namespace {
// Multiple two operands. If they overflow, return op1. // Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
if (op1 == 0) { if (op1 == 0) {
@ -33,26 +32,18 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
} }
return op1 * casted_op2; return op1 * casted_op2;
} }
} // anonymous namespace
void MutableCFOptions::RefreshDerivedOptions( void MutableCFOptions::RefreshDerivedOptions(
const ImmutableCFOptions& ioptions) { const ImmutableCFOptions& ioptions) {
max_file_size.resize(ioptions.num_levels); max_file_size.resize(ioptions.num_levels);
level_max_bytes.resize(ioptions.num_levels);
for (int i = 0; i < ioptions.num_levels; ++i) { for (int i = 0; i < ioptions.num_levels; ++i) {
if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) { if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
max_file_size[i] = ULLONG_MAX; max_file_size[i] = ULLONG_MAX;
level_max_bytes[i] = max_bytes_for_level_base;
} else if (i > 1) { } else if (i > 1) {
max_file_size[i] = MultiplyCheckOverflow(max_file_size[i - 1], max_file_size[i] = MultiplyCheckOverflow(max_file_size[i - 1],
target_file_size_multiplier); target_file_size_multiplier);
level_max_bytes[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes[i - 1],
max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional[i - 1]);
} else { } else {
max_file_size[i] = target_file_size_base; max_file_size[i] = target_file_size_base;
level_max_bytes[i] = max_bytes_for_level_base;
} }
} }
} }
@ -62,13 +53,6 @@ uint64_t MutableCFOptions::MaxFileSizeForLevel(int level) const {
assert(level < (int)max_file_size.size()); assert(level < (int)max_file_size.size());
return max_file_size[level]; return max_file_size[level];
} }
uint64_t MutableCFOptions::MaxBytesForLevel(int level) const {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < (int)level_max_bytes.size());
return level_max_bytes[level];
}
uint64_t MutableCFOptions::MaxGrandParentOverlapBytes(int level) const { uint64_t MutableCFOptions::MaxGrandParentOverlapBytes(int level) const {
return MaxFileSizeForLevel(level) * max_grandparent_overlap_factor; return MaxFileSizeForLevel(level) * max_grandparent_overlap_factor;
} }

View File

@ -79,8 +79,6 @@ struct MutableCFOptions {
// Get the max file size in a given level. // Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level) const; uint64_t MaxFileSizeForLevel(int level) const;
// Returns maximum total bytes of data on a given level.
uint64_t MaxBytesForLevel(int level) const;
// Returns maximum total overlap bytes with grandparent // Returns maximum total overlap bytes with grandparent
// level (i.e., level+2) before we stop building a single // level (i.e., level+2) before we stop building a single
// file in level->level+1 compaction. // file in level->level+1 compaction.
@ -124,8 +122,8 @@ struct MutableCFOptions {
// Derived options // Derived options
// Per-level target file size. // Per-level target file size.
std::vector<uint64_t> max_file_size; std::vector<uint64_t> max_file_size;
// Per-level max bytes
std::vector<uint64_t> level_max_bytes;
}; };
uint64_t MultiplyCheckOverflow(uint64_t op1, int op2);
} // namespace rocksdb } // namespace rocksdb

View File

@ -65,13 +65,18 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
compression(options.compression), compression(options.compression),
compression_per_level(options.compression_per_level), compression_per_level(options.compression_per_level),
compression_opts(options.compression_opts), compression_opts(options.compression_opts),
level_compaction_dynamic_level_bytes(
options.level_compaction_dynamic_level_bytes),
access_hint_on_compaction_start(options.access_hint_on_compaction_start), access_hint_on_compaction_start(options.access_hint_on_compaction_start),
num_levels(options.num_levels), num_levels(options.num_levels),
optimize_filters_for_hits(options.optimize_filters_for_hits) optimize_filters_for_hits(options.optimize_filters_for_hits)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
, listeners(options.listeners) {} ,
listeners(options.listeners) {
}
#else // ROCKSDB_LITE #else // ROCKSDB_LITE
{} {
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
ColumnFamilyOptions::ColumnFamilyOptions() ColumnFamilyOptions::ColumnFamilyOptions()
@ -94,6 +99,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
target_file_size_base(2 * 1048576), target_file_size_base(2 * 1048576),
target_file_size_multiplier(1), target_file_size_multiplier(1),
max_bytes_for_level_base(10 * 1048576), max_bytes_for_level_base(10 * 1048576),
level_compaction_dynamic_level_bytes(false),
max_bytes_for_level_multiplier(10), max_bytes_for_level_multiplier(10),
max_bytes_for_level_multiplier_additional(num_levels, 1), max_bytes_for_level_multiplier_additional(num_levels, 1),
expanded_compaction_factor(25), expanded_compaction_factor(25),
@ -123,9 +129,10 @@ ColumnFamilyOptions::ColumnFamilyOptions()
min_partial_merge_operands(2), min_partial_merge_operands(2),
optimize_filters_for_hits(false) optimize_filters_for_hits(false)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
, listeners() { ,
listeners() {
#else // ROCKSDB_LITE #else // ROCKSDB_LITE
{ {
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
} }
@ -153,6 +160,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
target_file_size_base(options.target_file_size_base), target_file_size_base(options.target_file_size_base),
target_file_size_multiplier(options.target_file_size_multiplier), target_file_size_multiplier(options.target_file_size_multiplier),
max_bytes_for_level_base(options.max_bytes_for_level_base), max_bytes_for_level_base(options.max_bytes_for_level_base),
level_compaction_dynamic_level_bytes(
options.level_compaction_dynamic_level_bytes),
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier), max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional( max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional), options.max_bytes_for_level_multiplier_additional),
@ -189,9 +198,10 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
min_partial_merge_operands(options.min_partial_merge_operands), min_partial_merge_operands(options.min_partial_merge_operands),
optimize_filters_for_hits(options.optimize_filters_for_hits) optimize_filters_for_hits(options.optimize_filters_for_hits)
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
, listeners(options.listeners) { ,
listeners(options.listeners) {
#else // ROCKSDB_LITE #else // ROCKSDB_LITE
{ {
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() < if (max_bytes_for_level_multiplier_additional.size() <
@ -411,6 +421,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
target_file_size_multiplier); target_file_size_multiplier);
Log(log," Options.max_bytes_for_level_base: %" PRIu64, Log(log," Options.max_bytes_for_level_base: %" PRIu64,
max_bytes_for_level_base); max_bytes_for_level_base);
Log(log, "Options.level_compaction_dynamic_level_bytes: %d",
level_compaction_dynamic_level_bytes);
Log(log," Options.max_bytes_for_level_multiplier: %d", Log(log," Options.max_bytes_for_level_multiplier: %d",
max_bytes_for_level_multiplier); max_bytes_for_level_multiplier);
for (int i = 0; i < num_levels; i++) { for (int i = 0; i < num_levels; i++) {

View File

@ -410,6 +410,9 @@ bool ParseColumnFamilyOption(const std::string& name, const std::string& value,
ParseInt(value.substr(start, value.size() - start)); ParseInt(value.substr(start, value.size() - start));
} else if (name == "num_levels") { } else if (name == "num_levels") {
new_options->num_levels = ParseInt(value); new_options->num_levels = ParseInt(value);
} else if (name == "level_compaction_dynamic_level_bytes") {
new_options->level_compaction_dynamic_level_bytes =
ParseBoolean(name, value);
} else if (name == "purge_redundant_kvs_while_flush") { } else if (name == "purge_redundant_kvs_while_flush") {
new_options->purge_redundant_kvs_while_flush = new_options->purge_redundant_kvs_while_flush =
ParseBoolean(name, value); ParseBoolean(name, value);

View File

@ -96,7 +96,8 @@ TEST(OptionsTest, GetOptionsFromMapTest) {
{"max_write_buffer_number", "2"}, {"max_write_buffer_number", "2"},
{"min_write_buffer_number_to_merge", "3"}, {"min_write_buffer_number_to_merge", "3"},
{"compression", "kSnappyCompression"}, {"compression", "kSnappyCompression"},
{"compression_per_level", "kNoCompression:" {"compression_per_level",
"kNoCompression:"
"kSnappyCompression:" "kSnappyCompression:"
"kZlibCompression:" "kZlibCompression:"
"kBZip2Compression:" "kBZip2Compression:"
@ -111,6 +112,7 @@ TEST(OptionsTest, GetOptionsFromMapTest) {
{"target_file_size_base", "12"}, {"target_file_size_base", "12"},
{"target_file_size_multiplier", "13"}, {"target_file_size_multiplier", "13"},
{"max_bytes_for_level_base", "14"}, {"max_bytes_for_level_base", "14"},
{"level_compaction_dynamic_level_bytes", "true"},
{"max_bytes_for_level_multiplier", "15"}, {"max_bytes_for_level_multiplier", "15"},
{"max_bytes_for_level_multiplier_additional", "16:17:18"}, {"max_bytes_for_level_multiplier_additional", "16:17:18"},
{"expanded_compaction_factor", "19"}, {"expanded_compaction_factor", "19"},
@ -198,6 +200,7 @@ TEST(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.target_file_size_base, static_cast<uint64_t>(12)); ASSERT_EQ(new_cf_opt.target_file_size_base, static_cast<uint64_t>(12));
ASSERT_EQ(new_cf_opt.target_file_size_multiplier, 13); ASSERT_EQ(new_cf_opt.target_file_size_multiplier, 13);
ASSERT_EQ(new_cf_opt.max_bytes_for_level_base, 14U); ASSERT_EQ(new_cf_opt.max_bytes_for_level_base, 14U);
ASSERT_EQ(new_cf_opt.level_compaction_dynamic_level_bytes, true);
ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier, 15); ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier, 15);
ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional.size(), 3U); ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional.size(), 3U);
ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional[0], 16); ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional[0], 16);

View File

@ -34,6 +34,20 @@ bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
return true; return true;
} }
void SyncPoint::SetCallBack(const std::string point,
std::function<void()> callback) {
std::unique_lock<std::mutex> lock(mutex_);
callbacks_[point] = callback;
}
void SyncPoint::ClearAllCallBacks() {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.clear();
}
void SyncPoint::EnableProcessing() { void SyncPoint::EnableProcessing() {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
enabled_ = true; enabled_ = true;
@ -54,6 +68,16 @@ void SyncPoint::Process(const std::string& point) {
if (!enabled_) return; if (!enabled_) return;
auto callback_pair = callbacks_.find(point);
if (callback_pair != callbacks_.end()) {
num_callbacks_running_++;
mutex_.unlock();
callback_pair->second();
mutex_.lock();
num_callbacks_running_--;
cv_.notify_all();
}
while (!PredecessorsAllCleared(point)) { while (!PredecessorsAllCleared(point)) {
cv_.wait(lock); cv_.wait(lock);
} }

View File

@ -38,6 +38,11 @@ class SyncPoint {
// sync points // sync points
void LoadDependency(const std::vector<Dependency>& dependencies); void LoadDependency(const std::vector<Dependency>& dependencies);
// Set up a call back function in sync point.
void SetCallBack(const std::string point, std::function<void()> callback);
// Clear all call back functions.
void ClearAllCallBacks();
// enable sync point processing (disabled on startup) // enable sync point processing (disabled on startup)
void EnableProcessing(); void EnableProcessing();
@ -60,12 +65,14 @@ class SyncPoint {
// successor/predecessor map loaded from LoadDependency // successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_; std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_; std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::unordered_map<std::string, std::function<void()> > callbacks_;
std::mutex mutex_; std::mutex mutex_;
std::condition_variable cv_; std::condition_variable cv_;
// sync points that have been passed through // sync points that have been passed through
std::unordered_set<std::string> cleared_points_; std::unordered_set<std::string> cleared_points_;
bool enabled_ = false; bool enabled_ = false;
int num_callbacks_running_ = 0;
}; };
} // namespace rocksdb } // namespace rocksdb