From 854a4be03f80a785387a7048d01b6d9923b95cef Mon Sep 17 00:00:00 2001
From: Anand Ananthabhotla
Date: Tue, 9 Oct 2018 22:50:59 -0700
Subject: [PATCH] Handle mixed slowdown/no_slowdown writer properly (#4475)

Summary:
There is a bug when the write queue leader is blocked on a write
delay/stop, and the queue has writers with WriteOptions::no_slowdown set
to true. They are not woken up until the write stall is cleared.

The fix introduces a dummy writer inserted at the tail to indicate a
write stall and prevent further inserts into the queue, and a condition
variable that writers who can tolerate slowdown wait on before adding
themselves to the queue. The leader calls WriteThread::BeginWriteStall()
to add the dummy writer and then walk the queue to fail any writers with
no_slowdown set. Once the stall clears, the leader calls
WriteThread::EndWriteStall() to remove the dummy writer and signal the
condition variable.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4475

Differential Revision: D10285827

Pulled By: anand1976

fbshipit-source-id: 747465e5e7f07a829b1fb0bc1afcd7b93f4ab1a9
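
Illustrative usage (a minimal sketch, not part of the diff): the snippet
below shows the behavior the new db_test.cc cases exercise, assuming a DB
that is currently in a write stall. A writer that sets
WriteOptions::no_slowdown now fails fast with an Incomplete status carrying
the message "Write stall" instead of blocking behind the stalled leader,
while a writer that leaves no_slowdown false waits until the stall clears.
The function name WriteDuringStall and the keys/values are hypothetical;
the RocksDB calls are the public API used by the tests in this patch.

    #include <cassert>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    // Minimal sketch, assuming `db` is an open rocksdb::DB that the write
    // controller is currently stalling (delayed or stopped).
    void WriteDuringStall(rocksdb::DB* db) {
      rocksdb::WriteOptions fail_fast;
      fail_fast.no_slowdown = true;
      // A stalled write queue now rejects this writer immediately.
      rocksdb::Status s = db->Put(fail_fast, "key1", "value1");
      assert(s.IsIncomplete());  // carries the message "Write stall"

      rocksdb::WriteOptions wait_out_stall;  // no_slowdown defaults to false
      // This writer blocks on WriteThread's stall condition variable and is
      // woken up by WriteThread::EndWriteStall() once the stall clears.
      s = db->Put(wait_out_stall, "key2", "value2");
      assert(s.ok());
    }
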
---
 HISTORY.md          |   3 +
 db/db_impl.h        |   1 +
 db/db_impl_write.cc |  14 +++-
 db/db_test.cc       | 190 ++++++++++++++++++++++++++++++++++++++++++++
 db/write_thread.cc  |  59 +++++++++++++-
 db/write_thread.h   |  18 +++++
 6 files changed, 282 insertions(+), 3 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index a9446fbe1..75161a326 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -3,6 +3,9 @@
 ### New Features
 * Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
 
+### Bug Fixes
+* Fix a corner case where a write group leader blocked due to a write stall also blocks other writers queued with WriteOptions::no_slowdown set.
+
 ## 5.17.0 (10/05/2018)
 ### Public API Change
 * `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero.
diff --git a/db/db_impl.h b/db/db_impl.h
index 645a55553..c9640e0ef 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -816,6 +816,7 @@ class DBImpl : public DB {
   friend struct SuperVersion;
   friend class CompactedDBImpl;
   friend class DBTest_ConcurrentFlushWAL_Test;
+  friend class DBTest_MixedSlowdownOptionsStop_Test;
 #ifndef NDEBUG
   friend class DBTest2_ReadCallbackTest_Test;
   friend class WriteCallbackTest_WriteWithCallbackTest_Test;
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index ff786e113..29b54bfd1 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -1162,10 +1162,14 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
     uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
     if (delay > 0) {
       if (write_options.no_slowdown) {
-        return Status::Incomplete();
+        return Status::Incomplete("Write stall");
       }
       TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
 
+      // Notify write_thread_ about the stall so it can set up a barrier and
+      // fail any pending writers with no_slowdown
+      write_thread_.BeginWriteStall();
+      TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
       mutex_.Unlock();
       // We will delay the write until we have slept for delay ms or
       // we don't need a delay anymore
@@ -1182,6 +1186,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
         env_->SleepForMicroseconds(kDelayInterval);
       }
       mutex_.Lock();
+      write_thread_.EndWriteStall();
     }
 
     // Don't wait if there's a background error, even if its a soft error. We
@@ -1190,11 +1195,16 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
     // indefinitely
     while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
       if (write_options.no_slowdown) {
-        return Status::Incomplete();
+        return Status::Incomplete("Write stall");
       }
       delayed = true;
+
+      // Notify write_thread_ about the stall so it can set up a barrier and
+      // fail any pending writers with no_slowdown
+      write_thread_.BeginWriteStall();
       TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
       bg_cv_.Wait();
+      write_thread_.EndWriteStall();
     }
   }
   assert(!delayed || !write_options.no_slowdown);
diff --git a/db/db_test.cc b/db/db_test.cc
index f331f8e77..682a382d7 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -262,6 +262,196 @@ TEST_F(DBTest, SkipDelay) {
   }
 }
 
+TEST_F(DBTest, MixedSlowdownOptions) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:BeginWriteStallDone", [&](void* /*arg*/) {
+        sleep_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_slowdown_func);
+          }
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); + token.reset(); + + for (auto& t : threads) { + t.join(); + } + ASSERT_GE(sleep_count.load(), 1); + + wo.no_slowdown = true; + ASSERT_OK(dbfull()->Put(wo, "foo3", "bar")); +} + +TEST_F(DBTest, MixedSlowdownOptionsInQueue) { + Options options = CurrentOptions(); + options.env = env_; + options.write_buffer_size = 100000; + CreateAndReopenWithCF({"pikachu"}, options); + std::vector threads; + std::atomic thread_num(0); + + std::function write_no_slowdown_func = [&]() { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions wo; + wo.no_slowdown = true; + ASSERT_NOK(dbfull()->Put(wo, key, "bar")); + }; + // Use a small number to ensure a large delay that is still effective + // when we do Put + // TODO(myabandeh): this is time dependent and could potentially make + // the test flaky + auto token = dbfull()->TEST_write_controler().GetDelayToken(1); + std::atomic sleep_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:Sleep", + [&](void* /*arg*/) { + sleep_count.fetch_add(1); + if (threads.empty()) { + for (int i = 0; i < 2; ++i) { + threads.emplace_back(write_no_slowdown_func); + } + // Sleep for 2s to allow the threads to insert themselves into the + // write queue + env_->SleepForMicroseconds(3000000ULL); + } + }); + std::atomic wait_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:Wait", + [&](void* /*arg*/) { wait_count.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions wo; + wo.sync = false; + wo.disableWAL = false; + wo.no_slowdown = false; + dbfull()->Put(wo, "foo", "bar"); + // We need the 2nd write to trigger delay. This is because delay is + // estimated based on the last write size which is 0 for the first write. 
+ ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); + token.reset(); + + for (auto& t : threads) { + t.join(); + } + ASSERT_EQ(sleep_count.load(), 1); + ASSERT_GE(wait_count.load(), 0); +} + +TEST_F(DBTest, MixedSlowdownOptionsStop) { + Options options = CurrentOptions(); + options.env = env_; + options.write_buffer_size = 100000; + CreateAndReopenWithCF({"pikachu"}, options); + std::vector threads; + std::atomic thread_num(0); + + std::function write_slowdown_func = [&]() { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions wo; + wo.no_slowdown = false; + ASSERT_OK(dbfull()->Put(wo, key, "bar")); + }; + std::function write_no_slowdown_func = [&]() { + int a = thread_num.fetch_add(1); + std::string key = "foo" + std::to_string(a); + WriteOptions wo; + wo.no_slowdown = true; + ASSERT_NOK(dbfull()->Put(wo, key, "bar")); + }; + std::function wakeup_writer = [&]() { + dbfull()->mutex_.Lock(); + dbfull()->bg_cv_.SignalAll(); + dbfull()->mutex_.Unlock(); + }; + // Use a small number to ensure a large delay that is still effective + // when we do Put + // TODO(myabandeh): this is time dependent and could potentially make + // the test flaky + auto token = dbfull()->TEST_write_controler().GetStopToken(); + std::atomic wait_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:Wait", + [&](void* /*arg*/) { + wait_count.fetch_add(1); + if (threads.empty()) { + for (int i = 0; i < 2; ++i) { + threads.emplace_back(write_slowdown_func); + } + for (int i = 0; i < 2; ++i) { + threads.emplace_back(write_no_slowdown_func); + } + // Sleep for 2s to allow the threads to insert themselves into the + // write queue + env_->SleepForMicroseconds(3000000ULL); + } + token.reset(); + threads.emplace_back(wakeup_writer); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions wo; + wo.sync = false; + wo.disableWAL = false; + wo.no_slowdown = false; + dbfull()->Put(wo, "foo", "bar"); + // We need the 2nd write to trigger delay. This is because delay is + // estimated based on the last write size which is 0 for the first write. + ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); + token.reset(); + + for (auto& t : threads) { + t.join(); + } + ASSERT_GE(wait_count.load(), 1); + + wo.no_slowdown = true; + ASSERT_OK(dbfull()->Put(wo, "foo3", "bar")); +} #ifndef ROCKSDB_LITE TEST_F(DBTest, LevelLimitReopen) { diff --git a/db/write_thread.cc b/db/write_thread.cc index 6eb140b6b..5ea7715c6 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -24,7 +24,10 @@ WriteThread::WriteThread(const ImmutableDBOptions& db_options) enable_pipelined_write_(db_options.enable_pipelined_write), newest_writer_(nullptr), newest_memtable_writer_(nullptr), - last_sequence_(0) {} + last_sequence_(0), + write_stall_dummy_(), + stall_mu_(), + stall_cv_(&stall_mu_) {} uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { // We're going to block. Lazily create the mutex. We guarantee @@ -219,6 +222,28 @@ bool WriteThread::LinkOne(Writer* w, std::atomic* newest_writer) { assert(w->state == STATE_INIT); Writer* writers = newest_writer->load(std::memory_order_relaxed); while (true) { + // If write stall in effect, and w->no_slowdown is not true, + // block here until stall is cleared. 
+    // immediately
+    if (writers == &write_stall_dummy_) {
+      if (w->no_slowdown) {
+        w->status = Status::Incomplete("Write stall");
+        SetState(w, STATE_COMPLETED);
+        return false;
+      }
+      // Since no_slowdown is false, wait here to be notified of the write
+      // stall clearing
+      {
+        MutexLock lock(&stall_mu_);
+        writers = newest_writer->load(std::memory_order_relaxed);
+        if (writers == &write_stall_dummy_) {
+          stall_cv_.Wait();
+          // Load newest_writer again since it may have changed
+          writers = newest_writer->load(std::memory_order_relaxed);
+          continue;
+        }
+      }
+    }
     w->link_older = writers;
     if (newest_writer->compare_exchange_weak(writers, w)) {
       return (writers == nullptr);
@@ -303,12 +328,44 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
   SetState(w, STATE_COMPLETED);
 }
 
+void WriteThread::BeginWriteStall() {
+  LinkOne(&write_stall_dummy_, &newest_writer_);
+
+  // Walk the writer list until w->write_group != nullptr. The current write
+  // group will not have a mix of slowdown/no_slowdown, so it's ok to stop at
+  // that point
+  Writer* w = write_stall_dummy_.link_older;
+  Writer* prev = &write_stall_dummy_;
+  while (w != nullptr && w->write_group == nullptr) {
+    if (w->no_slowdown) {
+      prev->link_older = w->link_older;
+      w->status = Status::Incomplete("Write stall");
+      SetState(w, STATE_COMPLETED);
+      w = prev->link_older;
+    } else {
+      prev = w;
+      w = w->link_older;
+    }
+  }
+}
+
+void WriteThread::EndWriteStall() {
+  MutexLock lock(&stall_mu_);
+
+  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+  newest_writer_.exchange(write_stall_dummy_.link_older);
+
+  // Wake up writers
+  stall_cv_.SignalAll();
+}
+
 static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
 void WriteThread::JoinBatchGroup(Writer* w) {
   TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
   assert(w->batch != nullptr);
 
   bool linked_as_leader = LinkOne(w, &newest_writer_);
+
   if (linked_as_leader) {
     SetState(w, STATE_GROUP_LEADER);
   }
diff --git a/db/write_thread.h b/db/write_thread.h
index 31190199b..a3802c996 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -342,6 +342,13 @@ class WriteThread {
     return last_sequence_;
   }
 
+  // Insert a dummy writer at the tail of the write queue to indicate a write
+  // stall, and fail any writers in the queue with no_slowdown set to true
+  void BeginWriteStall();
+
+  // Remove the dummy writer and wake up waiting writers
+  void EndWriteStall();
+
 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
@@ -365,6 +372,17 @@ class WriteThread {
  // is not necessary visible to reads because the writer can be ongoing.
  SequenceNumber last_sequence_;
 
+  // A dummy writer to indicate a write stall condition. This will be inserted
+  // at the tail of the writer queue by the leader, so newer writers can just
+  // check for this and bail
+  Writer write_stall_dummy_;
+
+  // Mutex and condvar for writers to block on a write stall. During a write
+  // stall, writers with no_slowdown set to false will wait on this rather
+  // than on the writer queue
+  port::Mutex stall_mu_;
+  port::CondVar stall_cv_;
+
  // Waits for w->state & goal_mask using w->StateMutex(). Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);