diff --git a/HISTORY.md b/HISTORY.md index 3b27a6a6a..f1205633a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## Unreleased +### Bug Fixes +* Fixed a race condition when disable and re-enable manual compaction. + ## 7.0.3 (03/23/2022) ### Bug Fixes * Fixed a major performance bug in which Bloom filters generated by pre-7.0 releases are not read by early 7.0.x releases (and vice-versa) due to changes to FilterPolicy::Name() in #9590. This can severely impact read performance and read I/O on upgrade or downgrade with existing DB, but not data correctness. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index fd0eb66eb..03502157d 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6973,8 +6973,7 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { {{"DBImpl::BGWorkCompaction", "DBCompactionTest::DisableJustStartedManualCompaction:" "PreDisableManualCompaction"}, - {"DBCompactionTest::DisableJustStartedManualCompaction:" - "ManualCompactionReturn", + {"DBImpl::RunManualCompaction:Unscheduled", "BackgroundCallCompaction:0"}}); SyncPoint::GetInstance()->EnableProcessing(); @@ -6983,9 +6982,6 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { cro.exclusive_manual_compaction = true; auto s = db_->CompactRange(cro, nullptr, nullptr); ASSERT_TRUE(s.IsIncomplete()); - TEST_SYNC_POINT( - "DBCompactionTest::DisableJustStartedManualCompaction:" - "ManualCompactionReturn"); }); TEST_SYNC_POINT( "DBCompactionTest::DisableJustStartedManualCompaction:" @@ -6995,6 +6991,43 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { compact_thread.join(); } +TEST_F(DBCompactionTest, DisableInProgressManualCompaction) { + const int kNumL0Files = 4; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumL0Files; + Reopen(options); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCompaction:InProgress", + "DBCompactionTest::DisableInProgressManualCompaction:" + "PreDisableManualCompaction"}, + {"DBImpl::RunManualCompaction:Unscheduled", + "CompactionJob::Run():Start"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + // generate files, but avoid trigger auto compaction + for (int i = 0; i < kNumL0Files / 2; i++) { + ASSERT_OK(Put(Key(1), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + } + + port::Thread compact_thread([&]() { + CompactRangeOptions cro; + cro.exclusive_manual_compaction = true; + auto s = db_->CompactRange(cro, nullptr, nullptr); + ASSERT_TRUE(s.IsIncomplete()); + }); + + TEST_SYNC_POINT( + "DBCompactionTest::DisableInProgressManualCompaction:" + "PreDisableManualCompaction"); + db_->DisableManualCompaction(); + + compact_thread.join(); +} + TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { const int kNumL0Files = 4; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 3fa039793..f632d5e9b 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1548,8 +1548,7 @@ class DBImpl : public DB { Compaction* compaction; // caller retains ownership of `manual_compaction_state` as it is reused // across background compactions. - std::shared_ptr - manual_compaction_state; // nullptr if non-manual + ManualCompactionState* manual_compaction_state; // nullptr if non-manual // task limiter token is requested during compaction picking. std::unique_ptr task_token; }; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index c2135f732..a99025675 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1789,10 +1789,11 @@ Status DBImpl::RunManualCompaction( CompactionArg* ca = nullptr; bool scheduled = false; + bool unscheduled = false; Env::Priority thread_pool_priority = Env::Priority::TOTAL; bool manual_conflict = false; - auto manual = std::make_shared( + ManualCompactionState manual( cfd, input_level, output_level, compact_range_options.target_path_id, exclusive, disallow_trivial_move, compact_range_options.canceled); // For universal compaction, we enforce every manual compaction to compact @@ -1800,18 +1801,18 @@ Status DBImpl::RunManualCompaction( if (begin == nullptr || cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - manual->begin = nullptr; + manual.begin = nullptr; } else { begin_storage.SetMinPossibleForUserKey(*begin); - manual->begin = &begin_storage; + manual.begin = &begin_storage; } if (end == nullptr || cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - manual->end = nullptr; + manual.end = nullptr; } else { end_storage.SetMaxPossibleForUserKey(*end); - manual->end = &end_storage; + manual.end = &end_storage; } TEST_SYNC_POINT("DBImpl::RunManualCompaction:0"); @@ -1823,10 +1824,10 @@ Status DBImpl::RunManualCompaction( // `DisableManualCompaction()` just waited for the manual compaction queue // to drain. So return immediately. TEST_SYNC_POINT("DBImpl::RunManualCompaction:PausedAtStart"); - manual->status = + manual.status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); - manual->done = true; - return manual->status; + manual.done = true; + return manual.status; } // When a manual compaction arrives, temporarily disable scheduling of @@ -1846,7 +1847,7 @@ Status DBImpl::RunManualCompaction( // However, only one of them will actually schedule compaction, while // others will wait on a condition variable until it completes. - AddManualCompaction(manual.get()); + AddManualCompaction(&manual); TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); if (exclusive) { // Limitation: there's no way to wake up the below loop when user sets @@ -1855,11 +1856,11 @@ Status DBImpl::RunManualCompaction( while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0) { if (manual_compaction_paused_ > 0 || - (manual->canceled != nullptr && *manual->canceled == true)) { + (manual.canceled != nullptr && *manual.canceled == true)) { // Pretend the error came from compaction so the below cleanup/error // handling code can process it. - manual->done = true; - manual->status = + manual.done = true; + manual.status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); break; } @@ -1881,64 +1882,63 @@ Status DBImpl::RunManualCompaction( // We don't check bg_error_ here, because if we get the error in compaction, // the compaction will set manual.status to bg_error_ and set manual.done to // true. - while (!manual->done) { + while (!manual.done) { assert(HasPendingManualCompaction()); manual_conflict = false; Compaction* compaction = nullptr; - if (ShouldntRunManualCompaction(manual.get()) || - (manual->in_progress == true) || scheduled || - (((manual->manual_end = &manual->tmp_storage1) != nullptr) && - ((compaction = manual->cfd->CompactRange( - *manual->cfd->GetLatestMutableCFOptions(), mutable_db_options_, - manual->input_level, manual->output_level, compact_range_options, - manual->begin, manual->end, &manual->manual_end, - &manual_conflict, max_file_num_to_ignore)) == nullptr && + if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) || + scheduled || + (((manual.manual_end = &manual.tmp_storage1) != nullptr) && + ((compaction = manual.cfd->CompactRange( + *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_, + manual.input_level, manual.output_level, compact_range_options, + manual.begin, manual.end, &manual.manual_end, &manual_conflict, + max_file_num_to_ignore)) == nullptr && manual_conflict))) { // exclusive manual compactions should not see a conflict during // CompactRange assert(!exclusive || !manual_conflict); // Running either this or some other manual compaction bg_cv_.Wait(); - if (manual_compaction_paused_ > 0) { - manual->done = true; - manual->status = - Status::Incomplete(Status::SubCode::kManualCompactionPaused); - if (scheduled) { - assert(thread_pool_priority != Env::Priority::TOTAL); - auto unscheduled_task_num = env_->UnSchedule( - GetTaskTag(TaskType::kManualCompaction), thread_pool_priority); - if (unscheduled_task_num > 0) { - ROCKS_LOG_INFO( - immutable_db_options_.info_log, - "[%s] Unscheduled %d number of manual compactions from the " - "thread-pool", - cfd->GetName().c_str(), unscheduled_task_num); - } + if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) { + assert(thread_pool_priority != Env::Priority::TOTAL); + // unschedule all manual compactions + auto unscheduled_task_num = env_->UnSchedule( + GetTaskTag(TaskType::kManualCompaction), thread_pool_priority); + if (unscheduled_task_num > 0) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "[%s] Unscheduled %d number of manual compactions from the " + "thread-pool", + cfd->GetName().c_str(), unscheduled_task_num); + // it may unschedule other manual compactions, notify others. + bg_cv_.SignalAll(); } - break; + unscheduled = true; + TEST_SYNC_POINT("DBImpl::RunManualCompaction:Unscheduled"); } - if (scheduled && manual->incomplete == true) { - assert(!manual->in_progress); + if (scheduled && manual.incomplete == true) { + assert(!manual.in_progress); scheduled = false; - manual->incomplete = false; + manual.incomplete = false; } } else if (!scheduled) { if (compaction == nullptr) { - manual->done = true; + manual.done = true; bg_cv_.SignalAll(); continue; } ca = new CompactionArg; ca->db = this; ca->prepicked_compaction = new PrepickedCompaction; - ca->prepicked_compaction->manual_compaction_state = manual; + ca->prepicked_compaction->manual_compaction_state = &manual; ca->prepicked_compaction->compaction = compaction; if (!RequestCompactionToken( cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) { // Don't throttle manual compaction, only count outstanding tasks. assert(false); } - manual->incomplete = false; + manual.incomplete = false; if (compaction->bottommost_level() && env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { bg_bottom_compaction_scheduled_++; @@ -1962,18 +1962,18 @@ Status DBImpl::RunManualCompaction( } log_buffer.FlushBufferToLog(); - assert(!manual->in_progress); + assert(!manual.in_progress); assert(HasPendingManualCompaction()); - RemoveManualCompaction(manual.get()); + RemoveManualCompaction(&manual); // if the manual job is unscheduled, try schedule other jobs in case there's // any unscheduled compaction job which was blocked by exclusive manual // compaction. - if (manual->status.IsIncomplete() && - manual->status.subcode() == Status::SubCode::kManualCompactionPaused) { + if (manual.status.IsIncomplete() && + manual.status.subcode() == Status::SubCode::kManualCompactionPaused) { MaybeScheduleFlushOrCompaction(); } bg_cv_.SignalAll(); - return manual->status; + return manual.status; } void DBImpl::GenerateFlushRequest(const autovector& cfds, @@ -2699,6 +2699,12 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) { CompactionArg ca = *(ca_ptr); delete reinterpret_cast(arg); if (ca.prepicked_compaction != nullptr) { + // if it's a manual compaction, set status to ManualCompactionPaused + if (ca.prepicked_compaction->manual_compaction_state) { + ca.prepicked_compaction->manual_compaction_state->done = true; + ca.prepicked_compaction->manual_compaction_state->status = + Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } if (ca.prepicked_compaction->compaction != nullptr) { ca.prepicked_compaction->compaction->ReleaseCompactionFiles( Status::Incomplete(Status::SubCode::kManualCompactionPaused)); @@ -2941,7 +2947,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, mutex_.Lock(); } else if (s.IsManualCompactionPaused()) { assert(prepicked_compaction); - auto m = prepicked_compaction->manual_compaction_state; + ManualCompactionState* m = prepicked_compaction->manual_compaction_state; assert(m); ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", m->cfd->GetName().c_str(), job_context.job_id); @@ -3023,7 +3029,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction, Env::Priority thread_pri) { - std::shared_ptr manual_compaction = + ManualCompactionState* manual_compaction = prepicked_compaction == nullptr ? nullptr : prepicked_compaction->manual_compaction_state; @@ -3067,10 +3073,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!status.ok()) { if (is_manual) { manual_compaction->status = status; - manual_compaction->status - .PermitUncheckedError(); // the manual compaction thread may exit - // first, which won't be able to check the - // status manual_compaction->done = true; manual_compaction->in_progress = false; manual_compaction = nullptr; @@ -3087,13 +3089,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, manual_compaction->in_progress = true; } + TEST_SYNC_POINT("DBImpl::BackgroundCompaction:InProgress"); + std::unique_ptr task_token; // InternalKey manual_end_storage; // InternalKey* manual_end = &manual_end_storage; bool sfm_reserved_compact_space = false; if (is_manual) { - auto m = manual_compaction; + ManualCompactionState* m = manual_compaction; assert(m->in_progress); if (!c) { m->done = true; @@ -3477,7 +3481,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c.reset(); if (is_manual) { - auto m = manual_compaction; + ManualCompactionState* m = manual_compaction; if (!status.ok()) { m->status = status; m->done = true;