Fix IngestExternalFile's bug with two_write_queue (#5976)

Summary:
When two_write_queue enable, IngestExternalFile performs EnterUnbatched on both write queues. SwitchMemtable also EnterUnbatched on 2nd write queue when this option is enabled. When the call stack includes IngestExternalFile -> FlushMemTable -> SwitchMemtable, this results into a deadlock.
The implemented solution is to pass on the existing writes_stopped argument in FlushMemTable to skip EnterUnbatched in SwitchMemtable.
Fixes https://github.com/facebook/rocksdb/issues/5974
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5976

Differential Revision: D18535943

Pulled By: maysamyabandeh

fbshipit-source-id: a4f9d4964c10d4a7ca06b1e0102ca2ec395512bc
This commit is contained in:
Little-Wallace 2019-11-15 13:59:03 -08:00 committed by Facebook Github Bot
parent 0058daef7b
commit f65ec09ef8
4 changed files with 73 additions and 14 deletions

View File

@ -1532,8 +1532,12 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
InstrumentedMutexLock guard_lock(&mutex_); InstrumentedMutexLock guard_lock(&mutex_);
WriteThread::Writer w; WriteThread::Writer w;
WriteThread::Writer nonmem_w;
if (!writes_stopped) { if (!writes_stopped) {
write_thread_.EnterUnbatched(&w, &mutex_); write_thread_.EnterUnbatched(&w, &mutex_);
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
} }
if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
@ -1596,6 +1600,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if (!writes_stopped) { if (!writes_stopped) {
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
} }
} }
TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush"); TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
@ -1650,8 +1657,12 @@ Status DBImpl::AtomicFlushMemTables(
InstrumentedMutexLock guard_lock(&mutex_); InstrumentedMutexLock guard_lock(&mutex_);
WriteThread::Writer w; WriteThread::Writer w;
WriteThread::Writer nonmem_w;
if (!writes_stopped) { if (!writes_stopped) {
write_thread_.EnterUnbatched(&w, &mutex_); write_thread_.EnterUnbatched(&w, &mutex_);
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
} }
for (auto cfd : column_family_datas) { for (auto cfd : column_family_datas) {
@ -1695,6 +1706,9 @@ Status DBImpl::AtomicFlushMemTables(
if (!writes_stopped) { if (!writes_stopped) {
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
} }
} }
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush"); TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");

View File

@ -105,7 +105,16 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
if (cfd == nullptr) { if (cfd == nullptr) {
cfd = default_cf_handle_->cfd(); cfd = default_cf_handle_->cfd();
} }
return SwitchMemtable(cfd, &write_context);
if (two_write_queues_) {
WriteThread::Writer nonmem_w;
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
Status s = SwitchMemtable(cfd, &write_context);
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
return s;
} else {
return SwitchMemtable(cfd, &write_context);
}
} }
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall, Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,

View File

@ -1246,6 +1246,11 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
} }
MaybeFlushStatsCF(&cfds); MaybeFlushStatsCF(&cfds);
} }
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
for (const auto cfd : cfds) { for (const auto cfd : cfds) {
cfd->Ref(); cfd->Ref();
status = SwitchMemtable(cfd, write_context); status = SwitchMemtable(cfd, write_context);
@ -1254,6 +1259,10 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
break; break;
} }
} }
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
if (status.ok()) { if (status.ok()) {
if (immutable_db_options_.atomic_flush) { if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds); AssignAtomicFlushSeq(cfds);
@ -1314,6 +1323,10 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
MaybeFlushStatsCF(&cfds); MaybeFlushStatsCF(&cfds);
} }
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
for (const auto cfd : cfds) { for (const auto cfd : cfds) {
if (cfd->mem()->IsEmpty()) { if (cfd->mem()->IsEmpty()) {
continue; continue;
@ -1325,6 +1338,10 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
break; break;
} }
} }
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
if (status.ok()) { if (status.ok()) {
if (immutable_db_options_.atomic_flush) { if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds); AssignAtomicFlushSeq(cfds);
@ -1530,6 +1547,11 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
MaybeFlushStatsCF(&cfds); MaybeFlushStatsCF(&cfds);
} }
Status status; Status status;
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
for (auto& cfd : cfds) { for (auto& cfd : cfds) {
if (!cfd->mem()->IsEmpty()) { if (!cfd->mem()->IsEmpty()) {
status = SwitchMemtable(cfd, context); status = SwitchMemtable(cfd, context);
@ -1542,6 +1564,11 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
break; break;
} }
} }
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
if (status.ok()) { if (status.ok()) {
if (immutable_db_options_.atomic_flush) { if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds); AssignAtomicFlushSeq(cfds);
@ -1572,15 +1599,11 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
// REQUIRES: mutex_ is held // REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
// two_write_queues_ is true (This is to simplify the reasoning.)
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
WriteThread::Writer nonmem_w; WriteThread::Writer nonmem_w;
if (two_write_queues_) {
// SwitchMemtable is a rare event. To simply the reasoning, we make sure
// that there is no concurrent thread writing to WAL.
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
std::unique_ptr<WritableFile> lfile; std::unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr; log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr; MemTable* new_mem = nullptr;
@ -1687,10 +1710,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable); error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
// Read back bg_error in order to get the right severity // Read back bg_error in order to get the right severity
s = error_handler_.GetBGError(); s = error_handler_.GetBGError();
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
return s; return s;
} }
@ -1721,9 +1740,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
NotifyOnMemTableSealed(cfd, memtable_info); NotifyOnMemTableSealed(cfd, memtable_info);
mutex_.Lock(); mutex_.Lock();
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
return s; return s;
} }

View File

@ -2750,6 +2750,26 @@ TEST_P(ExternalSSTFileTest,
Destroy(options, true /* delete_cf_paths */); Destroy(options, true /* delete_cf_paths */);
} }
TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) {
Options options = CurrentOptions();
// Use large buffer to avoid memtable flush
options.write_buffer_size = 1024 * 1024;
options.two_write_queues = true;
DestroyAndReopen(options);
ASSERT_OK(dbfull()->Put(WriteOptions(), "1000", "v1"));
ASSERT_OK(dbfull()->Put(WriteOptions(), "1001", "v1"));
ASSERT_OK(dbfull()->Put(WriteOptions(), "9999", "v1"));
// Put one key which is overlap with keys in memtable.
// It will trigger flushing memtable and require this thread is
// currently at the front of the 2nd writer queue. We must make
// sure that it won't enter the 2nd writer queue for the second time.
std::vector<std::pair<std::string, std::string>> data;
data.push_back(std::make_pair("1001", "v2"));
GenerateAndAddExternalFile(options, data);
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest, INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
testing::Values(std::make_tuple(false, false), testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true), std::make_tuple(false, true),