Ref and unref cfd before and after calling WaitForFlushMemTables (#5513)
Summary: This is to prevent bg flush thread from unrefing and deleting the cfd that has been dropped by a concurrent thread. Before RocksDB calls `DBImpl::WaitForFlushMemTables`, we should increase the refcount of each `ColumnFamilyData` so that its ref count will not drop to 0 even if the column family is dropped by another thread. Otherwise the bg flush thread can deref the cfd and deletes it, causing a segfault in `WaitForFlushMemtables` upon accessing `cfd`. Test plan (on devserver): ``` $make clean && COMPILE_WITH_ASAN=1 make -j32 $make check ``` All unit tests must pass. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5513 Differential Revision: D16062898 Pulled By: riversand963 fbshipit-source-id: 37dc511f1dc99f036d0201bbd7f0a8f5677c763d
This commit is contained in:
parent
1dcbe51bdf
commit
3e1a6dd665
@ -290,6 +290,39 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::FlushMemTable:AfterScheduleFlush",
|
||||
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
|
||||
{"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
|
||||
"DBImpl::BackgroundCallFlush:start"},
|
||||
{"DBImpl::BackgroundCallFlush:start",
|
||||
"DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
ASSERT_EQ(2, handles_.size());
|
||||
ASSERT_OK(Put(1, "key", "value"));
|
||||
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
|
||||
port::Thread drop_cf_thr([&]() {
|
||||
TEST_SYNC_POINT(
|
||||
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
|
||||
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
|
||||
ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
|
||||
handles_.resize(1);
|
||||
TEST_SYNC_POINT(
|
||||
"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
|
||||
});
|
||||
FlushOptions flush_opts;
|
||||
flush_opts.allow_write_stall = true;
|
||||
ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
|
||||
drop_cf_thr.join();
|
||||
Close();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
@ -545,6 +578,49 @@ TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
|
||||
handles_.clear();
|
||||
}
|
||||
|
||||
TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
|
||||
bool atomic_flush = GetParam();
|
||||
if (!atomic_flush) {
|
||||
return;
|
||||
}
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
options.atomic_flush = atomic_flush;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
|
||||
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
|
||||
{"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
|
||||
"DBImpl::BackgroundCallFlush:start"},
|
||||
{"DBImpl::BackgroundCallFlush:start",
|
||||
"DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
ASSERT_EQ(2, handles_.size());
|
||||
ASSERT_OK(Put(0, "key", "value"));
|
||||
ASSERT_OK(Put(1, "key", "value"));
|
||||
auto* cfd_default =
|
||||
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
|
||||
->cfd();
|
||||
auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
|
||||
port::Thread drop_cf_thr([&]() {
|
||||
TEST_SYNC_POINT(
|
||||
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
|
||||
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
|
||||
delete handles_[1];
|
||||
handles_.resize(1);
|
||||
TEST_SYNC_POINT(
|
||||
"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
|
||||
});
|
||||
FlushOptions flush_opts;
|
||||
flush_opts.allow_write_stall = true;
|
||||
ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
|
||||
flush_opts));
|
||||
drop_cf_thr.join();
|
||||
Close();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
|
||||
testing::Bool());
|
||||
|
||||
|
@ -756,6 +756,16 @@ class DBImpl : public DB {
|
||||
Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
|
||||
ColumnFamilyHandle* cfh = nullptr);
|
||||
|
||||
Status TEST_FlushMemTable(ColumnFamilyData* cfd,
|
||||
const FlushOptions& flush_opts);
|
||||
|
||||
// Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This
|
||||
// is because in certain cases, we can flush column families, wait for the
|
||||
// flush to complete, but delete the column family handle before the wait
|
||||
// finishes. For example in CompactRange.
|
||||
Status TEST_AtomicFlushMemTables(const autovector<ColumnFamilyData*>& cfds,
|
||||
const FlushOptions& flush_opts);
|
||||
|
||||
// Wait for memtable compaction
|
||||
Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
|
||||
|
||||
|
@ -1565,6 +1565,16 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
ColumnFamilyData* loop_cfd = elem.first;
|
||||
loop_cfd->imm()->FlushRequested();
|
||||
}
|
||||
// If the caller wants to wait for this flush to complete, it indicates
|
||||
// that the caller expects the ColumnFamilyData not to be free'ed by
|
||||
// other threads which may drop the column family concurrently.
|
||||
// Therefore, we increase the cfd's ref count.
|
||||
if (flush_options.wait) {
|
||||
for (auto& elem : flush_req) {
|
||||
ColumnFamilyData* loop_cfd = elem.first;
|
||||
loop_cfd->Ref();
|
||||
}
|
||||
}
|
||||
SchedulePendingFlush(flush_req, flush_reason);
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
}
|
||||
@ -1573,7 +1583,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
write_thread_.ExitUnbatched(&w);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
|
||||
TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
|
||||
if (s.ok() && flush_options.wait) {
|
||||
autovector<ColumnFamilyData*> cfds;
|
||||
autovector<const uint64_t*> flush_memtable_ids;
|
||||
@ -1583,6 +1594,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
|
||||
}
|
||||
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
|
||||
(flush_reason == FlushReason::kErrorRecovery));
|
||||
for (auto* tmp_cfd : cfds) {
|
||||
if (tmp_cfd->Unref()) {
|
||||
// Only one thread can reach here.
|
||||
InstrumentedMutexLock lock_guard(&mutex_);
|
||||
delete tmp_cfd;
|
||||
}
|
||||
}
|
||||
}
|
||||
TEST_SYNC_POINT("FlushMemTableFinished");
|
||||
return s;
|
||||
@ -1646,6 +1664,15 @@ Status DBImpl::AtomicFlushMemTables(
|
||||
for (auto cfd : cfds) {
|
||||
cfd->imm()->FlushRequested();
|
||||
}
|
||||
// If the caller wants to wait for this flush to complete, it indicates
|
||||
// that the caller expects the ColumnFamilyData not to be free'ed by
|
||||
// other threads which may drop the column family concurrently.
|
||||
// Therefore, we increase the cfd's ref count.
|
||||
if (flush_options.wait) {
|
||||
for (auto cfd : cfds) {
|
||||
cfd->Ref();
|
||||
}
|
||||
}
|
||||
GenerateFlushRequest(cfds, &flush_req);
|
||||
SchedulePendingFlush(flush_req, flush_reason);
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
@ -1656,7 +1683,7 @@ Status DBImpl::AtomicFlushMemTables(
|
||||
}
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
|
||||
|
||||
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
|
||||
if (s.ok() && flush_options.wait) {
|
||||
autovector<const uint64_t*> flush_memtable_ids;
|
||||
for (auto& iter : flush_req) {
|
||||
@ -1664,6 +1691,13 @@ Status DBImpl::AtomicFlushMemTables(
|
||||
}
|
||||
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
|
||||
(flush_reason == FlushReason::kErrorRecovery));
|
||||
for (auto* cfd : cfds) {
|
||||
if (cfd->Unref()) {
|
||||
// Only one thread can reach here.
|
||||
InstrumentedMutexLock lock_guard(&mutex_);
|
||||
delete cfd;
|
||||
}
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -2125,6 +2159,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
|
||||
}
|
||||
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
|
||||
job_context, log_buffer, thread_pri);
|
||||
TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
|
||||
// All the CFDs in the FlushReq must have the same flush reason, so just
|
||||
// grab the first one
|
||||
*reason = bg_flush_args[0].cfd_->GetFlushReason();
|
||||
|
@ -122,6 +122,16 @@ Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
|
||||
return FlushMemTable(cfd, fo, FlushReason::kTest);
|
||||
}
|
||||
|
||||
Status DBImpl::TEST_FlushMemTable(ColumnFamilyData* cfd,
|
||||
const FlushOptions& flush_opts) {
|
||||
return FlushMemTable(cfd, flush_opts, FlushReason::kTest);
|
||||
}
|
||||
|
||||
Status DBImpl::TEST_AtomicFlushMemTables(
|
||||
const autovector<ColumnFamilyData*>& cfds, const FlushOptions& flush_opts) {
|
||||
return AtomicFlushMemTables(cfds, flush_opts, FlushReason::kTest);
|
||||
}
|
||||
|
||||
Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
|
||||
ColumnFamilyData* cfd;
|
||||
if (column_family == nullptr) {
|
||||
|
Loading…
Reference in New Issue
Block a user