diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 63c329aa8..dd292c090 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -2016,6 +2016,7 @@ void DBImpl::BackgroundCallFlush() { job_context.Clean(); mutex_.Lock(); } + TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp"); assert(num_running_flushes_ > 0); num_running_flushes_--; diff --git a/db/db_test.cc b/db/db_test.cc index a0a32f6cd..ba196d9b2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5649,41 +5649,18 @@ TEST_F(DBTest, HardLimit) { #if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) class WriteStallListener : public EventListener { public: - WriteStallListener() - : cond_(&mutex_), - condition_(WriteStallCondition::kNormal), - expected_(WriteStallCondition::kNormal), - expected_set_(false) {} + WriteStallListener() : condition_(WriteStallCondition::kNormal) {} void OnStallConditionsChanged(const WriteStallInfo& info) override { MutexLock l(&mutex_); condition_ = info.condition.cur; - if (expected_set_ && condition_ == expected_) { - cond_.Signal(); - expected_set_ = false; - } } bool CheckCondition(WriteStallCondition expected) { MutexLock l(&mutex_); - if (expected != condition_) { - expected_ = expected; - expected_set_ = true; - while (expected != condition_) { - // We bail out on timeout 500 milliseconds - const uint64_t timeout_us = 500000; - if (cond_.TimedWait(timeout_us)) { - expected_set_ = false; - return false; - } - } - } - return true; + return expected == condition_; } private: port::Mutex mutex_; - port::CondVar cond_; WriteStallCondition condition_; - WriteStallCondition expected_; - bool expected_set_; }; TEST_F(DBTest, SoftLimit) { @@ -5704,6 +5681,41 @@ TEST_F(DBTest, SoftLimit) { WriteStallListener* listener = new WriteStallListener(); options.listeners.emplace_back(listener); + // FlushMemtable with opt.wait=true does not wait for + // `OnStallConditionsChanged` being called. The event listener is triggered + // on `JobContext::Clean`, which happens after flush result is installed. + // We use sync point to create a custom WaitForFlush that waits for + // context cleanup. + port::Mutex flush_mutex; + port::CondVar flush_cv(&flush_mutex); + bool flush_finished = false; + auto InstallFlushCallback = [&]() { + { + MutexLock l(&flush_mutex); + flush_finished = false; + } + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:ContextCleanedUp", [&](void*) { + { + MutexLock l(&flush_mutex); + flush_finished = true; + } + flush_cv.SignalAll(); + }); + }; + auto WaitForFlush = [&]() { + { + MutexLock l(&flush_mutex); + while (!flush_finished) { + flush_cv.Wait(); + } + } + SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::BackgroundCallFlush:ContextCleanedUp"); + }; + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); // Generating 360KB in Level 3 @@ -5739,7 +5751,9 @@ TEST_F(DBTest, SoftLimit) { Put(Key(i), std::string(5000, 'x')); Put(Key(100 - i), std::string(5000, 'x')); // Flush the file. File size is around 30KB. + InstallFlushCallback(); dbfull()->TEST_FlushMemTable(true, true); + WaitForFlush(); } ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); @@ -5764,8 +5778,6 @@ TEST_F(DBTest, SoftLimit) { &sleeping_task_low, Env::Priority::LOW); }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); sleeping_task_low.WaitUntilSleeping(); @@ -5774,7 +5786,9 @@ TEST_F(DBTest, SoftLimit) { Put(Key(10 + i), std::string(5000, 'x')); Put(Key(90 - i), std::string(5000, 'x')); // Flush the file. File size is around 30KB. + InstallFlushCallback(); dbfull()->TEST_FlushMemTable(true, true); + WaitForFlush(); } // Wake up sleep task to enable compaction to run and waits @@ -5795,7 +5809,9 @@ TEST_F(DBTest, SoftLimit) { Put(Key(20 + i), std::string(5000, 'x')); Put(Key(80 - i), std::string(5000, 'x')); // Flush the file. File size is around 30KB. + InstallFlushCallback(); dbfull()->TEST_FlushMemTable(true, true); + WaitForFlush(); } // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction