Add delay before flush in CompactRange to avoid write stalling
Summary: - Refactored logic for checking write stall condition to a helper function: `GetWriteStallConditionAndCause`. Now it is decoupled from the logic for updating WriteController / stats in `RecalculateWriteStallConditions`, so we can reuse it for predicting whether write stall will occur. - Updated `CompactRange` to first check whether the one additional immutable memtable / L0 file would cause stalling before it flushes. If so, it waits until that is no longer true. - Updated `bg_cv_` to be signaled on `SetOptions` calls. The stall conditions `CompactRange` cares about can change when (1) flush finishes, (2) compaction finishes, or (3) options dynamically change. The cv was already signaled for (1) and (2) but not yet for (3). Closes https://github.com/facebook/rocksdb/pull/3381 Differential Revision: D6754983 Pulled By: ajkr fbshipit-source-id: 5613e03f1524df7192dc6ae885d40fd8f091d972
This commit is contained in:
parent
0a0fad447b
commit
ee1c802675
@ -2,11 +2,12 @@
|
||||
## Unreleased
|
||||
### Public API Change
|
||||
* Iterator::SeekForPrev is now a pure virtual method. This is to prevent user who implement the Iterator interface fail to implement SeekForPrev by mistake.
|
||||
- Add `include_end` option to make the range end exclusive when `include_end == false` in `DeleteFilesInRange()`.
|
||||
* Add `include_end` option to make the range end exclusive when `include_end == false` in `DeleteFilesInRange()`.
|
||||
* Add `CompactRangeOptions::allow_write_stall`, which makes `CompactRange` start working immediately, even if it causes user writes to stall. The default value is false, meaning we add delay to `CompactRange` calls until stalling can be avoided when possible. Note this delay is not present in previous RocksDB versions.
|
||||
|
||||
### New Features
|
||||
* Improve the performance of iterators doing long range scans by using readahead.
|
||||
- Add new function `DeleteFilesInRanges()` to delete files in multiple ranges at once for better performance.
|
||||
* Add new function `DeleteFilesInRanges()` to delete files in multiple ranges at once for better performance.
|
||||
|
||||
### Bug Fixes
|
||||
* Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete.
|
||||
|
@ -632,6 +632,41 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
|
||||
}
|
||||
} // namespace
|
||||
|
||||
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
|
||||
ColumnFamilyData::GetWriteStallConditionAndCause(
|
||||
int num_unflushed_memtables, int num_l0_files,
|
||||
uint64_t num_compaction_needed_bytes,
|
||||
const MutableCFOptions& mutable_cf_options) {
|
||||
if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
|
||||
return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
|
||||
return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
||||
num_compaction_needed_bytes >=
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit) {
|
||||
return {WriteStallCondition::kStopped,
|
||||
WriteStallCause::kPendingCompactionBytes};
|
||||
} else if (mutable_cf_options.max_write_buffer_number > 3 &&
|
||||
num_unflushed_memtables >=
|
||||
mutable_cf_options.max_write_buffer_number - 1) {
|
||||
return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
|
||||
num_l0_files >=
|
||||
mutable_cf_options.level0_slowdown_writes_trigger) {
|
||||
return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
|
||||
num_compaction_needed_bytes >=
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit) {
|
||||
return {WriteStallCondition::kDelayed,
|
||||
WriteStallCause::kPendingCompactionBytes};
|
||||
}
|
||||
return {WriteStallCondition::kNormal, WriteStallCause::kNone};
|
||||
}
|
||||
|
||||
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
const MutableCFOptions& mutable_cf_options) {
|
||||
auto write_stall_condition = WriteStallCondition::kNormal;
|
||||
@ -641,25 +676,29 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
uint64_t compaction_needed_bytes =
|
||||
vstorage->estimated_compaction_needed_bytes();
|
||||
|
||||
auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
|
||||
imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
|
||||
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
|
||||
write_stall_condition = write_stall_condition_and_cause.first;
|
||||
auto write_stall_cause = write_stall_condition_and_cause.second;
|
||||
|
||||
bool was_stopped = write_controller->IsStopped();
|
||||
bool needed_delay = write_controller->NeedsDelay();
|
||||
|
||||
if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
|
||||
if (write_stall_condition == WriteStallCondition::kStopped &&
|
||||
write_stall_cause == WriteStallCause::kMemtableLimit) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
|
||||
write_stall_condition = WriteStallCondition::kStopped;
|
||||
ROCKS_LOG_WARN(
|
||||
ioptions_.info_log,
|
||||
"[%s] Stopping writes because we have %d immutable memtables "
|
||||
"(waiting for flush), max_write_buffer_number is set to %d",
|
||||
name_.c_str(), imm()->NumNotFlushed(),
|
||||
mutable_cf_options.max_write_buffer_number);
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_stop_writes_trigger) {
|
||||
} else if (write_stall_condition == WriteStallCondition::kStopped &&
|
||||
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
|
||||
write_stall_condition = WriteStallCondition::kStopped;
|
||||
if (compaction_picker_->IsLevel0CompactionInProgress()) {
|
||||
internal_stats_->AddCFStats(
|
||||
InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
|
||||
@ -667,28 +706,23 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
ROCKS_LOG_WARN(ioptions_.info_log,
|
||||
"[%s] Stopping writes because we have %d level-0 files",
|
||||
name_.c_str(), vstorage->l0_delay_trigger_count());
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
||||
compaction_needed_bytes >=
|
||||
mutable_cf_options.hard_pending_compaction_bytes_limit) {
|
||||
} else if (write_stall_condition == WriteStallCondition::kStopped &&
|
||||
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
|
||||
write_controller_token_ = write_controller->GetStopToken();
|
||||
internal_stats_->AddCFStats(
|
||||
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
|
||||
write_stall_condition = WriteStallCondition::kStopped;
|
||||
ROCKS_LOG_WARN(
|
||||
ioptions_.info_log,
|
||||
"[%s] Stopping writes because of estimated pending compaction "
|
||||
"bytes %" PRIu64,
|
||||
name_.c_str(), compaction_needed_bytes);
|
||||
} else if (mutable_cf_options.max_write_buffer_number > 3 &&
|
||||
imm()->NumNotFlushed() >=
|
||||
mutable_cf_options.max_write_buffer_number - 1) {
|
||||
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
|
||||
write_stall_cause == WriteStallCause::kMemtableLimit) {
|
||||
write_controller_token_ =
|
||||
SetupDelay(write_controller, compaction_needed_bytes,
|
||||
prev_compaction_needed_bytes_, was_stopped,
|
||||
mutable_cf_options.disable_auto_compactions);
|
||||
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
|
||||
write_stall_condition = WriteStallCondition::kDelayed;
|
||||
ROCKS_LOG_WARN(
|
||||
ioptions_.info_log,
|
||||
"[%s] Stalling writes because we have %d immutable memtables "
|
||||
@ -697,10 +731,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
name_.c_str(), imm()->NumNotFlushed(),
|
||||
mutable_cf_options.max_write_buffer_number,
|
||||
write_controller->delayed_write_rate());
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
|
||||
vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_slowdown_writes_trigger) {
|
||||
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
|
||||
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
|
||||
// L0 is the last two files from stopping.
|
||||
bool near_stop = vstorage->l0_delay_trigger_count() >=
|
||||
mutable_cf_options.level0_stop_writes_trigger - 2;
|
||||
@ -710,7 +742,6 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
mutable_cf_options.disable_auto_compactions);
|
||||
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
|
||||
1);
|
||||
write_stall_condition = WriteStallCondition::kDelayed;
|
||||
if (compaction_picker_->IsLevel0CompactionInProgress()) {
|
||||
internal_stats_->AddCFStats(
|
||||
InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
|
||||
@ -720,10 +751,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
"rate %" PRIu64,
|
||||
name_.c_str(), vstorage->l0_delay_trigger_count(),
|
||||
write_controller->delayed_write_rate());
|
||||
} else if (!mutable_cf_options.disable_auto_compactions &&
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
|
||||
vstorage->estimated_compaction_needed_bytes() >=
|
||||
mutable_cf_options.soft_pending_compaction_bytes_limit) {
|
||||
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
|
||||
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
|
||||
// If the distance to hard limit is less than 1/4 of the gap between soft
|
||||
// and
|
||||
// hard bytes limit, we think it is near stop and speed up the slowdown.
|
||||
@ -741,7 +770,6 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
mutable_cf_options.disable_auto_compactions);
|
||||
internal_stats_->AddCFStats(
|
||||
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
|
||||
write_stall_condition = WriteStallCondition::kDelayed;
|
||||
ROCKS_LOG_WARN(
|
||||
ioptions_.info_log,
|
||||
"[%s] Stalling writes because of estimated pending compaction "
|
||||
@ -749,6 +777,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
|
||||
write_controller->delayed_write_rate());
|
||||
} else {
|
||||
assert(write_stall_condition == WriteStallCondition::kNormal);
|
||||
if (vstorage->l0_delay_trigger_count() >=
|
||||
GetL0ThresholdSpeedupCompaction(
|
||||
mutable_cf_options.level0_file_num_compaction_trigger,
|
||||
|
@ -30,6 +30,7 @@ namespace rocksdb {
|
||||
|
||||
class Version;
|
||||
class VersionSet;
|
||||
class VersionStorageInfo;
|
||||
class MemTable;
|
||||
class MemTableListVersion;
|
||||
class CompactionPicker;
|
||||
@ -335,6 +336,17 @@ class ColumnFamilyData {
|
||||
bool pending_flush() { return pending_flush_; }
|
||||
bool pending_compaction() { return pending_compaction_; }
|
||||
|
||||
enum class WriteStallCause {
|
||||
kNone,
|
||||
kMemtableLimit,
|
||||
kL0FileCountLimit,
|
||||
kPendingCompactionBytes,
|
||||
};
|
||||
static std::pair<WriteStallCondition, WriteStallCause>
|
||||
GetWriteStallConditionAndCause(int num_unflushed_memtables, int num_l0_files,
|
||||
uint64_t num_compaction_needed_bytes,
|
||||
const MutableCFOptions& mutable_cf_options);
|
||||
|
||||
// Recalculate some small conditions, which are changed only during
|
||||
// compaction, adding new memtable and/or
|
||||
// recalculation of compaction score. These values are used in
|
||||
|
@ -3105,6 +3105,218 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
|
||||
// Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
|
||||
// compaction only triggers flush after it's sure stall won't be triggered for
|
||||
// L0 file count going too high.
|
||||
const int kNumL0FilesTrigger = 4;
|
||||
const int kNumL0FilesLimit = 8;
|
||||
// i == 0: verifies normal case where stall is avoided by delay
|
||||
// i == 1: verifies no delay in edge case where stall trigger is same as
|
||||
// compaction trigger, so stall can't be avoided
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
Options options = CurrentOptions();
|
||||
options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
|
||||
if (i == 0) {
|
||||
options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
|
||||
} else {
|
||||
options.level0_file_num_compaction_trigger = kNumL0FilesLimit;
|
||||
}
|
||||
Reopen(options);
|
||||
|
||||
if (i == 0) {
|
||||
// ensure the auto compaction doesn't finish until manual compaction has
|
||||
// had a chance to be delayed.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait", "CompactionJob::Run():End"}});
|
||||
} else {
|
||||
// ensure the auto-compaction doesn't finish until manual compaction has
|
||||
// continued without delay.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
|
||||
}
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
|
||||
for (int k = 0; k < 2; ++k) {
|
||||
ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024)));
|
||||
}
|
||||
Flush();
|
||||
}
|
||||
auto manual_compaction_thread = port::Thread([this]() {
|
||||
CompactRangeOptions cro;
|
||||
cro.allow_write_stall = false;
|
||||
db_->CompactRange(cro, nullptr, nullptr);
|
||||
});
|
||||
|
||||
manual_compaction_thread.join();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(0, NumTableFilesAtLevel(0));
|
||||
ASSERT_GT(NumTableFilesAtLevel(1), 0);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
|
||||
// Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
|
||||
// compaction only triggers flush after it's sure stall won't be triggered for
|
||||
// immutable memtable count going too high.
|
||||
const int kNumImmMemTableLimit = 8;
|
||||
// i == 0: verifies normal case where stall is avoided by delay
|
||||
// i == 1: verifies no delay in edge case where stall trigger is same as flush
|
||||
// trigger, so stall can't be avoided
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
Options options = CurrentOptions();
|
||||
options.disable_auto_compactions = true;
|
||||
// the delay limit is one less than the stop limit. This test focuses on
|
||||
// avoiding delay limit, but this option sets stop limit, so add one.
|
||||
options.max_write_buffer_number = kNumImmMemTableLimit + 1;
|
||||
if (i == 1) {
|
||||
options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
|
||||
}
|
||||
Reopen(options);
|
||||
|
||||
if (i == 0) {
|
||||
// ensure the flush doesn't finish until manual compaction has had a
|
||||
// chance to be delayed.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait", "FlushJob::WriteLevel0Table"}});
|
||||
} else {
|
||||
// ensure the flush doesn't finish until manual compaction has continued
|
||||
// without delay.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWaitDone",
|
||||
"FlushJob::WriteLevel0Table"}});
|
||||
}
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
|
||||
ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
|
||||
FlushOptions flush_opts;
|
||||
flush_opts.wait = false;
|
||||
dbfull()->Flush(flush_opts);
|
||||
}
|
||||
|
||||
auto manual_compaction_thread = port::Thread([this]() {
|
||||
CompactRangeOptions cro;
|
||||
cro.allow_write_stall = false;
|
||||
db_->CompactRange(cro, nullptr, nullptr);
|
||||
});
|
||||
|
||||
manual_compaction_thread.join();
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
ASSERT_EQ(0, NumTableFilesAtLevel(0));
|
||||
ASSERT_GT(NumTableFilesAtLevel(1), 0);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
|
||||
// Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
|
||||
// does not hang if CF is dropped or DB is closed
|
||||
const int kNumL0FilesTrigger = 4;
|
||||
const int kNumL0FilesLimit = 8;
|
||||
Options options = CurrentOptions();
|
||||
options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
|
||||
options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
|
||||
// i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
|
||||
// i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
|
||||
// simulate what happens during Close as we can't call Close (it
|
||||
// blocks on the auto-compaction, making a cycle).
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
CreateAndReopenWithCF({"one"}, options);
|
||||
// The calls to close CF/DB wait until the manual compaction stalls.
|
||||
// The auto-compaction waits until the manual compaction finishes to ensure
|
||||
// the signal comes from closing CF/DB, not from compaction making progress.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait",
|
||||
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
|
||||
{"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
|
||||
"CompactionJob::Run():End"}});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
|
||||
for (int k = 0; k < 2; ++k) {
|
||||
ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
|
||||
}
|
||||
Flush(1);
|
||||
}
|
||||
auto manual_compaction_thread = port::Thread([this]() {
|
||||
CompactRangeOptions cro;
|
||||
cro.allow_write_stall = false;
|
||||
ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
|
||||
.IsShutdownInProgress());
|
||||
});
|
||||
|
||||
TEST_SYNC_POINT(
|
||||
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
|
||||
if (i == 0) {
|
||||
ASSERT_OK(db_->DropColumnFamily(handles_[1]));
|
||||
} else {
|
||||
dbfull()->CancelAllBackgroundWork(false /* wait */);
|
||||
}
|
||||
manual_compaction_thread.join();
|
||||
TEST_SYNC_POINT(
|
||||
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
|
||||
// Verify that, when `CompactRangeOptions::allow_write_stall == false`,
|
||||
// CompactRange skips its flush if the delay is long enough that the memtables
|
||||
// existing at the beginning of the call have already been flushed.
|
||||
const int kNumL0FilesTrigger = 4;
|
||||
const int kNumL0FilesLimit = 8;
|
||||
Options options = CurrentOptions();
|
||||
options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
|
||||
options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
|
||||
Reopen(options);
|
||||
|
||||
Random rnd(301);
|
||||
// The manual flush includes the memtable that was active when CompactRange
|
||||
// began. So it unblocks CompactRange and precludes its flush. Throughout the
|
||||
// test, stall conditions are upheld via high L0 file count.
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::CompactRange:StallWait",
|
||||
"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
|
||||
{"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
|
||||
"DBImpl::CompactRange:StallWaitDone"},
|
||||
{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
|
||||
for (int j = 0; j < 2; ++j) {
|
||||
ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
|
||||
}
|
||||
Flush();
|
||||
}
|
||||
auto manual_compaction_thread = port::Thread([this]() {
|
||||
CompactRangeOptions cro;
|
||||
cro.allow_write_stall = false;
|
||||
db_->CompactRange(cro, nullptr, nullptr);
|
||||
});
|
||||
|
||||
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
|
||||
Put(ToString(0), RandomString(&rnd, 1024));
|
||||
Flush();
|
||||
Put(ToString(0), RandomString(&rnd, 1024));
|
||||
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
|
||||
manual_compaction_thread.join();
|
||||
|
||||
// If CompactRange's flush was skipped, the final Put above will still be
|
||||
// in the active memtable.
|
||||
std::string num_keys_in_memtable;
|
||||
db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
|
||||
ASSERT_EQ(ToString(1), num_keys_in_memtable);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
|
||||
::testing::Values(std::make_tuple(1, true),
|
||||
std::make_tuple(1, false),
|
||||
|
@ -549,6 +549,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
|
||||
|
||||
persist_options_status = WriteOptionsFile(
|
||||
false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
}
|
||||
sv_context.Clean();
|
||||
@ -1433,6 +1434,7 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
|
||||
}
|
||||
is_snapshot_supported_ = new_is_snapshot_supported;
|
||||
}
|
||||
bg_cv_.SignalAll();
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
|
@ -963,6 +963,8 @@ class DBImpl : public DB {
|
||||
// * whenever num_running_ingest_file_ goes to 0.
|
||||
// * whenever pending_purge_obsolete_files_ goes to 0.
|
||||
// * whenever disable_delete_obsolete_files_ goes to 0.
|
||||
// * whenever SetOptions successfully updates options.
|
||||
// * whenever a column family is dropped.
|
||||
InstrumentedCondVar bg_cv_;
|
||||
// Writes are protected by locking both mutex_ and log_write_mutex_, and reads
|
||||
// must be under either mutex_ or log_write_mutex_. Since after ::Open,
|
||||
|
@ -290,11 +290,66 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
|
||||
auto cfd = cfh->cfd();
|
||||
bool exclusive = options.exclusive_manual_compaction;
|
||||
|
||||
Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
|
||||
bool flush_needed = true;
|
||||
if (!options.allow_write_stall) {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
|
||||
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
|
||||
do {
|
||||
if (write_stall_condition != WriteStallCondition::kNormal) {
|
||||
TEST_SYNC_POINT("DBImpl::CompactRange:StallWait");
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"[%s] CompactRange waiting on stall conditions to clear",
|
||||
cfd->GetName().c_str());
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
|
||||
return Status::ShutdownInProgress();
|
||||
}
|
||||
|
||||
uint64_t earliest_memtable_id =
|
||||
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
|
||||
if (earliest_memtable_id > orig_active_memtable_id) {
|
||||
// We waited so long that the memtable we were originally waiting on was
|
||||
// flushed.
|
||||
flush_needed = false;
|
||||
break;
|
||||
}
|
||||
|
||||
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
|
||||
const auto* vstorage = cfd->current()->storage_info();
|
||||
|
||||
// Skip stalling check if we're below auto-flush and auto-compaction
|
||||
// triggers. If it stalled in these conditions, that'd mean the stall
|
||||
// triggers are so low that stalling is needed for any background work. In
|
||||
// that case we shouldn't wait since background work won't be scheduled.
|
||||
if (cfd->imm()->NumNotFlushed() <
|
||||
cfd->ioptions()->min_write_buffer_number_to_merge &&
|
||||
vstorage->l0_delay_trigger_count() <
|
||||
mutable_cf_options.level0_file_num_compaction_trigger) {
|
||||
break;
|
||||
}
|
||||
|
||||
// check whether one extra immutable memtable or an extra L0 file would
|
||||
// cause write stalling mode to be entered. It could still enter stall
|
||||
// mode due to pending compaction bytes, but that's less common
|
||||
write_stall_condition =
|
||||
ColumnFamilyData::GetWriteStallConditionAndCause(
|
||||
cfd->imm()->NumNotFlushed() + 1,
|
||||
vstorage->l0_delay_trigger_count() + 1,
|
||||
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
|
||||
.first;
|
||||
} while (write_stall_condition != WriteStallCondition::kNormal);
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::CompactRange:StallWaitDone");
|
||||
Status s;
|
||||
if (flush_needed) {
|
||||
s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
|
||||
if (!s.ok()) {
|
||||
LogFlush(immutable_db_options_.info_log);
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
int max_level_with_files = 0;
|
||||
{
|
||||
|
@ -1205,6 +1205,9 @@ struct CompactRangeOptions {
|
||||
// if there is a compaction filter
|
||||
BottommostLevelCompaction bottommost_level_compaction =
|
||||
BottommostLevelCompaction::kIfHaveCompactionFilter;
|
||||
// If true, will execute immediately even if doing so would cause the DB to
|
||||
// enter write stall mode. Otherwise, it'll sleep until load is low enough.
|
||||
bool allow_write_stall = false;
|
||||
};
|
||||
|
||||
// IngestExternalFileOptions is used by IngestExternalFile()
|
||||
|
Loading…
x
Reference in New Issue
Block a user