CompactRange skips levels 1 to base_level -1 for dynamic level base size
Summary: CompactRange() now is much more expensive for dynamic level base size as it goes through all the levels. Skip those not used levels between level 0 an base level. Test Plan: Run all unit tests Reviewers: yhchiang, rven, anthony, kradhakrishnan, igor Reviewed By: igor Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D37125
This commit is contained in:
parent
3f0867c0fe
commit
6fa7085121
@ -536,6 +536,7 @@ Compaction* ColumnFamilyData::PickCompaction(
|
|||||||
}
|
}
|
||||||
|
|
||||||
const int ColumnFamilyData::kCompactAllLevels = -1;
|
const int ColumnFamilyData::kCompactAllLevels = -1;
|
||||||
|
const int ColumnFamilyData::kCompactToBaseLevel = -2;
|
||||||
|
|
||||||
Compaction* ColumnFamilyData::CompactRange(
|
Compaction* ColumnFamilyData::CompactRange(
|
||||||
const MutableCFOptions& mutable_cf_options,
|
const MutableCFOptions& mutable_cf_options,
|
||||||
|
@ -239,6 +239,8 @@ class ColumnFamilyData {
|
|||||||
// A flag to tell a manual compaction is to compact all levels together
|
// A flag to tell a manual compaction is to compact all levels together
|
||||||
// instad of for specific level.
|
// instad of for specific level.
|
||||||
static const int kCompactAllLevels;
|
static const int kCompactAllLevels;
|
||||||
|
// A flag to tell a manual compaction's output is base level.
|
||||||
|
static const int kCompactToBaseLevel;
|
||||||
// REQUIRES: DB mutex held
|
// REQUIRES: DB mutex held
|
||||||
Compaction* CompactRange(
|
Compaction* CompactRange(
|
||||||
const MutableCFOptions& mutable_cf_options,
|
const MutableCFOptions& mutable_cf_options,
|
||||||
|
@ -468,6 +468,11 @@ Compaction* CompactionPicker::CompactRange(
|
|||||||
}
|
}
|
||||||
|
|
||||||
CompactionInputFiles output_level_inputs;
|
CompactionInputFiles output_level_inputs;
|
||||||
|
if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
|
||||||
|
assert(input_level == 0);
|
||||||
|
output_level = vstorage->base_level();
|
||||||
|
assert(output_level > 0);
|
||||||
|
}
|
||||||
output_level_inputs.level = output_level;
|
output_level_inputs.level = output_level;
|
||||||
if (input_level != output_level) {
|
if (input_level != output_level) {
|
||||||
int parent_index = -1;
|
int parent_index = -1;
|
||||||
@ -487,13 +492,16 @@ Compaction* CompactionPicker::CompactRange(
|
|||||||
|
|
||||||
std::vector<FileMetaData*> grandparents;
|
std::vector<FileMetaData*> grandparents;
|
||||||
GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
|
GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
|
||||||
return new Compaction(
|
Compaction* compaction = new Compaction(
|
||||||
vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
|
vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
|
||||||
mutable_cf_options.MaxFileSizeForLevel(output_level),
|
mutable_cf_options.MaxFileSizeForLevel(output_level),
|
||||||
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
|
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
|
||||||
output_path_id,
|
output_path_id,
|
||||||
GetCompressionType(ioptions_, output_level, vstorage->base_level()),
|
GetCompressionType(ioptions_, output_level, vstorage->base_level()),
|
||||||
std::move(grandparents), /* is manual compaction */ true);
|
std::move(grandparents), /* is manual compaction */ true);
|
||||||
|
|
||||||
|
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
|
||||||
|
return compaction;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
|
@ -1312,13 +1312,9 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
|
|||||||
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
|
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
|
||||||
cfd->NumberLevels() > 1) {
|
cfd->NumberLevels() > 1) {
|
||||||
// Always compact all files together.
|
// Always compact all files together.
|
||||||
int output_level = 0;
|
|
||||||
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
|
|
||||||
cfd->NumberLevels() > 1) {
|
|
||||||
output_level = cfd->NumberLevels() - 1;
|
|
||||||
}
|
|
||||||
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
|
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
|
||||||
output_level, target_path_id, begin, end);
|
cfd->NumberLevels() - 1, target_path_id, begin,
|
||||||
|
end);
|
||||||
} else {
|
} else {
|
||||||
for (int level = 0; level <= max_level_with_files; level++) {
|
for (int level = 0; level <= max_level_with_files; level++) {
|
||||||
// in case the compaction is unversal or if we're compacting the
|
// in case the compaction is unversal or if we're compacting the
|
||||||
@ -1330,8 +1326,13 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
|
|||||||
(level == max_level_with_files && level > 0)) {
|
(level == max_level_with_files && level > 0)) {
|
||||||
s = RunManualCompaction(cfd, level, level, target_path_id, begin, end);
|
s = RunManualCompaction(cfd, level, level, target_path_id, begin, end);
|
||||||
} else {
|
} else {
|
||||||
// TODO(sdong) Skip empty levels if possible.
|
int output_level = level + 1;
|
||||||
s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin,
|
if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
|
||||||
|
cfd->ioptions()->level_compaction_dynamic_level_bytes &&
|
||||||
|
level == 0) {
|
||||||
|
output_level = ColumnFamilyData::kCompactToBaseLevel;
|
||||||
|
}
|
||||||
|
s = RunManualCompaction(cfd, level, output_level, target_path_id, begin,
|
||||||
end);
|
end);
|
||||||
}
|
}
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -2234,16 +2235,23 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
|
|||||||
m->output_path_id, m->begin, m->end, &manual_end));
|
m->output_path_id, m->begin, m->end, &manual_end));
|
||||||
if (!c) {
|
if (!c) {
|
||||||
m->done = true;
|
m->done = true;
|
||||||
}
|
LogToBuffer(log_buffer,
|
||||||
|
"[%s] Manual compaction from level-%d from %s .. "
|
||||||
|
"%s; nothing to do\n",
|
||||||
|
m->cfd->GetName().c_str(), m->input_level,
|
||||||
|
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
|
||||||
|
(m->end ? m->end->DebugString().c_str() : "(end)"));
|
||||||
|
} else {
|
||||||
LogToBuffer(log_buffer,
|
LogToBuffer(log_buffer,
|
||||||
"[%s] Manual compaction from level-%d to level-%d from %s .. "
|
"[%s] Manual compaction from level-%d to level-%d from %s .. "
|
||||||
"%s; will stop at %s\n",
|
"%s; will stop at %s\n",
|
||||||
m->cfd->GetName().c_str(), m->input_level, m->output_level,
|
m->cfd->GetName().c_str(), m->input_level, c->output_level(),
|
||||||
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
|
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
|
||||||
(m->end ? m->end->DebugString().c_str() : "(end)"),
|
(m->end ? m->end->DebugString().c_str() : "(end)"),
|
||||||
((m->done || manual_end == nullptr)
|
((m->done || manual_end == nullptr)
|
||||||
? "(end)"
|
? "(end)"
|
||||||
: manual_end->DebugString().c_str()));
|
: manual_end->DebugString().c_str()));
|
||||||
|
}
|
||||||
} else if (!compaction_queue_.empty()) {
|
} else if (!compaction_queue_.empty()) {
|
||||||
// cfd is referenced here
|
// cfd is referenced here
|
||||||
auto cfd = PopFirstFromCompactionQueue();
|
auto cfd = PopFirstFromCompactionQueue();
|
||||||
|
@ -11175,6 +11175,85 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase2) {
|
|||||||
ASSERT_EQ(1U, int_prop);
|
ASSERT_EQ(1U, int_prop);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test specific cases in dynamic max bytes
|
||||||
|
TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) {
|
||||||
|
Random rnd(301);
|
||||||
|
int kMaxKey = 1000000;
|
||||||
|
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.db_write_buffer_size = 2048;
|
||||||
|
options.write_buffer_size = 2048;
|
||||||
|
options.max_write_buffer_number = 2;
|
||||||
|
options.level0_file_num_compaction_trigger = 2;
|
||||||
|
options.level0_slowdown_writes_trigger = 9999;
|
||||||
|
options.level0_stop_writes_trigger = 9999;
|
||||||
|
options.target_file_size_base = 2;
|
||||||
|
options.level_compaction_dynamic_level_bytes = true;
|
||||||
|
options.max_bytes_for_level_base = 10240;
|
||||||
|
options.max_bytes_for_level_multiplier = 4;
|
||||||
|
options.max_background_compactions = 1;
|
||||||
|
const int kNumLevels = 5;
|
||||||
|
options.num_levels = kNumLevels;
|
||||||
|
options.expanded_compaction_factor = 0; // Force not expanding in compactions
|
||||||
|
BlockBasedTableOptions table_options;
|
||||||
|
table_options.block_size = 1024;
|
||||||
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||||
|
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// Compact against empty DB
|
||||||
|
dbfull()->CompactRange(nullptr, nullptr);
|
||||||
|
|
||||||
|
uint64_t int_prop;
|
||||||
|
std::string str_prop;
|
||||||
|
|
||||||
|
// Initial base level is the last level
|
||||||
|
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
|
||||||
|
ASSERT_EQ(4U, int_prop);
|
||||||
|
|
||||||
|
// Put about 7K to L0
|
||||||
|
for (int i = 0; i < 140; i++) {
|
||||||
|
ASSERT_OK(Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))),
|
||||||
|
RandomString(&rnd, 80)));
|
||||||
|
}
|
||||||
|
Flush();
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
ASSERT_OK(
|
||||||
|
Put(Key(static_cast<int>(rnd.Uniform(kMaxKey))), RandomString(&rnd, 80)));
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
|
||||||
|
ASSERT_EQ(3U, int_prop);
|
||||||
|
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level1", &str_prop));
|
||||||
|
ASSERT_EQ("0", str_prop);
|
||||||
|
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level2", &str_prop));
|
||||||
|
ASSERT_EQ("0", str_prop);
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
|
||||||
|
std::set<int> output_levels;
|
||||||
|
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"CompactionPicker::CompactRange:Return", [&](void* arg) {
|
||||||
|
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
|
||||||
|
output_levels.insert(compaction->output_level());
|
||||||
|
});
|
||||||
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
dbfull()->CompactRange(nullptr, nullptr);
|
||||||
|
ASSERT_EQ(output_levels.size(), 2);
|
||||||
|
ASSERT_TRUE(output_levels.find(3) != output_levels.end());
|
||||||
|
ASSERT_TRUE(output_levels.find(4) != output_levels.end());
|
||||||
|
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &str_prop));
|
||||||
|
ASSERT_EQ("0", str_prop);
|
||||||
|
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level3", &str_prop));
|
||||||
|
ASSERT_EQ("0", str_prop);
|
||||||
|
// Base level is still level 3.
|
||||||
|
ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop));
|
||||||
|
ASSERT_EQ(3U, int_prop);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DBTest, DynamicLevelMaxBytesBaseInc) {
|
TEST_F(DBTest, DynamicLevelMaxBytesBaseInc) {
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.create_if_missing = true;
|
options.create_if_missing = true;
|
||||||
|
@ -31,9 +31,7 @@ namespace rocksdb {
|
|||||||
|
|
||||||
// TODO(yhchiang): remove this function once c++14 is available
|
// TODO(yhchiang): remove this function once c++14 is available
|
||||||
// as std::max will be able to cover this.
|
// as std::max will be able to cover this.
|
||||||
constexpr int constexpr_max(int a, int b) {
|
constexpr int constexpr_max(int a, int b) { return a > b ? a : b; }
|
||||||
return a > b ? a : b;
|
|
||||||
}
|
|
||||||
|
|
||||||
// A structure that describes the current status of a thread.
|
// A structure that describes the current status of a thread.
|
||||||
// The status of active threads can be fetched using
|
// The status of active threads can be fetched using
|
||||||
@ -92,8 +90,8 @@ struct ThreadStatus {
|
|||||||
|
|
||||||
// The maximum number of properties of an operation.
|
// The maximum number of properties of an operation.
|
||||||
// This number should be set to the biggest NUM_XXX_PROPERTIES.
|
// This number should be set to the biggest NUM_XXX_PROPERTIES.
|
||||||
static const int kNumOperationProperties = constexpr_max(
|
static const int kNumOperationProperties =
|
||||||
NUM_COMPACTION_PROPERTIES, NUM_FLUSH_PROPERTIES);
|
constexpr_max(NUM_COMPACTION_PROPERTIES, NUM_FLUSH_PROPERTIES);
|
||||||
|
|
||||||
// The type used to refer to a thread state.
|
// The type used to refer to a thread state.
|
||||||
// A state describes lower-level action of a thread
|
// A state describes lower-level action of a thread
|
||||||
|
Loading…
x
Reference in New Issue
Block a user