Unschedule manual compaction from thread-pool queue (#9625)
Summary: PR https://github.com/facebook/rocksdb/issues/9557 introduced a race condition between the manual compaction foreground thread and the background compaction thread. This PR adds the ability to actually unschedule a manual compaction from the thread-pool queue by using different tag names for manual compaction and other tasks. It also fixes an issue where DB `close()` didn't cancel the manual compaction thread. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9625 Test Plan: unit tests do not hang Reviewed By: ajkr Differential Revision: D34410811 Pulled By: jay-zhuang fbshipit-source-id: cb14065eabb8cf1345fa042b5652d4f788c0c40c
This commit is contained in:
parent
d6bb43202e
commit
529efcc5b2
@ -2,6 +2,7 @@
|
|||||||
## Unreleased
|
## Unreleased
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data.
|
* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data.
|
||||||
|
* Fixed a race condition when cancelling manual compaction with `DisableManualCompaction`. Also, closing the DB now cancels the manual compaction thread.
|
||||||
|
|
||||||
## 6.29.3 (02/17/2022)
|
## 6.29.3 (02/17/2022)
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
@ -6943,7 +6943,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
|
|||||||
|
|
||||||
SyncPoint::GetInstance()->LoadDependency(
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
{{"DBImpl::RunManualCompaction:Scheduled",
|
{{"DBImpl::RunManualCompaction:Scheduled",
|
||||||
"DBCompactionTest::DisableManualCompactionThreadQueueFull:"
|
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
|
||||||
"PreDisableManualCompaction"}});
|
"PreDisableManualCompaction"}});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
@ -6971,7 +6971,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
TEST_SYNC_POINT(
|
TEST_SYNC_POINT(
|
||||||
"DBCompactionTest::DisableManualCompactionThreadQueueFull:"
|
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
|
||||||
"PreDisableManualCompaction");
|
"PreDisableManualCompaction");
|
||||||
|
|
||||||
// Generate more files to trigger auto compaction which is scheduled after
|
// Generate more files to trigger auto compaction which is scheduled after
|
||||||
@ -6998,6 +6998,63 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
|
|||||||
sleeping_task_low.WaitUntilDone();
|
sleeping_task_low.WaitUntilDone();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
|
||||||
|
const int kNumL0Files = 4;
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
|
{{"DBImpl::RunManualCompaction:Scheduled",
|
||||||
|
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
|
||||||
|
"PreDisableManualCompaction"}});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.level0_file_num_compaction_trigger = kNumL0Files;
|
||||||
|
Reopen(options);
|
||||||
|
|
||||||
|
// Block compaction queue
|
||||||
|
test::SleepingBackgroundTask sleeping_task_low;
|
||||||
|
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||||
|
Env::Priority::LOW);
|
||||||
|
|
||||||
|
// generate files, but avoid trigger auto compaction
|
||||||
|
for (int i = 0; i < kNumL0Files / 2; i++) {
|
||||||
|
ASSERT_OK(Put(Key(1), "value1"));
|
||||||
|
ASSERT_OK(Put(Key(2), "value2"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
}
|
||||||
|
|
||||||
|
port::Thread compact_thread([&]() {
|
||||||
|
CompactRangeOptions cro;
|
||||||
|
cro.exclusive_manual_compaction = true;
|
||||||
|
auto s = db_->CompactRange(cro, nullptr, nullptr);
|
||||||
|
ASSERT_TRUE(s.IsIncomplete());
|
||||||
|
});
|
||||||
|
|
||||||
|
TEST_SYNC_POINT(
|
||||||
|
"DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
|
||||||
|
"PreDisableManualCompaction");
|
||||||
|
|
||||||
|
// Generate more files to trigger auto compaction which is scheduled after
|
||||||
|
// manual compaction. Has to generate 4 more files because existing files are
|
||||||
|
// pending compaction
|
||||||
|
for (int i = 0; i < kNumL0Files; i++) {
|
||||||
|
ASSERT_OK(Put(Key(1), "value1"));
|
||||||
|
ASSERT_OK(Put(Key(2), "value2"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
}
|
||||||
|
ASSERT_EQ(ToString(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
|
||||||
|
|
||||||
|
// Close DB with manual compaction and auto triggered compaction in the queue.
|
||||||
|
auto s = db_->Close();
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
// manual compaction thread should return with Incomplete().
|
||||||
|
compact_thread.join();
|
||||||
|
|
||||||
|
sleeping_task_low.WakeUp();
|
||||||
|
sleeping_task_low.WaitUntilDone();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DBCompactionTest,
|
TEST_F(DBCompactionTest,
|
||||||
DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) {
|
DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) {
|
||||||
// When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait
|
// When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait
|
||||||
|
@ -533,10 +533,19 @@ Status DBImpl::CloseHelper() {
|
|||||||
// marker. After this we do a variant of the waiting and unschedule work
|
// marker. After this we do a variant of the waiting and unschedule work
|
||||||
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
|
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
|
||||||
CancelAllBackgroundWork(false);
|
CancelAllBackgroundWork(false);
|
||||||
|
|
||||||
|
// Cancel manual compaction if there's any
|
||||||
|
if (HasPendingManualCompaction()) {
|
||||||
|
DisableManualCompaction();
|
||||||
|
}
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
env_->UnSchedule(this, Env::Priority::BOTTOM);
|
// Unschedule all tasks for this DB
|
||||||
env_->UnSchedule(this, Env::Priority::LOW);
|
for (uint8_t i = 0; i < static_cast<uint8_t>(TaskType::kCount); i++) {
|
||||||
env_->UnSchedule(this, Env::Priority::HIGH);
|
env_->UnSchedule(GetTaskTag(i), Env::Priority::BOTTOM);
|
||||||
|
env_->UnSchedule(GetTaskTag(i), Env::Priority::LOW);
|
||||||
|
env_->UnSchedule(GetTaskTag(i), Env::Priority::HIGH);
|
||||||
|
}
|
||||||
|
|
||||||
Status ret = Status::OK();
|
Status ret = Status::OK();
|
||||||
|
|
||||||
// Wait for background work to finish
|
// Wait for background work to finish
|
||||||
|
@ -1517,7 +1517,6 @@ class DBImpl : public DB {
|
|||||||
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
|
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
|
||||||
// task limiter token is requested during compaction picking.
|
// task limiter token is requested during compaction picking.
|
||||||
std::unique_ptr<TaskLimiterToken> task_token;
|
std::unique_ptr<TaskLimiterToken> task_token;
|
||||||
bool is_canceled = false;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct CompactionArg {
|
struct CompactionArg {
|
||||||
@ -1712,6 +1711,25 @@ class DBImpl : public DB {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TaskType is used to identify tasks in thread-pool, currently only
|
||||||
|
// differentiate manual compaction, which could be unscheduled from the
|
||||||
|
// thread-pool.
|
||||||
|
enum class TaskType : uint8_t {
|
||||||
|
kDefault = 0,
|
||||||
|
kManualCompaction = 1,
|
||||||
|
kCount = 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Task tag is used to identify tasks in thread-pool, which is
|
||||||
|
// dbImpl obj address + type
|
||||||
|
inline void* GetTaskTag(TaskType type) {
|
||||||
|
return GetTaskTag(static_cast<uint8_t>(type));
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void* GetTaskTag(uint8_t type) {
|
||||||
|
return static_cast<uint8_t*>(static_cast<void*>(this)) + type;
|
||||||
|
}
|
||||||
|
|
||||||
// REQUIRES: mutex locked and in write thread.
|
// REQUIRES: mutex locked and in write thread.
|
||||||
void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
|
void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
|
||||||
|
|
||||||
|
@ -1781,6 +1781,7 @@ Status DBImpl::RunManualCompaction(
|
|||||||
CompactionArg* ca = nullptr;
|
CompactionArg* ca = nullptr;
|
||||||
|
|
||||||
bool scheduled = false;
|
bool scheduled = false;
|
||||||
|
Env::Priority thread_pool_priority = Env::Priority::TOTAL;
|
||||||
bool manual_conflict = false;
|
bool manual_conflict = false;
|
||||||
ManualCompactionState manual;
|
ManualCompactionState manual;
|
||||||
manual.cfd = cfd;
|
manual.cfd = cfd;
|
||||||
@ -1902,9 +1903,9 @@ Status DBImpl::RunManualCompaction(
|
|||||||
manual.done = true;
|
manual.done = true;
|
||||||
manual.status =
|
manual.status =
|
||||||
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
|
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
|
||||||
if (ca && ca->prepicked_compaction) {
|
assert(thread_pool_priority != Env::Priority::TOTAL);
|
||||||
ca->prepicked_compaction->is_canceled = true;
|
env_->UnSchedule(GetTaskTag(TaskType::kManualCompaction),
|
||||||
}
|
thread_pool_priority);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (scheduled && manual.incomplete == true) {
|
if (scheduled && manual.incomplete == true) {
|
||||||
@ -1934,13 +1935,17 @@ Status DBImpl::RunManualCompaction(
|
|||||||
bg_bottom_compaction_scheduled_++;
|
bg_bottom_compaction_scheduled_++;
|
||||||
ca->compaction_pri_ = Env::Priority::BOTTOM;
|
ca->compaction_pri_ = Env::Priority::BOTTOM;
|
||||||
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca,
|
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca,
|
||||||
Env::Priority::BOTTOM, this,
|
Env::Priority::BOTTOM,
|
||||||
|
GetTaskTag(TaskType::kManualCompaction),
|
||||||
&DBImpl::UnscheduleCompactionCallback);
|
&DBImpl::UnscheduleCompactionCallback);
|
||||||
|
thread_pool_priority = Env::Priority::BOTTOM;
|
||||||
} else {
|
} else {
|
||||||
bg_compaction_scheduled_++;
|
bg_compaction_scheduled_++;
|
||||||
ca->compaction_pri_ = Env::Priority::LOW;
|
ca->compaction_pri_ = Env::Priority::LOW;
|
||||||
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW,
|
||||||
|
GetTaskTag(TaskType::kManualCompaction),
|
||||||
&DBImpl::UnscheduleCompactionCallback);
|
&DBImpl::UnscheduleCompactionCallback);
|
||||||
|
thread_pool_priority = Env::Priority::LOW;
|
||||||
}
|
}
|
||||||
scheduled = true;
|
scheduled = true;
|
||||||
TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
|
TEST_SYNC_POINT("DBImpl::RunManualCompaction:Scheduled");
|
||||||
@ -1951,6 +1956,13 @@ Status DBImpl::RunManualCompaction(
|
|||||||
assert(!manual.in_progress);
|
assert(!manual.in_progress);
|
||||||
assert(HasPendingManualCompaction());
|
assert(HasPendingManualCompaction());
|
||||||
RemoveManualCompaction(&manual);
|
RemoveManualCompaction(&manual);
|
||||||
|
// if the manual job is unscheduled, try schedule other jobs in case there's
|
||||||
|
// any unscheduled compaction job which was blocked by exclusive manual
|
||||||
|
// compaction.
|
||||||
|
if (manual.status.IsIncomplete() &&
|
||||||
|
manual.status.subcode() == Status::SubCode::kManualCompactionPaused) {
|
||||||
|
MaybeScheduleFlushOrCompaction();
|
||||||
|
}
|
||||||
bg_cv_.SignalAll();
|
bg_cv_.SignalAll();
|
||||||
return manual.status;
|
return manual.status;
|
||||||
}
|
}
|
||||||
@ -2679,6 +2691,8 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) {
|
|||||||
delete reinterpret_cast<CompactionArg*>(arg);
|
delete reinterpret_cast<CompactionArg*>(arg);
|
||||||
if (ca.prepicked_compaction != nullptr) {
|
if (ca.prepicked_compaction != nullptr) {
|
||||||
if (ca.prepicked_compaction->compaction != nullptr) {
|
if (ca.prepicked_compaction->compaction != nullptr) {
|
||||||
|
ca.prepicked_compaction->compaction->ReleaseCompactionFiles(
|
||||||
|
Status::Incomplete(Status::SubCode::kManualCompactionPaused));
|
||||||
delete ca.prepicked_compaction->compaction;
|
delete ca.prepicked_compaction->compaction;
|
||||||
}
|
}
|
||||||
delete ca.prepicked_compaction;
|
delete ca.prepicked_compaction;
|
||||||
@ -2869,23 +2883,13 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
|
|||||||
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
||||||
Env::Priority bg_thread_pri) {
|
Env::Priority bg_thread_pri) {
|
||||||
bool made_progress = false;
|
bool made_progress = false;
|
||||||
|
JobContext job_context(next_job_id_.fetch_add(1), true);
|
||||||
TEST_SYNC_POINT("BackgroundCallCompaction:0");
|
TEST_SYNC_POINT("BackgroundCallCompaction:0");
|
||||||
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
|
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
|
||||||
immutable_db_options_.info_log.get());
|
immutable_db_options_.info_log.get());
|
||||||
{
|
{
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
|
||||||
if (prepicked_compaction && prepicked_compaction->is_canceled) {
|
|
||||||
assert(prepicked_compaction->compaction);
|
|
||||||
ROCKS_LOG_BUFFER(&log_buffer, "[%s] Skip canceled manual compaction job",
|
|
||||||
prepicked_compaction->compaction->column_family_data()
|
|
||||||
->GetName()
|
|
||||||
.c_str());
|
|
||||||
prepicked_compaction->compaction->ReleaseCompactionFiles(
|
|
||||||
Status::Incomplete(Status::SubCode::kManualCompactionPaused));
|
|
||||||
delete prepicked_compaction->compaction;
|
|
||||||
} else {
|
|
||||||
JobContext job_context(next_job_id_.fetch_add(1), true);
|
|
||||||
// This call will unlock/lock the mutex to wait for current running
|
// This call will unlock/lock the mutex to wait for current running
|
||||||
// IngestExternalFile() calls to finish.
|
// IngestExternalFile() calls to finish.
|
||||||
WaitForIngestFile();
|
WaitForIngestFile();
|
||||||
@ -2928,8 +2932,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
|||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
} else if (s.IsManualCompactionPaused()) {
|
} else if (s.IsManualCompactionPaused()) {
|
||||||
assert(prepicked_compaction);
|
assert(prepicked_compaction);
|
||||||
ManualCompactionState* m =
|
ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
|
||||||
prepicked_compaction->manual_compaction_state;
|
|
||||||
assert(m);
|
assert(m);
|
||||||
ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
|
ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
|
||||||
m->cfd->GetName().c_str(), job_context.job_id);
|
m->cfd->GetName().c_str(), job_context.job_id);
|
||||||
@ -2958,8 +2961,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
|||||||
log_buffer.FlushBufferToLog();
|
log_buffer.FlushBufferToLog();
|
||||||
if (job_context.HaveSomethingToDelete()) {
|
if (job_context.HaveSomethingToDelete()) {
|
||||||
PurgeObsoleteFiles(job_context);
|
PurgeObsoleteFiles(job_context);
|
||||||
TEST_SYNC_POINT(
|
TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
|
||||||
"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
|
|
||||||
}
|
}
|
||||||
job_context.Clean();
|
job_context.Clean();
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
@ -2967,7 +2969,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
|
|||||||
|
|
||||||
assert(num_running_compactions_ > 0);
|
assert(num_running_compactions_ > 0);
|
||||||
num_running_compactions_--;
|
num_running_compactions_--;
|
||||||
}
|
|
||||||
|
|
||||||
if (bg_thread_pri == Env::Priority::LOW) {
|
if (bg_thread_pri == Env::Priority::LOW) {
|
||||||
bg_compaction_scheduled_--;
|
bg_compaction_scheduled_--;
|
||||||
|
Loading…
Reference in New Issue
Block a user