From 6d2577e5672a7abe7b41a67f1cccce3a6601b30e Mon Sep 17 00:00:00 2001 From: RoeyMaor Date: Mon, 25 Apr 2022 18:52:33 -0700 Subject: [PATCH] Bugfix/fix manual flush blocking bug (#9893) Summary: Fix https://github.com/facebook/rocksdb/issues/9892 Pull Request resolved: https://github.com/facebook/rocksdb/pull/9893 Reviewed By: jay-zhuang Differential Revision: D35880959 Pulled By: ajkr fbshipit-source-id: dad1139ad0983cfbd5c5cd6fa6b71022f889735a --- HISTORY.md | 3 ++ db/db_impl/db_impl_compaction_flush.cc | 6 ++-- db/db_test.cc | 45 ++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 66a80f4c7..cf387ca46 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Rocksdb Change Log ## Unreleased +### Bug Fixes +* Fixed a bug where manual flush would block forever even though flush options had wait=false. + ### New Features * DB::GetLiveFilesStorageInfo is ready for production use. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 7808de8de..5b508c00d 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1704,9 +1704,11 @@ Status DBImpl::Flush(const FlushOptions& flush_options, Status s; if (immutable_db_options_.atomic_flush) { s = AtomicFlushMemTables({cfh->cfd()}, flush_options, - FlushReason::kManualFlush); + FlushReason::kManualFlush, + write_controller().IsStopped()); } else { - s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush); + s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush, + write_controller().IsStopped()); } ROCKS_LOG_INFO(immutable_db_options_.info_log, diff --git a/db/db_test.cc b/db/db_test.cc index a0557d272..97d6b2976 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7034,6 +7034,51 @@ TEST_F(DBTest, MemoryUsageWithMaxWriteBufferSizeToMaintain) { } } +TEST_F(DBTest, ManualFlushWithStoppedWritesTest) { + Options options = CurrentOptions(); + + // Setting a small write buffer size. This will block writes + // rather quickly until a flush is made. + constexpr unsigned int memtableSize = 1000; + options.db_write_buffer_size = memtableSize; + options.max_background_flushes = 1; + Reopen(options); + + std::atomic done(false); + + // Will suppress future flushes. + // This simulates a situation where the writes will be stopped for any reason. + ASSERT_OK(dbfull()->PauseBackgroundWork()); + + port::Thread backgroundWriter([&]() { + Random rnd(301); + // These writes won't finish. + ASSERT_OK(Put("k1", rnd.RandomString(memtableSize / 2))); + ASSERT_OK(Put("k2", rnd.RandomString(memtableSize / 2))); + ASSERT_OK(Put("k3", rnd.RandomString(memtableSize / 2))); + ASSERT_OK(Put("k4", rnd.RandomString(memtableSize / 2))); + done.store(true); + }); + + env_->SleepForMicroseconds(1000000 / 2); + // make sure thread is stuck waiting for flush. + ASSERT_FALSE(done.load()); + + FlushOptions flushOptions; + flushOptions.wait = false; + flushOptions.allow_write_stall = true; + + // This is the main goal of the test. This manual flush should not deadlock + // as we use wait=false, and even allow_write_stall=true for extra safety. + ASSERT_OK(dbfull()->Flush(flushOptions)); + + // This will re-allow background flushes. + ASSERT_OK(dbfull()->ContinueBackgroundWork()); + + backgroundWriter.join(); + ASSERT_TRUE(done.load()); +} + #endif } // namespace ROCKSDB_NAMESPACE