Fix queue manipulation in WriteThread::BeginWriteStall() (#6322)
Summary: When there is a write stall, the active write group leader calls ```BeginWriteStall()``` to walk the queue of writers and remove any with the ```no_slowdown``` option set. There was a bug in the code which updated the back pointer but not the forward pointer (```link_newer```), corrupting the list and causing some threads to wait forever. This PR fixes it. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6322 Test Plan: Add a unit test in db_write_test Differential Revision: D19538313 Pulled By: anand1976 fbshipit-source-id: 6fbed819e594913f435886606f5d36f74f235c3a
This commit is contained in:
parent
1fab610a29
commit
19e217815d
@ -1,4 +1,8 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
|
## 6.6.3 (01/24/2020)
|
||||||
|
### Bug Fixes
|
||||||
|
* Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
|
||||||
|
|
||||||
## 6.6.2 (01/13/2020)
|
## 6.6.2 (01/13/2020)
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
* Fixed a bug where non-L0 compaction input files were not considered to compute the `creation_time` of new compaction outputs.
|
* Fixed a bug where non-L0 compaction input files were not considered to compute the `creation_time` of new compaction outputs.
|
||||||
|
@ -39,6 +39,97 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
|
|||||||
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
|
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
|
||||||
|
Options options = GetOptions();
|
||||||
|
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
|
||||||
|
std::vector<port::Thread> threads;
|
||||||
|
std::atomic<int> thread_num(0);
|
||||||
|
port::Mutex mutex;
|
||||||
|
port::CondVar cv(&mutex);
|
||||||
|
|
||||||
|
Reopen(options);
|
||||||
|
|
||||||
|
std::function<void()> write_slowdown_func = [&]() {
|
||||||
|
int a = thread_num.fetch_add(1);
|
||||||
|
std::string key = "foo" + std::to_string(a);
|
||||||
|
WriteOptions wo;
|
||||||
|
wo.no_slowdown = false;
|
||||||
|
dbfull()->Put(wo, key, "bar");
|
||||||
|
};
|
||||||
|
std::function<void()> write_no_slowdown_func = [&]() {
|
||||||
|
int a = thread_num.fetch_add(1);
|
||||||
|
std::string key = "foo" + std::to_string(a);
|
||||||
|
WriteOptions wo;
|
||||||
|
wo.no_slowdown = true;
|
||||||
|
dbfull()->Put(wo, key, "bar");
|
||||||
|
};
|
||||||
|
std::function<void(void *)> unblock_main_thread_func = [&](void *) {
|
||||||
|
mutex.Lock();
|
||||||
|
cv.SignalAll();
|
||||||
|
mutex.Unlock();
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create 3 L0 files and schedule 4th without waiting
|
||||||
|
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||||
|
Flush();
|
||||||
|
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||||
|
Flush();
|
||||||
|
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||||
|
Flush();
|
||||||
|
Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
|
||||||
|
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||||
|
{{"DBWriteTest::WriteThreadHangOnWriteStall:1",
|
||||||
|
"DBImpl::BackgroundCallFlush:start"},
|
||||||
|
{"DBWriteTest::WriteThreadHangOnWriteStall:2",
|
||||||
|
"DBImpl::WriteImpl:BeforeLeaderEnters"},
|
||||||
|
// Make compaction start wait for the write stall to be detected and
|
||||||
|
// implemented by a write group leader
|
||||||
|
{"DBWriteTest::WriteThreadHangOnWriteStall:3",
|
||||||
|
"BackgroundCallCompaction:0"}});
|
||||||
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
// Schedule creation of 4th L0 file without waiting. This will seal the
|
||||||
|
// memtable and then wait for a sync point before writing the file. We need
|
||||||
|
// to do it this way because SwitchMemtable() needs to enter the
|
||||||
|
// write_thread
|
||||||
|
FlushOptions fopt;
|
||||||
|
fopt.wait = false;
|
||||||
|
dbfull()->Flush(fopt);
|
||||||
|
|
||||||
|
// Create a mix of slowdown/no_slowdown write threads
|
||||||
|
mutex.Lock();
|
||||||
|
// First leader
|
||||||
|
threads.emplace_back(write_slowdown_func);
|
||||||
|
cv.Wait();
|
||||||
|
// Second leader. Will stall writes
|
||||||
|
threads.emplace_back(write_slowdown_func);
|
||||||
|
cv.Wait();
|
||||||
|
threads.emplace_back(write_no_slowdown_func);
|
||||||
|
cv.Wait();
|
||||||
|
threads.emplace_back(write_slowdown_func);
|
||||||
|
cv.Wait();
|
||||||
|
threads.emplace_back(write_no_slowdown_func);
|
||||||
|
cv.Wait();
|
||||||
|
threads.emplace_back(write_slowdown_func);
|
||||||
|
cv.Wait();
|
||||||
|
mutex.Unlock();
|
||||||
|
|
||||||
|
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
|
||||||
|
dbfull()->TEST_WaitForFlushMemTable(nullptr);
|
||||||
|
// This would have triggered a write stall. Unblock the write group leader
|
||||||
|
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
|
||||||
|
// The leader is going to create missing newer links. When the leader finishes,
|
||||||
|
// the next leader is going to delay writes and fail writers with no_slowdown
|
||||||
|
|
||||||
|
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
|
||||||
|
for (auto& t : threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
|
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
|
||||||
constexpr int kNumThreads = 5;
|
constexpr int kNumThreads = 5;
|
||||||
std::unique_ptr<FaultInjectionTestEnv> mock_env(
|
std::unique_ptr<FaultInjectionTestEnv> mock_env(
|
||||||
|
@ -344,6 +344,9 @@ void WriteThread::BeginWriteStall() {
|
|||||||
prev->link_older = w->link_older;
|
prev->link_older = w->link_older;
|
||||||
w->status = Status::Incomplete("Write stall");
|
w->status = Status::Incomplete("Write stall");
|
||||||
SetState(w, STATE_COMPLETED);
|
SetState(w, STATE_COMPLETED);
|
||||||
|
if (prev->link_older) {
|
||||||
|
prev->link_older->link_newer = prev;
|
||||||
|
}
|
||||||
w = prev->link_older;
|
w = prev->link_older;
|
||||||
} else {
|
} else {
|
||||||
prev = w;
|
prev = w;
|
||||||
@ -355,7 +358,11 @@ void WriteThread::BeginWriteStall() {
|
|||||||
void WriteThread::EndWriteStall() {
|
void WriteThread::EndWriteStall() {
|
||||||
MutexLock lock(&stall_mu_);
|
MutexLock lock(&stall_mu_);
|
||||||
|
|
||||||
|
// Unlink write_stall_dummy_ from the write queue. This will unblock
|
||||||
|
// pending write threads to enqueue themselves
|
||||||
assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
|
assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
|
||||||
|
assert(write_stall_dummy_.link_older != nullptr);
|
||||||
|
write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
|
||||||
newest_writer_.exchange(write_stall_dummy_.link_older);
|
newest_writer_.exchange(write_stall_dummy_.link_older);
|
||||||
|
|
||||||
// Wake up writers
|
// Wake up writers
|
||||||
|
Loading…
Reference in New Issue
Block a user