Fix a race condition when disabling and enabling manual compaction (#9694)

Summary:
In https://github.com/facebook/rocksdb/issues/9659, when `DisableManualCompaction()` is issued, the foreground
manual compaction thread does not have to wait for the background compaction
thread to finish. This is a problem if the user re-enables manual
compaction with `EnableManualCompaction()`: doing so may re-enable the
BG compaction that was supposed to be cancelled.
This patch makes the FG compaction wait on
`manual_compaction_state.done`, which is set either by the BG compaction or
by the Unschedule callback. Thus, when the FG manual compaction thread
returns, no BG compaction should still be running, so a shared_ptr is no
longer needed for `manual_compaction_state`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9694

Test Plan: a StressTest and unittest

Reviewed By: ajkr

Differential Revision: D34885472

Pulled By: jay-zhuang

fbshipit-source-id: e6476175b43e8c59cd49f5c09241036a0716c274
This commit is contained in:
Jay Zhuang 2022-03-15 12:31:14 -07:00 committed by Andrew Kryczka
parent 4f37eb4db2
commit 32ad0dcafe
4 changed files with 100 additions and 63 deletions

View File

@ -4,6 +4,7 @@
* Fixed a bug caused by a race among flush, incoming writes, and taking snapshots. Queries to snapshots created under this race condition can return incorrect results, e.g. resurfacing deleted data.
* Fixed a bug where `DisableManualCompaction()` may assert when disabling an unscheduled manual compaction.
* Fixed a bug where `Iterator::Refresh()` reads stale keys after `DeleteRange()` is performed.
* Fixed a race condition when disabling and re-enabling manual compaction.
* Fixed a race condition when canceling manual compaction with `DisableManualCompaction`. Also, DB close can cancel the manual compaction thread.
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
* Fixed a read-after-free bug in `DB::GetMergeOperands()`.

View File

@ -6965,8 +6965,7 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) {
{{"DBImpl::BGWorkCompaction",
"DBCompactionTest::DisableJustStartedManualCompaction:"
"PreDisableManualCompaction"},
{"DBCompactionTest::DisableJustStartedManualCompaction:"
"ManualCompactionReturn",
{"DBImpl::RunManualCompaction:Unscheduled",
"BackgroundCallCompaction:0"}});
SyncPoint::GetInstance()->EnableProcessing();
@ -6975,9 +6974,6 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) {
cro.exclusive_manual_compaction = true;
auto s = db_->CompactRange(cro, nullptr, nullptr);
ASSERT_TRUE(s.IsIncomplete());
TEST_SYNC_POINT(
"DBCompactionTest::DisableJustStartedManualCompaction:"
"ManualCompactionReturn");
});
TEST_SYNC_POINT(
"DBCompactionTest::DisableJustStartedManualCompaction:"
@ -6987,6 +6983,43 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) {
compact_thread.join();
}
// Exercises DisableManualCompaction() against an exclusive manual compaction
// that the background thread has already marked as in progress: the
// foreground CompactRange() call is expected to return an Incomplete status.
TEST_F(DBCompactionTest, DisableInProgressManualCompaction) {
  constexpr int kL0FileCount = 4;
  // Sync point the main thread passes right before disabling manual
  // compaction; shared by the dependency table and the TEST_SYNC_POINT below
  // so the two spellings cannot drift apart.
  const char* const kPreDisablePoint =
      "DBCompactionTest::DisableInProgressManualCompaction:"
      "PreDisableManualCompaction";

  Options opts = CurrentOptions();
  opts.level0_file_num_compaction_trigger = kL0FileCount;
  Reopen(opts);

  // NOTE(review): each pair is presumably {predecessor, successor}, i.e.
  //  - the main thread does not disable manual compaction until the
  //    background compaction has reported "InProgress", and
  //  - the compaction job does not start running until the foreground
  //    thread has gone through its "Unscheduled" step.
  // Confirm against the SyncPoint implementation.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BackgroundCompaction:InProgress", kPreDisablePoint},
       {"DBImpl::RunManualCompaction:Unscheduled",
        "CompactionJob::Run():Start"}});
  SyncPoint::GetInstance()->EnableProcessing();

  // Write only half as many L0 files as the trigger so that automatic
  // compaction is not kicked off.
  const int files_to_write = kL0FileCount / 2;
  for (int written = 0; written < files_to_write; ++written) {
    ASSERT_OK(Put(Key(1), "value1"));
    ASSERT_OK(Put(Key(2), "value2"));
    ASSERT_OK(Flush());
  }

  // Foreground manual compaction runs on its own thread so the main thread
  // can disable manual compaction while it is still pending.
  port::Thread compact_thread([&]() {
    CompactRangeOptions range_opts;
    range_opts.exclusive_manual_compaction = true;
    auto compact_status = db_->CompactRange(range_opts, nullptr, nullptr);
    ASSERT_TRUE(compact_status.IsIncomplete());
  });

  TEST_SYNC_POINT(kPreDisablePoint);
  db_->DisableManualCompaction();

  compact_thread.join();
}
TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
const int kNumL0Files = 4;

