diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index b901a5a78..034ec6322 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -290,6 +290,44 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
   Close();
 }
 
+// Races a manual flush against a concurrent DropColumnFamily. The sync point
+// dependencies delay the drop until the flush has been scheduled, and delay
+// both the background flush and the flush's wait step until the dropping
+// thread has destroyed its column family handle. The flush call is expected
+// to return a non-OK status.
+TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:AfterScheduleFlush",
+        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
+       {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBImpl::BackgroundCallFlush:start",
+        "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_EQ(2, handles_.size());
+  ASSERT_OK(Put(1, "key", "value"));
+  auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
+  port::Thread drop_cf_thr([&]() {
+    TEST_SYNC_POINT(
+        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
+    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
+    ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
+    handles_.resize(1);
+    TEST_SYNC_POINT(
+        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
+  });
+  FlushOptions flush_opts;
+  flush_opts.allow_write_stall = true;
+  ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
+  drop_cf_thr.join();
+  Close();
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
@@ -545,6 +583,52 @@ TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
   handles_.clear();
 }
 
+// Races an atomic flush of both column families against a concurrent drop of
+// handles_[1] ("pikachu"). Skipped unless the test parameter enables atomic
+// flush. Here the flush call is expected to return OK.
+TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
+  bool atomic_flush = GetParam();
+  if (!atomic_flush) {
+    return;
+  }
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = atomic_flush;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
+        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
+       {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBImpl::BackgroundCallFlush:start",
+        "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_EQ(2, handles_.size());
+  ASSERT_OK(Put(0, "key", "value"));
+  ASSERT_OK(Put(1, "key", "value"));
+  auto* cfd_default =
+      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
+          ->cfd();
+  auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
+  port::Thread drop_cf_thr([&]() {
+    TEST_SYNC_POINT(
+        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
+    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
+    delete handles_[1];
+    handles_.resize(1);
+    TEST_SYNC_POINT(
+        "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
+  });
+  FlushOptions flush_opts;
+  flush_opts.allow_write_stall = true;
+  ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
+                                                flush_opts));
+  drop_cf_thr.join();
+  Close();
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                         testing::Bool());
 
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index e57768a74..737f23376 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -788,6 +788,18 @@ class DBImpl : public DB {
   Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
                             ColumnFamilyHandle* cfh = nullptr);
 
+  // Flush a single ColumnFamilyData directly, without going through a
+  // ColumnFamilyHandle. Forwards to FlushMemTable with FlushReason::kTest.
+  Status TEST_FlushMemTable(ColumnFamilyData* cfd,
+                            const FlushOptions& flush_opts);
+
+  // Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This
+  // is because in certain cases, we can flush column families, wait for the
+  // flush to complete, but delete the column family handle before the wait
+  // finishes. For example in CompactRange.
+  Status TEST_AtomicFlushMemTables(const autovector<ColumnFamilyData*>& cfds,
+                                   const FlushOptions& flush_opts);
+
   // Wait for memtable compaction
   Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
 
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index ff03e591d..672924016 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -1591,6 +1591,16 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
         ColumnFamilyData* loop_cfd = elem.first;
         loop_cfd->imm()->FlushRequested();
       }
+      // If the caller wants to wait for this flush to complete, it indicates
+      // that the caller expects the ColumnFamilyData not to be freed by
+      // other threads which may drop the column family concurrently.
+      // Therefore, we increase the cfd's ref count.
+      if (flush_options.wait) {
+        for (auto& elem : flush_req) {
+          ColumnFamilyData* loop_cfd = elem.first;
+          loop_cfd->Ref();
+        }
+      }
       SchedulePendingFlush(flush_req, flush_reason);
       MaybeScheduleFlushOrCompaction();
     }
@@ -1599,7 +1609,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
       write_thread_.ExitUnbatched(&w);
     }
   }
-
+  TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
+  TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
   if (s.ok() && flush_options.wait) {
     autovector<ColumnFamilyData*> cfds;
     autovector<const uint64_t*> flush_memtable_ids;
@@ -1609,6 +1620,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
     }
     s = WaitForFlushMemTables(cfds, flush_memtable_ids,
                               (flush_reason == FlushReason::kErrorRecovery));
+    for (auto* tmp_cfd : cfds) {
+      if (tmp_cfd->Unref()) {
+        // Only one thread can reach here.
+        InstrumentedMutexLock lock_guard(&mutex_);
+        delete tmp_cfd;
+      }
+    }
   }
   TEST_SYNC_POINT("FlushMemTableFinished");
   return s;
@@ -1672,6 +1690,15 @@ Status DBImpl::AtomicFlushMemTables(
       for (auto cfd : cfds) {
         cfd->imm()->FlushRequested();
       }
+      // If the caller wants to wait for this flush to complete, it indicates
+      // that the caller expects the ColumnFamilyData not to be freed by
+      // other threads which may drop the column family concurrently.
+      // Therefore, we increase the cfd's ref count.
+      if (flush_options.wait) {
+        for (auto cfd : cfds) {
+          cfd->Ref();
+        }
+      }
       GenerateFlushRequest(cfds, &flush_req);
       SchedulePendingFlush(flush_req, flush_reason);
       MaybeScheduleFlushOrCompaction();
@@ -1682,7 +1709,7 @@ Status DBImpl::AtomicFlushMemTables(
     }
   }
   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
-
+  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
   if (s.ok() && flush_options.wait) {
     autovector<const uint64_t*> flush_memtable_ids;
     for (auto& iter : flush_req) {
@@ -1690,6 +1717,13 @@ Status DBImpl::AtomicFlushMemTables(
     }
     s = WaitForFlushMemTables(cfds, flush_memtable_ids,
                               (flush_reason == FlushReason::kErrorRecovery));
+    for (auto* cfd : cfds) {
+      if (cfd->Unref()) {
+        // Only one thread can reach here.
+        InstrumentedMutexLock lock_guard(&mutex_);
+        delete cfd;
+      }
+    }
   }
   return s;
 }
@@ -2151,6 +2185,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
     }
     status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
                                          job_context, log_buffer, thread_pri);
+    TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
     // All the CFDs in the FlushReq must have the same flush reason, so just
     // grab the first one
     *reason = bg_flush_args[0].cfd_->GetFlushReason();
diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc
index ec1e1b477..ec8489848 100644
--- a/db/db_impl/db_impl_debug.cc
+++ b/db/db_impl/db_impl_debug.cc
@@ -122,6 +122,16 @@ Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
   return FlushMemTable(cfd, fo, FlushReason::kTest);
 }
 
+Status DBImpl::TEST_FlushMemTable(ColumnFamilyData* cfd,
+                                  const FlushOptions& flush_opts) {
+  return FlushMemTable(cfd, flush_opts, FlushReason::kTest);
+}
+
+Status DBImpl::TEST_AtomicFlushMemTables(
+    const autovector<ColumnFamilyData*>& cfds, const FlushOptions& flush_opts) {
+  return AtomicFlushMemTables(cfds, flush_opts, FlushReason::kTest);
+}
+
 Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
   ColumnFamilyData* cfd;
   if (column_family == nullptr) {