From 3e1a6dd66515a0b60c39f0ef063fcc86e080d0e4 Mon Sep 17 00:00:00 2001
From: Yanqin Jin
Date: Mon, 1 Jul 2019 14:04:10 -0700
Subject: [PATCH] Ref and unref cfd before and after calling
 WaitForFlushMemTables (#5513)

Summary:
This is to prevent the bg flush thread from unrefing and deleting a cfd that
has been dropped by a concurrent thread. Before RocksDB calls
`DBImpl::WaitForFlushMemTables`, we should increase the refcount of each
`ColumnFamilyData` so that its refcount will not drop to 0 even if the column
family is dropped by another thread. Otherwise the bg flush thread can unref
and delete the cfd, causing a segfault in `WaitForFlushMemTables` upon
accessing `cfd`.

Test plan (on devserver):
```
$make clean && COMPILE_WITH_ASAN=1 make -j32
$make check
```
All unit tests must pass.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/5513

Differential Revision: D16062898

Pulled By: riversand963

fbshipit-source-id: 37dc511f1dc99f036d0201bbd7f0a8f5677c763d
---
 db/db_flush_test.cc                    | 76 ++++++++++++++++++++++++++
 db/db_impl/db_impl.h                   | 10 ++++
 db/db_impl/db_impl_compaction_flush.cc | 39 ++++++++++++-
 db/db_impl/db_impl_debug.cc            | 10 ++++
 4 files changed, 133 insertions(+), 2 deletions(-)

diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index b901a5a78..034ec6322 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -290,6 +290,39 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
   Close();
 }
 
+TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:AfterScheduleFlush",
+        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
+       {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBImpl::BackgroundCallFlush:start",
+        "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_EQ(2, handles_.size());
+  ASSERT_OK(Put(1, "key", "value"));
+  auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
+  port::Thread drop_cf_thr([&]() {
+    TEST_SYNC_POINT(
+        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
+    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
+    ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
+    handles_.resize(1);
+    TEST_SYNC_POINT(
+        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
+  });
+  FlushOptions flush_opts;
+  flush_opts.allow_write_stall = true;
+  ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
+  drop_cf_thr.join();
+  Close();
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
@@ -545,6 +578,49 @@ TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
   handles_.clear();
 }
 
+TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
+  bool atomic_flush = GetParam();
+  if (!atomic_flush) {
+    return;
+  }
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = atomic_flush;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
+        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
{"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree", + "DBImpl::BackgroundCallFlush:start"}, + {"DBImpl::BackgroundCallFlush:start", + "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}}); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_EQ(2, handles_.size()); + ASSERT_OK(Put(0, "key", "value")); + ASSERT_OK(Put(1, "key", "value")); + auto* cfd_default = + static_cast(dbfull()->DefaultColumnFamily()) + ->cfd(); + auto* cfd_pikachu = static_cast(handles_[1])->cfd(); + port::Thread drop_cf_thr([&]() { + TEST_SYNC_POINT( + "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"); + ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); + delete handles_[1]; + handles_.resize(1); + TEST_SYNC_POINT( + "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree"); + }); + FlushOptions flush_opts; + flush_opts.allow_write_stall = true; + ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu}, + flush_opts)); + drop_cf_thr.join(); + Close(); + SyncPoint::GetInstance()->DisableProcessing(); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e6d5a56e2..e5f8c6f49 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -756,6 +756,16 @@ class DBImpl : public DB { Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false, ColumnFamilyHandle* cfh = nullptr); + Status TEST_FlushMemTable(ColumnFamilyData* cfd, + const FlushOptions& flush_opts); + + // Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This + // is because in certain cases, we can flush column families, wait for the + // flush to complete, but delete the column family handle before the wait + // finishes. For example in CompactRange. + Status TEST_AtomicFlushMemTables(const autovector& cfds, + const FlushOptions& flush_opts); + // Wait for memtable compaction Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 8cb37484c..4c230ea38 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1565,6 +1565,16 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, ColumnFamilyData* loop_cfd = elem.first; loop_cfd->imm()->FlushRequested(); } + // If the caller wants to wait for this flush to complete, it indicates + // that the caller expects the ColumnFamilyData not to be free'ed by + // other threads which may drop the column family concurrently. + // Therefore, we increase the cfd's ref count. + if (flush_options.wait) { + for (auto& elem : flush_req) { + ColumnFamilyData* loop_cfd = elem.first; + loop_cfd->Ref(); + } + } SchedulePendingFlush(flush_req, flush_reason); MaybeScheduleFlushOrCompaction(); } @@ -1573,7 +1583,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, write_thread_.ExitUnbatched(&w); } } - + TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush"); + TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush"); if (s.ok() && flush_options.wait) { autovector cfds; autovector flush_memtable_ids; @@ -1583,6 +1594,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } s = WaitForFlushMemTables(cfds, flush_memtable_ids, (flush_reason == FlushReason::kErrorRecovery)); + for (auto* tmp_cfd : cfds) { + if (tmp_cfd->Unref()) { + // Only one thread can reach here. 
+        InstrumentedMutexLock lock_guard(&mutex_);
+        delete tmp_cfd;
+      }
+    }
   }
   TEST_SYNC_POINT("FlushMemTableFinished");
   return s;
@@ -1646,6 +1664,15 @@ Status DBImpl::AtomicFlushMemTables(
     for (auto cfd : cfds) {
       cfd->imm()->FlushRequested();
     }
+    // If the caller wants to wait for this flush to complete, it indicates
+    // that the caller expects the ColumnFamilyData not to be free'ed by
+    // other threads which may drop the column family concurrently.
+    // Therefore, we increase the cfd's ref count.
+    if (flush_options.wait) {
+      for (auto cfd : cfds) {
+        cfd->Ref();
+      }
+    }
     GenerateFlushRequest(cfds, &flush_req);
     SchedulePendingFlush(flush_req, flush_reason);
     MaybeScheduleFlushOrCompaction();
@@ -1656,7 +1683,7 @@ Status DBImpl::AtomicFlushMemTables(
     }
   }
   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
-
+  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
   if (s.ok() && flush_options.wait) {
     autovector<const uint64_t*> flush_memtable_ids;
     for (auto& iter : flush_req) {
@@ -1664,6 +1691,13 @@ Status DBImpl::AtomicFlushMemTables(
     }
     s = WaitForFlushMemTables(cfds, flush_memtable_ids,
                               (flush_reason == FlushReason::kErrorRecovery));
+    for (auto* cfd : cfds) {
+      if (cfd->Unref()) {
+        // Only one thread can reach here.
+        InstrumentedMutexLock lock_guard(&mutex_);
+        delete cfd;
+      }
+    }
   }
   return s;
 }
@@ -2125,6 +2159,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
   }
   status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
                                        job_context, log_buffer, thread_pri);
+  TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
   // All the CFDs in the FlushReq must have the same flush reason, so just
   // grab the first one
   *reason = bg_flush_args[0].cfd_->GetFlushReason();
diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc
index ec1e1b477..ec8489848 100644
--- a/db/db_impl/db_impl_debug.cc
+++ b/db/db_impl/db_impl_debug.cc
@@ -122,6 +122,16 @@ Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
   return FlushMemTable(cfd, fo, FlushReason::kTest);
 }
 
+Status DBImpl::TEST_FlushMemTable(ColumnFamilyData* cfd,
+                                  const FlushOptions& flush_opts) {
+  return FlushMemTable(cfd, flush_opts, FlushReason::kTest);
+}
+
+Status DBImpl::TEST_AtomicFlushMemTables(
+    const autovector<ColumnFamilyData*>& cfds, const FlushOptions& flush_opts) {
+  return AtomicFlushMemTables(cfds, flush_opts, FlushReason::kTest);
+}
+
 Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
   ColumnFamilyData* cfd;
   if (column_family == nullptr) {
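
Note for readers who want the core pattern without walking the whole diff: the fix takes an extra reference on every `ColumnFamilyData` before blocking in `WaitForFlushMemTables`, and releases it afterwards, deleting the cfd under the DB mutex if that release dropped the last reference. The sketch below is a minimal, self-contained C++ illustration of this ref-before-wait / unref-after-wait discipline; it is not RocksDB code, and every name in it (`CfdLike`, `WaitForFlush`, `db_mutex`, `flush_cv`, `flush_done`) is a hypothetical stand-in for the corresponding RocksDB pieces.

```
// Standalone sketch, not RocksDB code: all names here are hypothetical.
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

struct CfdLike {
  // Mimics ColumnFamilyData's manual ref counting: Unref() returns true when
  // the caller just released the last reference and must delete the object.
  std::atomic<int> refs{1};
  bool dropped = false;
  void Ref() { refs.fetch_add(1, std::memory_order_relaxed); }
  bool Unref() { return refs.fetch_sub(1, std::memory_order_acq_rel) == 1; }
};

std::mutex db_mutex;               // stand-in for DBImpl::mutex_
std::condition_variable flush_cv;  // stand-in for DBImpl::bg_cv_
bool flush_done = false;           // set when the "background flush" finishes

// Stand-in for WaitForFlushMemTables: blocks until the flush finishes, then
// touches *cfd -- the access that used to segfault when a concurrent drop
// freed the cfd first. Safe here only because the caller holds a reference.
bool WaitForFlush(CfdLike* cfd) {
  std::unique_lock<std::mutex> lk(db_mutex);
  flush_cv.wait(lk, [] { return flush_done; });
  return !cfd->dropped;
}

int main() {
  CfdLike* cfd = new CfdLike;  // the column family handle owns this reference

  // The fix in miniature: pin the cfd *before* waiting, so a concurrent drop
  // cannot take the refcount to zero while the waiter still uses it.
  cfd->Ref();

  std::thread waiter([cfd] {
    (void)WaitForFlush(cfd);
    if (cfd->Unref()) {
      std::lock_guard<std::mutex> lk(db_mutex);  // delete under the DB mutex
      delete cfd;
    }
  });

  std::thread dropper([cfd] {
    {
      std::lock_guard<std::mutex> lk(db_mutex);
      cfd->dropped = true;  // models DropColumnFamily
      flush_done = true;    // the background flush gives up / completes
    }
    flush_cv.notify_all();
    if (cfd->Unref()) {  // handle destruction releases the initial reference
      std::lock_guard<std::mutex> lk(db_mutex);
      delete cfd;
    }
  });

  waiter.join();
  dropper.join();
  return 0;
}
```

The ordering is what makes this safe: the extra `Ref()` happens before the concurrent drop can race with the wait (in the patch, while the flush is being scheduled), and whichever thread releases the last reference re-acquires the DB mutex before deleting, mirroring the `InstrumentedMutexLock lock_guard(&mutex_)` the patch adds around `delete`.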