Add max_subcompactions as a compaction option
Summary: Sometimes we want to compact files as fast as possible, but don't want to set a large `max_subcompactions` in the `DBOptions` by default. I add a `max_subcompactions` options to `CompactionOptions` so that we can choose a proper concurrency dynamically. Closes https://github.com/facebook/rocksdb/pull/3775 Differential Revision: D7792357 Pulled By: ajkr fbshipit-source-id: 94f54c3784dce69e40a229721a79a97e80cd6a6c
This commit is contained in:
parent
7dfbe33532
commit
ed7a95b28c
@ -995,11 +995,13 @@ const int ColumnFamilyData::kCompactToBaseLevel = -2;
|
||||
|
||||
Compaction* ColumnFamilyData::CompactRange(
|
||||
const MutableCFOptions& mutable_cf_options, int input_level,
|
||||
int output_level, uint32_t output_path_id, const InternalKey* begin,
|
||||
const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
|
||||
int output_level, uint32_t output_path_id, uint32_t max_subcompactions,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end, bool* conflict) {
|
||||
auto* result = compaction_picker_->CompactRange(
|
||||
GetName(), mutable_cf_options, current_->storage_info(), input_level,
|
||||
output_level, output_path_id, begin, end, compaction_end, conflict);
|
||||
output_level, output_path_id, max_subcompactions, begin, end,
|
||||
compaction_end, conflict);
|
||||
if (result != nullptr) {
|
||||
result->SetInputVersion(current_);
|
||||
}
|
||||
|
@ -296,9 +296,9 @@ class ColumnFamilyData {
|
||||
// REQUIRES: DB mutex held
|
||||
Compaction* CompactRange(const MutableCFOptions& mutable_cf_options,
|
||||
int input_level, int output_level,
|
||||
uint32_t output_path_id, const InternalKey* begin,
|
||||
const InternalKey* end, InternalKey** compaction_end,
|
||||
bool* manual_conflict);
|
||||
uint32_t output_path_id, uint32_t max_subcompactions,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end, bool* manual_conflict);
|
||||
|
||||
CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
|
||||
// thread-safe
|
||||
|
@ -134,6 +134,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
|
||||
int _output_level, uint64_t _target_file_size,
|
||||
uint64_t _max_compaction_bytes, uint32_t _output_path_id,
|
||||
CompressionType _compression,
|
||||
uint32_t _max_subcompactions,
|
||||
std::vector<FileMetaData*> _grandparents,
|
||||
bool _manual_compaction, double _score,
|
||||
bool _deletion_compaction,
|
||||
@ -143,6 +144,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
|
||||
output_level_(_output_level),
|
||||
max_output_file_size_(_target_file_size),
|
||||
max_compaction_bytes_(_max_compaction_bytes),
|
||||
max_subcompactions_(_max_subcompactions),
|
||||
immutable_cf_options_(_immutable_cf_options),
|
||||
mutable_cf_options_(_mutable_cf_options),
|
||||
input_version_(nullptr),
|
||||
@ -163,6 +165,9 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
|
||||
if (is_manual_compaction_) {
|
||||
compaction_reason_ = CompactionReason::kManualCompaction;
|
||||
}
|
||||
if (max_subcompactions_ == 0) {
|
||||
max_subcompactions_ = immutable_cf_options_.max_subcompactions;
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
for (size_t i = 1; i < inputs_.size(); ++i) {
|
||||
@ -442,7 +447,7 @@ bool Compaction::IsOutputLevelEmpty() const {
|
||||
}
|
||||
|
||||
bool Compaction::ShouldFormSubcompactions() const {
|
||||
if (immutable_cf_options_.max_subcompactions <= 1 || cfd_ == nullptr) {
|
||||
if (max_subcompactions_ <= 1 || cfd_ == nullptr) {
|
||||
return false;
|
||||
}
|
||||
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
|
||||
|
@ -40,6 +40,7 @@ class Compaction {
|
||||
std::vector<CompactionInputFiles> inputs, int output_level,
|
||||
uint64_t target_file_size, uint64_t max_compaction_bytes,
|
||||
uint32_t output_path_id, CompressionType compression,
|
||||
uint32_t max_subcompactions,
|
||||
std::vector<FileMetaData*> grandparents,
|
||||
bool manual_compaction = false, double score = -1,
|
||||
bool deletion_compaction = false,
|
||||
@ -241,6 +242,8 @@ class Compaction {
|
||||
|
||||
uint64_t max_compaction_bytes() const { return max_compaction_bytes_; }
|
||||
|
||||
uint32_t max_subcompactions() const { return max_subcompactions_; }
|
||||
|
||||
uint64_t MaxInputFileCreationTime() const;
|
||||
|
||||
private:
|
||||
@ -267,6 +270,7 @@ class Compaction {
|
||||
const int output_level_; // levels to which output files are stored
|
||||
uint64_t max_output_file_size_;
|
||||
uint64_t max_compaction_bytes_;
|
||||
uint32_t max_subcompactions_;
|
||||
const ImmutableCFOptions immutable_cf_options_;
|
||||
const MutableCFOptions mutable_cf_options_;
|
||||
Version* input_version_;
|
||||
|
@ -531,7 +531,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
|
||||
c->mutable_cf_options()->MaxFileSizeForLevel(out_lvl)));
|
||||
uint64_t subcompactions =
|
||||
std::min({static_cast<uint64_t>(ranges.size()),
|
||||
static_cast<uint64_t>(db_options_.max_subcompactions),
|
||||
static_cast<uint64_t>(c->max_subcompactions()),
|
||||
max_output_files});
|
||||
|
||||
if (subcompactions > 1) {
|
||||
|
@ -246,7 +246,7 @@ class CompactionJobTest : public testing::Test {
|
||||
Compaction compaction(cfd->current()->storage_info(), *cfd->ioptions(),
|
||||
*cfd->GetLatestMutableCFOptions(),
|
||||
compaction_input_files, 1, 1024 * 1024,
|
||||
10 * 1024 * 1024, 0, kNoCompression, {}, true);
|
||||
10 * 1024 * 1024, 0, kNoCompression, 0, {}, true);
|
||||
compaction.SetInputVersion(cfd->current());
|
||||
|
||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
|
||||
|
@ -301,7 +301,9 @@ Compaction* CompactionPicker::CompactFiles(
|
||||
new Compaction(vstorage, ioptions_, mutable_cf_options, input_files,
|
||||
output_level, compact_options.output_file_size_limit,
|
||||
mutable_cf_options.max_compaction_bytes, output_path_id,
|
||||
compact_options.compression, /* grandparents */ {}, true);
|
||||
compact_options.compression,
|
||||
compact_options.max_subcompactions,
|
||||
/* grandparents */ {}, true);
|
||||
RegisterCompaction(c);
|
||||
return c;
|
||||
}
|
||||
@ -504,7 +506,8 @@ void CompactionPicker::GetGrandparents(
|
||||
Compaction* CompactionPicker::CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
|
||||
uint32_t output_path_id, uint32_t max_subcompactions,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end, bool* manual_conflict) {
|
||||
// CompactionPickerFIFO has its own implementation of compact range
|
||||
assert(ioptions_.compaction_style != kCompactionStyleFIFO);
|
||||
@ -570,7 +573,7 @@ Compaction* CompactionPicker::CompactRange(
|
||||
/* max_compaction_bytes */ LLONG_MAX, output_path_id,
|
||||
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
|
||||
output_level, 1),
|
||||
/* grandparents */ {}, /* is manual */ true);
|
||||
max_subcompactions, /* grandparents */ {}, /* is manual */ true);
|
||||
RegisterCompaction(c);
|
||||
return c;
|
||||
}
|
||||
@ -677,7 +680,8 @@ Compaction* CompactionPicker::CompactRange(
|
||||
mutable_cf_options.max_compaction_bytes, output_path_id,
|
||||
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
|
||||
vstorage->base_level()),
|
||||
std::move(grandparents), /* is manual compaction */ true);
|
||||
/* max_subcompactions */ 0, std::move(grandparents),
|
||||
/* is manual compaction */ true);
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
|
||||
RegisterCompaction(compaction);
|
||||
@ -1312,8 +1316,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
|
||||
GetPathId(ioptions_, mutable_cf_options_, output_level_),
|
||||
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
|
||||
output_level_, vstorage_->base_level()),
|
||||
std::move(grandparents_), is_manual_, start_level_score_,
|
||||
false /* deletion_compaction */, compaction_reason_);
|
||||
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
|
||||
start_level_score_, false /* deletion_compaction */, compaction_reason_);
|
||||
|
||||
// If it's level 0 compaction, make sure we don't execute any other level 0
|
||||
// compactions in parallel
|
||||
@ -1561,8 +1565,9 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
|
||||
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
|
||||
kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
|
||||
kNoCompression, /* max_subcompactions */ 0, {}, /* is manual */ false,
|
||||
vstorage->CompactionScore(0), /* is deletion compaction */ true,
|
||||
CompactionReason::kFIFOTtl);
|
||||
return c;
|
||||
}
|
||||
|
||||
@ -1599,9 +1604,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
|
||||
vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0,
|
||||
16 * 1024 * 1024 /* output file size limit */,
|
||||
0 /* max compaction bytes, not applicable */,
|
||||
0 /* output path ID */, mutable_cf_options.compression, {},
|
||||
/* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ false,
|
||||
0 /* output path ID */, mutable_cf_options.compression,
|
||||
0 /* max_subcompactions */, {}, /* is manual */ false,
|
||||
vstorage->CompactionScore(0), /* is deletion compaction */ false,
|
||||
CompactionReason::kFIFOReduceNumFiles);
|
||||
return c;
|
||||
}
|
||||
@ -1647,8 +1652,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
|
||||
|
||||
Compaction* c = new Compaction(
|
||||
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
|
||||
kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0),
|
||||
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
|
||||
kNoCompression, /* max_subcompactions */ 0, {}, /* is manual */ false,
|
||||
vstorage->CompactionScore(0), /* is deletion compaction */ true,
|
||||
CompactionReason::kFIFOMaxSize);
|
||||
return c;
|
||||
}
|
||||
|
||||
@ -1671,9 +1677,9 @@ Compaction* FIFOCompactionPicker::PickCompaction(
|
||||
Compaction* FIFOCompactionPicker::CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t /*output_path_id*/, const InternalKey* /*begin*/,
|
||||
const InternalKey* /*end*/, InternalKey** compaction_end,
|
||||
bool* /*manual_conflict*/) {
|
||||
uint32_t /*output_path_id*/, uint32_t /*max_subcompactions*/,
|
||||
const InternalKey* /*begin*/, const InternalKey* /*end*/,
|
||||
InternalKey** compaction_end, bool* /*manual_conflict*/) {
|
||||
#ifdef NDEBUG
|
||||
(void)input_level;
|
||||
(void)output_level;
|
||||
|
@ -58,7 +58,8 @@ class CompactionPicker {
|
||||
virtual Compaction* CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
|
||||
uint32_t output_path_id, uint32_t max_subcompactions,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end, bool* manual_conflict);
|
||||
|
||||
// The maximum allowed output level. Default value is NumberLevels() - 1.
|
||||
@ -238,7 +239,8 @@ class FIFOCompactionPicker : public CompactionPicker {
|
||||
virtual Compaction* CompactRange(
|
||||
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
|
||||
VersionStorageInfo* vstorage, int input_level, int output_level,
|
||||
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
|
||||
uint32_t output_path_id, uint32_t max_subcompactions,
|
||||
const InternalKey* begin, const InternalKey* end,
|
||||
InternalKey** compaction_end, bool* manual_conflict) override;
|
||||
|
||||
// The maximum allowed output level. Always returns 0.
|
||||
@ -280,6 +282,7 @@ class NullCompactionPicker : public CompactionPicker {
|
||||
VersionStorageInfo* /*vstorage*/,
|
||||
int /*input_level*/, int /*output_level*/,
|
||||
uint32_t /*output_path_id*/,
|
||||
uint32_t /*max_subcompactions*/,
|
||||
const InternalKey* /*begin*/,
|
||||
const InternalKey* /*end*/,
|
||||
InternalKey** /*compaction_end*/,
|
||||
|
@ -612,8 +612,8 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
|
||||
mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
|
||||
GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
|
||||
1, enable_compression),
|
||||
/* grandparents */ {}, /* is manual */ false, score,
|
||||
false /* deletion_compaction */, compaction_reason);
|
||||
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
|
||||
score, false /* deletion_compaction */, compaction_reason);
|
||||
}
|
||||
|
||||
// Look at overall size amplification. If size amplification
|
||||
@ -746,8 +746,8 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
|
||||
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
|
||||
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
|
||||
output_level, 1),
|
||||
/* grandparents */ {}, /* is manual */ false, score,
|
||||
false /* deletion_compaction */,
|
||||
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
|
||||
score, false /* deletion_compaction */,
|
||||
CompactionReason::kUniversalSizeAmplification);
|
||||
}
|
||||
} // namespace rocksdb
|
||||
|
@ -342,6 +342,7 @@ class DBImpl : public DB {
|
||||
|
||||
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
int output_level, uint32_t output_path_id,
|
||||
uint32_t max_subcompactions,
|
||||
const Slice* begin, const Slice* end,
|
||||
bool exclusive,
|
||||
bool disallow_trivial_move = false);
|
||||
|
@ -427,8 +427,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
|
||||
final_output_level--;
|
||||
}
|
||||
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
|
||||
final_output_level, options.target_path_id, begin,
|
||||
end, exclusive);
|
||||
final_output_level, options.target_path_id,
|
||||
options.max_subcompactions, begin, end, exclusive);
|
||||
} else {
|
||||
for (int level = 0; level <= max_level_with_files; level++) {
|
||||
int output_level;
|
||||
@ -462,7 +462,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
|
||||
}
|
||||
}
|
||||
s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
|
||||
begin, end, exclusive);
|
||||
options.max_subcompactions, begin, end, exclusive);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
@ -974,6 +974,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
|
||||
|
||||
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
int output_level, uint32_t output_path_id,
|
||||
uint32_t max_subcompactions,
|
||||
const Slice* begin, const Slice* end,
|
||||
bool exclusive, bool disallow_trivial_move) {
|
||||
assert(input_level == ColumnFamilyData::kCompactAllLevels ||
|
||||
@ -1061,8 +1062,9 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
|
||||
(((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
|
||||
((compaction = manual.cfd->CompactRange(
|
||||
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
|
||||
manual.output_level, manual.output_path_id, manual.begin,
|
||||
manual.end, &manual.manual_end, &manual_conflict)) == nullptr &&
|
||||
manual.output_level, manual.output_path_id, max_subcompactions,
|
||||
manual.begin, manual.end, &manual.manual_end,
|
||||
&manual_conflict)) == nullptr &&
|
||||
manual_conflict))) {
|
||||
// exclusive manual compactions should not see a conflict during
|
||||
// CompactRange
|
||||
|
@ -80,7 +80,7 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
|
||||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO)
|
||||
? level
|
||||
: level + 1;
|
||||
return RunManualCompaction(cfd, level, output_level, 0, begin, end, true,
|
||||
return RunManualCompaction(cfd, level, output_level, 0, 0, begin, end, true,
|
||||
disallow_trivial_move);
|
||||
}
|
||||
|
||||
|
@ -433,8 +433,8 @@ TEST_F(DBRangeDelTest, ValidUniversalSubcompactionBoundaries) {
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
|
||||
->cfd(),
|
||||
1 /* input_level */, 2 /* output_level */, 0 /* output_path_id */,
|
||||
nullptr /* begin */, nullptr /* end */, true /* exclusive */,
|
||||
true /* disallow_trivial_move */));
|
||||
0 /* max_subcompactions */, nullptr /* begin */, nullptr /* end */,
|
||||
true /* exclusive */, true /* disallow_trivial_move */));
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
|
@ -1185,10 +1185,13 @@ struct CompactionOptions {
|
||||
// Compaction will create files of size `output_file_size_limit`.
|
||||
// Default: MAX, which means that compaction will create a single file
|
||||
uint64_t output_file_size_limit;
|
||||
// If > 0, it will replace the option in the DBOptions for this compaction.
|
||||
uint32_t max_subcompactions;
|
||||
|
||||
CompactionOptions()
|
||||
: compression(kSnappyCompression),
|
||||
output_file_size_limit(std::numeric_limits<uint64_t>::max()) {}
|
||||
output_file_size_limit(std::numeric_limits<uint64_t>::max()),
|
||||
max_subcompactions(0) {}
|
||||
};
|
||||
|
||||
// For level based compaction, we can configure if we want to skip/force
|
||||
@ -1224,6 +1227,8 @@ struct CompactRangeOptions {
|
||||
// If true, will execute immediately even if doing so would cause the DB to
|
||||
// enter write stall mode. Otherwise, it'll sleep until load is low enough.
|
||||
bool allow_write_stall = false;
|
||||
// If > 0, it will replace the option in the DBOptions for this compaction.
|
||||
uint32_t max_subcompactions = 0;
|
||||
};
|
||||
|
||||
// IngestExternalFileOptions is used by IngestExternalFile()
|
||||
|
Loading…
Reference in New Issue
Block a user