From 19e217815d36a99263056949339c374a62584e74 Mon Sep 17 00:00:00 2001
From: anand76
Date: Thu, 23 Jan 2020 13:59:48 -0800
Subject: [PATCH] Fix queue manipulation in WriteThread::BeginWriteStall()
 (#6322)

Summary:
When there is a write stall, the active write group leader calls ```BeginWriteStall()```
to walk the queue of writers and remove any with the ```no_slowdown``` option set.
There was a bug in this code: it updated the back pointer (```link_older```) but not
the forward pointer (```link_newer```), corrupting the list and causing some threads
to wait forever. This PR fixes it.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6322

Test Plan: Add a unit test in db_write_test

Differential Revision: D19538313

Pulled By: anand1976

fbshipit-source-id: 6fbed819e594913f435886606f5d36f74f235c3a
---
 HISTORY.md          |  4 ++
 db/db_write_test.cc | 91 +++++++++++++++++++++++++++++++++++++++++++++
 db/write_thread.cc  |  7 ++++
 3 files changed, 102 insertions(+)

diff --git a/HISTORY.md b/HISTORY.md
index 7b430709d..5ff60f4bc 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## 6.6.3 (01/24/2020)
+### Bug Fixes
+* Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
+
 ## 6.6.2 (01/13/2020)
 ### Bug Fixes
 * Fixed a bug where non-L0 compaction input files were not considered to compute the `creation_time` of new compaction outputs.
diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index 9eca823c2..d74dff03d 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -39,6 +39,97 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
   ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
 }
 
+TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+
+  Reopen(options);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    dbfull()->Put(wo, key, "bar");
+  };
+  std::function<void(void *)> unblock_main_thread_func = [&](void *) {
+    mutex.Lock();
+    cv.SignalAll();
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files and schedule 4th without waiting
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
+        "DBImpl::WriteImpl:BeforeLeaderEnters"},
+       // Make compaction start wait for the write stall to be detected and
+       // implemented by a write group leader
+       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
+        "BackgroundCallCompaction:0"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Schedule creation of 4th L0 file without waiting. This will seal the
+  // memtable and then wait for a sync point before writing the file. We need
+  // to do it this way because SwitchMemtable() needs to enter the
+  // write_thread
+  FlushOptions fopt;
+  fopt.wait = false;
+  dbfull()->Flush(fopt);
+
+  // Create a mix of slowdown/no_slowdown write threads
+  mutex.Lock();
+  // First leader
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  // Second leader. Will stall writes
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  mutex.Unlock();
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
+  dbfull()->TEST_WaitForFlushMemTable(nullptr);
+  // This would have triggered a write stall. Unblock the write group leader
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
+  // The leader is going to create missing newer links. When the leader finishes,
+  // the next leader is going to delay writes and fail writers with no_slowdown
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
 TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
   constexpr int kNumThreads = 5;
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 1ded68fde..145f1f029 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -344,6 +344,9 @@ void WriteThread::BeginWriteStall() {
       prev->link_older = w->link_older;
       w->status = Status::Incomplete("Write stall");
       SetState(w, STATE_COMPLETED);
+      if (prev->link_older) {
+        prev->link_older->link_newer = prev;
+      }
       w = prev->link_older;
     } else {
       prev = w;
@@ -355,7 +358,11 @@ void WriteThread::BeginWriteStall() {
 
 void WriteThread::EndWriteStall() {
   MutexLock lock(&stall_mu_);
+  // Unlink write_stall_dummy_ from the write queue. This will unblock
+  // pending write threads to enqueue themselves
   assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+  assert(write_stall_dummy_.link_older != nullptr);
+  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
   newest_writer_.exchange(write_stall_dummy_.link_older);
 
   // Wake up writers
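
The effect of the two-line fix in ```BeginWriteStall()``` is easier to see on a toy doubly linked list. The sketch below is not RocksDB code: ```Writer```, ```RemoveNoSlowdownWriters()``` and ```LinkQueue()``` are hypothetical stand-ins for ```WriteThread::Writer``` and the removal loop in the hunk above. It shows that unlinking a ```no_slowdown``` writer while patching only ```link_older``` leaves a stale ```link_newer``` behind, so a later forward walk over the newer links (the "create missing newer links" step referenced in the test comments) still runs into the removed writer and never reaches the writers queued behind it.

```cpp
#include <cassert>
#include <iostream>
#include <string>

// Stripped-down stand-in for WriteThread::Writer: just the queue links and
// the option that matters here.
struct Writer {
  std::string name;
  bool no_slowdown = false;
  Writer* link_older = nullptr;  // towards older (earlier) writers
  Writer* link_newer = nullptr;  // towards newer (later) writers
};

// Walk from the stall dummy towards older writers and unlink every writer
// with no_slowdown set. With fix_link_newer == false this mimics the buggy
// BeginWriteStall(): only the backward pointer is patched.
void RemoveNoSlowdownWriters(Writer* stall_dummy, bool fix_link_newer) {
  Writer* prev = stall_dummy;
  Writer* w = stall_dummy->link_older;
  while (w != nullptr) {
    if (w->no_slowdown) {
      prev->link_older = w->link_older;  // backward pointer: always updated
      if (fix_link_newer && prev->link_older != nullptr) {
        prev->link_older->link_newer = prev;  // forward pointer: the fix
      }
      w = prev->link_older;
    } else {
      prev = w;
      w = w->link_older;
    }
  }
}

// Build the queue (oldest to newest): w0 <-> w1 <-> w2 <-> dummy.
void LinkQueue(Writer& w0, Writer& w1, Writer& w2, Writer& dummy) {
  w0.link_older = nullptr;  w0.link_newer = &w1;
  w1.link_older = &w0;      w1.link_newer = &w2;
  w2.link_older = &w1;      w2.link_newer = &dummy;
  dummy.link_older = &w2;   dummy.link_newer = nullptr;
}

int main() {
  Writer w0{"w0"}, w1{"w1"}, w2{"w2"}, dummy{"stall_dummy"};
  w1.no_slowdown = true;  // the writer that gets removed during the stall

  // Buggy removal: w1 is gone from the backward chain, but w0.link_newer
  // still points at it, so a forward walk never reaches w2 or the dummy.
  LinkQueue(w0, w1, w2, dummy);
  RemoveNoSlowdownWriters(&dummy, /*fix_link_newer=*/false);
  std::cout << "buggy: w0.link_newer -> " << w0.link_newer->name << "\n";  // w1

  // Fixed removal: the forward pointer is repaired as well.
  LinkQueue(w0, w1, w2, dummy);
  RemoveNoSlowdownWriters(&dummy, /*fix_link_newer=*/true);
  std::cout << "fixed: w0.link_newer -> " << w0.link_newer->name << "\n";  // w2
  assert(w0.link_newer == &w2);
  return 0;
}
```

With the extra ```link_newer``` update, the forward and backward chains stay consistent after a removal, which is what the patch to ```db/write_thread.cc``` above enforces in the real writer queue.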