View File

@ -1526,8 +1526,7 @@ class DBImpl : public DB {
Compaction* compaction;
// caller retains ownership of `manual_compaction_state` as it is reused
// across background compactions.
std::shared_ptr<ManualCompactionState>
manual_compaction_state; // nullptr if non-manual
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
// task limiter token is requested during compaction picking.
std::unique_ptr<TaskLimiterToken> task_token;
};

View File

@ -1781,10 +1781,11 @@ Status DBImpl::RunManualCompaction(
CompactionArg* ca = nullptr;
bool scheduled = false;
bool unscheduled = false;
Env::Priority thread_pool_priority = Env::Priority::TOTAL;
bool manual_conflict = false;
auto manual = std::make_shared<ManualCompactionState>(
ManualCompactionState manual(
cfd, input_level, output_level, compact_range_options.target_path_id,
exclusive, disallow_trivial_move, compact_range_options.canceled);
// For universal compaction, we enforce every manual compaction to compact
@ -1792,18 +1793,18 @@ Status DBImpl::RunManualCompaction(
if (begin == nullptr ||
cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
manual->begin = nullptr;
manual.begin = nullptr;
} else {
begin_storage.SetMinPossibleForUserKey(*begin);
manual->begin = &begin_storage;
manual.begin = &begin_storage;
}
if (end == nullptr ||
cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
manual->end = nullptr;
manual.end = nullptr;
} else {
end_storage.SetMaxPossibleForUserKey(*end);
manual->end = &end_storage;
manual.end = &end_storage;
}
TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
@ -1815,10 +1816,10 @@ Status DBImpl::RunManualCompaction(
// `DisableManualCompaction()` just waited for the manual compaction queue
// to drain. So return immediately.
TEST_SYNC_POINT("DBImpl::RunManualCompaction:PausedAtStart");
manual->status =
manual.status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
manual->done = true;
return manual->status;
manual.done = true;
return manual.status;
}
// When a manual compaction arrives, temporarily disable scheduling of
@ -1838,7 +1839,7 @@ Status DBImpl::RunManualCompaction(
// However, only one of them will actually schedule compaction, while
// others will wait on a condition variable until it completes.
AddManualCompaction(manual.get());
AddManualCompaction(&manual);
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
if (exclusive) {
// Limitation: there's no way to wake up the below loop when user sets
@ -1847,11 +1848,11 @@ Status DBImpl::RunManualCompaction(
while (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0) {
if (manual_compaction_paused_ > 0 ||
(manual->canceled != nullptr && *manual->canceled == true)) {
(manual.canceled != nullptr && *manual.canceled == true)) {
// Pretend the error came from compaction so the below cleanup/error
// handling code can process it.
manual->done = true;
manual->status =
manual.done = true;
manual.status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
break;
}
@ -1873,30 +1874,27 @@ Status DBImpl::RunManualCompaction(
// We don't check bg_error_ here, because if we get the error in compaction,
// the compaction will set manual.status to bg_error_ and set manual.done to
// true.
while (!manual->done) {
while (!manual.done) {
assert(HasPendingManualCompaction());
manual_conflict = false;
Compaction* compaction = nullptr;
if (ShouldntRunManualCompaction(manual.get()) ||
(manual->in_progress == true) || scheduled ||
(((manual->manual_end = &manual->tmp_storage1) != nullptr) &&
((compaction = manual->cfd->CompactRange(
*manual->cfd->GetLatestMutableCFOptions(), mutable_db_options_,
manual->input_level, manual->output_level, compact_range_options,
manual->begin, manual->end, &manual->manual_end,
&manual_conflict, max_file_num_to_ignore)) == nullptr &&
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
scheduled ||
(((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
((compaction = manual.cfd->CompactRange(
*manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
manual.input_level, manual.output_level, compact_range_options,
manual.begin, manual.end, &manual.manual_end, &manual_conflict,
max_file_num_to_ignore)) == nullptr &&
manual_conflict))) {
// exclusive manual compactions should not see a conflict during
// CompactRange
assert(!exclusive || !manual_conflict);
// Running either this or some other manual compaction
bg_cv_.Wait();
if (manual_compaction_paused_ > 0) {
manual->done = true;
manual->status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
if (scheduled) {
if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) {
assert(thread_pool_priority != Env::Priority::TOTAL);
// unschedule all manual compactions
auto unscheduled_task_num = env_->UnSchedule(
GetTaskTag(TaskType::kManualCompaction), thread_pool_priority);
if (unscheduled_task_num > 0) {
@ -1905,32 +1903,34 @@ Status DBImpl::RunManualCompaction(
"[%s] Unscheduled %d number of manual compactions from the "
"thread-pool",
cfd->GetName().c_str(), unscheduled_task_num);
// it may unschedule other manual compactions, notify others.
bg_cv_.SignalAll();
}
unscheduled = true;
TEST_SYNC_POINT("DBImpl::RunManualCompaction:Unscheduled");
}
break;
}
if (scheduled && manual->incomplete == true) {
assert(!manual->in_progress);
if (scheduled && manual.incomplete == true) {
assert(!manual.in_progress);
scheduled = false;
manual->incomplete = false;
manual.incomplete = false;
}
} else if (!scheduled) {
if (compaction == nullptr) {
manual->done = true;
manual.done = true;
bg_cv_.SignalAll();
continue;
}
ca = new CompactionArg;
ca->db = this;
ca->prepicked_compaction = new PrepickedCompaction;
ca->prepicked_compaction->manual_compaction_state = manual;
ca->prepicked_compaction->manual_compaction_state = &manual;
ca->prepicked_compaction->compaction = compaction;
if (!RequestCompactionToken(
cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
// Don't throttle manual compaction, only count outstanding tasks.
assert(false);
}
manual->incomplete = false;
manual.incomplete = false;
if (compaction->bottommost_level() &&
env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
bg_bottom_compaction_scheduled_++;
@ -1954,18 +1954,18 @@ Status DBImpl::RunManualCompaction(
}
log_buffer.FlushBufferToLog();
assert(!manual->in_progress);
assert(!manual.in_progress);
assert(HasPendingManualCompaction());
RemoveManualCompaction(manual.get());
RemoveManualCompaction(&manual);
// if the manual job is unscheduled, try schedule other jobs in case there's
// any unscheduled compaction job which was blocked by exclusive manual
// compaction.
if (manual->status.IsIncomplete() &&
manual->status.subcode() == Status::SubCode::kManualCompactionPaused) {
if (manual.status.IsIncomplete() &&
manual.status.subcode() == Status::SubCode::kManualCompactionPaused) {
MaybeScheduleFlushOrCompaction();
}
bg_cv_.SignalAll();
return manual->status;
return manual.status;
}
void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
@ -2691,6 +2691,12 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) {
CompactionArg ca = *(ca_ptr);
delete reinterpret_cast<CompactionArg*>(arg);
if (ca.prepicked_compaction != nullptr) {
// if it's a manual compaction, set status to ManualCompactionPaused
if (ca.prepicked_compaction->manual_compaction_state) {
ca.prepicked_compaction->manual_compaction_state->done = true;
ca.prepicked_compaction->manual_compaction_state->status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (ca.prepicked_compaction->compaction != nullptr) {
ca.prepicked_compaction->compaction->ReleaseCompactionFiles(
Status::Incomplete(Status::SubCode::kManualCompactionPaused));
@ -2933,7 +2939,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
mutex_.Lock();
} else if (s.IsManualCompactionPaused()) {
assert(prepicked_compaction);
auto m = prepicked_compaction->manual_compaction_state;
ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
assert(m);
ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
m->cfd->GetName().c_str(), job_context.job_id);
@ -3015,7 +3021,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri) {
std::shared_ptr<ManualCompactionState> manual_compaction =
ManualCompactionState* manual_compaction =
prepicked_compaction == nullptr
? nullptr
: prepicked_compaction->manual_compaction_state;
@ -3059,10 +3065,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (!status.ok()) {
if (is_manual) {
manual_compaction->status = status;
manual_compaction->status
.PermitUncheckedError(); // the manual compaction thread may exit
// first, which won't be able to check the
// status
manual_compaction->done = true;
manual_compaction->in_progress = false;
manual_compaction = nullptr;
@ -3079,13 +3081,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
manual_compaction->in_progress = true;
}
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:InProgress");
std::unique_ptr<TaskLimiterToken> task_token;
// InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage;
bool sfm_reserved_compact_space = false;
if (is_manual) {
auto m = manual_compaction;
ManualCompactionState* m = manual_compaction;
assert(m->in_progress);
if (!c) {
m->done = true;
@ -3469,7 +3473,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c.reset();
if (is_manual) {
auto m = manual_compaction;
ManualCompactionState* m = manual_compaction;
if (!status.ok()) {
m->status = status;
m->done = true;