Fix StallWrite crash with mixed of slowdown/no_slowdown writes (#7508)
Summary: `BeginWriteStall()` removes no_slowdown write from the write list and updates `link_newer`, which makes `CreateMissingNewerLinks()` thought all write list has valid `link_newer` and failed to create link for all writers. It caused flaky test and SegFault for release build. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7508 Test Plan: Add unittest to reproduce the issue. Reviewed By: anand1976 Differential Revision: D24126601 Pulled By: jay-zhuang fbshipit-source-id: f8ac5dba653f7ee1b0950296427d4f5f8ee34a06
This commit is contained in:
parent
2496d3d4b8
commit
53089038de
@ -473,6 +473,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
|
||||
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
|
||||
disable_memtable);
|
||||
write_thread_.JoinBatchGroup(&w);
|
||||
TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
|
||||
if (w.state == WriteThread::STATE_GROUP_LEADER) {
|
||||
WriteThread::WriteGroup wal_write_group;
|
||||
if (w.callback && !w.callback->AllowWriteBatching()) {
|
||||
|
@ -42,6 +42,125 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
|
||||
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
|
||||
}
|
||||
|
||||
TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
|
||||
Options options = GetOptions();
|
||||
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
|
||||
4;
|
||||
std::vector<port::Thread> threads;
|
||||
std::atomic<int> thread_num(0);
|
||||
port::Mutex mutex;
|
||||
port::CondVar cv(&mutex);
|
||||
// Guarded by mutex
|
||||
int writers = 0;
|
||||
|
||||
Reopen(options);
|
||||
|
||||
std::function<void()> write_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = false;
|
||||
dbfull()->Put(wo, key, "bar");
|
||||
};
|
||||
std::function<void()> write_no_slowdown_func = [&]() {
|
||||
int a = thread_num.fetch_add(1);
|
||||
std::string key = "foo" + std::to_string(a);
|
||||
WriteOptions wo;
|
||||
wo.no_slowdown = true;
|
||||
dbfull()->Put(wo, key, "bar");
|
||||
};
|
||||
std::function<void(void*)> unblock_main_thread_func = [&](void*) {
|
||||
mutex.Lock();
|
||||
++writers;
|
||||
cv.SignalAll();
|
||||
mutex.Unlock();
|
||||
};
|
||||
|
||||
// Create 3 L0 files and schedule 4th without waiting
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
Flush();
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
Flush();
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
Flush();
|
||||
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
|
||||
"DBImpl::BackgroundCallFlush:start"},
|
||||
{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
|
||||
"DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
|
||||
// Make compaction start wait for the write stall to be detected and
|
||||
// implemented by a write group leader
|
||||
{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
|
||||
"BackgroundCallCompaction:0"}});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Schedule creation of 4th L0 file without waiting. This will seal the
|
||||
// memtable and then wait for a sync point before writing the file. We need
|
||||
// to do it this way because SwitchMemtable() needs to enter the
|
||||
// write_thread
|
||||
FlushOptions fopt;
|
||||
fopt.wait = false;
|
||||
dbfull()->Flush(fopt);
|
||||
|
||||
// Create a mix of slowdown/no_slowdown write threads
|
||||
mutex.Lock();
|
||||
// First leader
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
while (writers != 1) {
|
||||
cv.Wait();
|
||||
}
|
||||
|
||||
// Second leader. Will stall writes
|
||||
// Build a writers list with no slowdown in the middle:
|
||||
// +-------------+
|
||||
// | slowdown +<----+ newest
|
||||
// +--+----------+
|
||||
// |
|
||||
// v
|
||||
// +--+----------+
|
||||
// | no slowdown |
|
||||
// +--+----------+
|
||||
// |
|
||||
// v
|
||||
// +--+----------+
|
||||
// | slowdown +
|
||||
// +-------------+
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
while (writers != 2) {
|
||||
cv.Wait();
|
||||
}
|
||||
threads.emplace_back(write_no_slowdown_func);
|
||||
while (writers != 3) {
|
||||
cv.Wait();
|
||||
}
|
||||
threads.emplace_back(write_slowdown_func);
|
||||
while (writers != 4) {
|
||||
cv.Wait();
|
||||
}
|
||||
|
||||
mutex.Unlock();
|
||||
|
||||
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
|
||||
dbfull()->TEST_WaitForFlushMemTable(nullptr);
|
||||
// This would have triggered a write stall. Unblock the write group leader
|
||||
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
|
||||
// The leader is going to create missing newer links. When the leader
|
||||
// finishes, the next leader is going to delay writes and fail writers with
|
||||
// no_slowdown
|
||||
|
||||
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
|
||||
for (auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
|
||||
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
|
||||
Options options = GetOptions();
|
||||
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
|
||||
|
@ -344,7 +344,13 @@ void WriteThread::BeginWriteStall() {
|
||||
prev->link_older = w->link_older;
|
||||
w->status = Status::Incomplete("Write stall");
|
||||
SetState(w, STATE_COMPLETED);
|
||||
if (prev->link_older) {
|
||||
// Only update `link_newer` if it's already set.
|
||||
// `CreateMissingNewerLinks()` will update the nullptr `link_newer` later,
|
||||
// which assumes the the first non-nullptr `link_newer` is the last
|
||||
// nullptr link in the writer list.
|
||||
// If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
|
||||
// updating the whole list when it sees the first non nullptr link.
|
||||
if (prev->link_older && prev->link_older->link_newer) {
|
||||
prev->link_older->link_newer = prev;
|
||||
}
|
||||
w = prev->link_older;
|
||||
|
Loading…
Reference in New Issue
Block a user