Handle mixed slowdown/no_slowdown writers properly (#4475)

Summary: There is a bug when the write queue leader is blocked on a write
delay/stop and the queue contains writers with WriteOptions::no_slowdown set
to true: they are not woken up until the write stall clears. The fix
introduces a dummy writer, inserted at the tail of the queue to indicate a
write stall and prevent further inserts, and a condition variable that
writers who can tolerate a slowdown wait on before adding themselves to the
queue. The leader calls WriteThread::BeginWriteStall() to add the dummy
writer and then walks the queue, failing any writers with no_slowdown set.
Once the stall clears, the leader calls WriteThread::EndWriteStall() to
remove the dummy writer and signal the condition variable.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4475
Differential Revision: D10285827
Pulled By: anand1976
fbshipit-source-id: 747465e5e7f07a829b1fb0bc1afcd7b93f4ab1a9
Parent: 141ef7f8d3
Commit: 854a4be03f
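
Before the diffs below, a minimal standalone model of the mechanism may help.
This is an illustrative sketch only: Writer and WriterQueue here are
simplified stand-ins for RocksDB's lock-free writer list, carrying just the
fields the pattern needs. The real implementation is in the db/write_thread.*
hunks further down.

// Standalone model of the stall barrier: a sentinel node at the head of a
// lock-free writer stack blocks new inserts, and a condition variable parks
// writers that tolerate slowdown. Illustrative only, not the RocksDB code.
#include <atomic>
#include <condition_variable>
#include <mutex>

struct Writer {
  bool no_slowdown = false;
  bool failed = false;          // stands in for Status::Incomplete
  Writer* link_older = nullptr;
};

class WriterQueue {
 public:
  // Returns false if the writer was rejected because a stall is in effect.
  bool Link(Writer* w) {
    Writer* head = newest_.load(std::memory_order_relaxed);
    while (true) {
      if (head == &stall_dummy_) {
        if (w->no_slowdown) {
          w->failed = true;     // fail fast instead of queueing
          return false;
        }
        // Writer tolerates slowdown: park until the stall clears, then
        // re-read the head and retry the insert.
        std::unique_lock<std::mutex> lock(stall_mu_);
        head = newest_.load(std::memory_order_relaxed);
        if (head == &stall_dummy_) {
          stall_cv_.wait(lock);
          head = newest_.load(std::memory_order_relaxed);
          continue;
        }
      }
      w->link_older = head;
      if (newest_.compare_exchange_weak(head, w)) {
        return true;
      }
    }
  }

  // Leader installs the sentinel; new inserts now hit the branch above.
  void BeginStall() { Link(&stall_dummy_); }

  // Leader removes the sentinel and wakes every parked writer.
  void EndStall() {
    std::lock_guard<std::mutex> lock(stall_mu_);
    newest_.exchange(stall_dummy_.link_older);
    stall_cv_.notify_all();
  }

 private:
  Writer stall_dummy_;
  std::atomic<Writer*> newest_{nullptr};
  std::mutex stall_mu_;
  std::condition_variable stall_cv_;
};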

HISTORY.md
@@ -3,6 +3,9 @@
 ### New Features
 * Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
 
+### Bug Fixes
+* Fix a corner case where a write group leader blocked due to a write stall prevents other writers in the queue with WriteOptions::no_slowdown set from proceeding.
+
 ## 5.17.0 (10/05/2018)
 ### Public API Change
 * `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero.

db/db_impl.h
@@ -816,6 +816,7 @@ class DBImpl : public DB {
   friend struct SuperVersion;
   friend class CompactedDBImpl;
   friend class DBTest_ConcurrentFlushWAL_Test;
+  friend class DBTest_MixedSlowdownOptionsStop_Test;
 #ifndef NDEBUG
   friend class DBTest2_ReadCallbackTest_Test;
   friend class WriteCallbackTest_WriteWithCallbackTest_Test;

db/db_impl_write.cc
@@ -1162,10 +1162,14 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
   uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
   if (delay > 0) {
     if (write_options.no_slowdown) {
-      return Status::Incomplete();
+      return Status::Incomplete("Write stall");
     }
     TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
+
+    // Notify write_thread_ about the stall so it can set up a barrier and
+    // fail any pending writers with no_slowdown
+    write_thread_.BeginWriteStall();
+    TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
     mutex_.Unlock();
     // We will delay the write until we have slept for delay ms or
     // we don't need a delay anymore
@@ -1182,6 +1186,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
       env_->SleepForMicroseconds(kDelayInterval);
     }
     mutex_.Lock();
+    write_thread_.EndWriteStall();
   }
 
   // Don't wait if there's a background error, even if it's a soft error. We
@@ -1190,11 +1195,16 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
   // indefinitely
   while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
     if (write_options.no_slowdown) {
-      return Status::Incomplete();
+      return Status::Incomplete("Write stall");
     }
     delayed = true;
+
+    // Notify write_thread_ about the stall so it can set up a barrier and
+    // fail any pending writers with no_slowdown
+    write_thread_.BeginWriteStall();
     TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
     bg_cv_.Wait();
+    write_thread_.EndWriteStall();
   }
   }
   assert(!delayed || !write_options.no_slowdown);
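
From the caller's side, the net effect is that a no_slowdown write now fails
promptly during a stall instead of blocking behind the leader. A sketch of how
an application might handle that (the key/value and the fallback policy are
made up for illustration):

// db is an open rocksdb::DB* (illustrative)
WriteOptions wo;
wo.no_slowdown = true;
Status s = db->Put(wo, "key", "value");
if (s.IsIncomplete()) {
  // The DB is stalled or stopped; back off and retry later, or fall back
  // to a blocking write with no_slowdown = false.
}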

db/db_test.cc
@@ -262,6 +262,196 @@ TEST_F(DBTest, SkipDelay) {
   }
 }
 
+TEST_F(DBTest, MixedSlowdownOptions) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:BeginWriteStallDone",
+      [&](void* /*arg*/) {
+        sleep_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_slowdown_func);
+          }
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+  token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_GE(sleep_count.load(), 1);
+
+  wo.no_slowdown = true;
+  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Sleep",
+      [&](void* /*arg*/) {
+        sleep_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+          // Sleep for 3s to allow the threads to insert themselves into the
+          // write queue
+          env_->SleepForMicroseconds(3000000ULL);
+        }
+      });
+  std::atomic<int> wait_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Wait",
+      [&](void* /*arg*/) { wait_count.fetch_add(1); });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+  token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_EQ(sleep_count.load(), 1);
+  ASSERT_GE(wait_count.load(), 0);
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsStop) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> wakeup_writer = [&]() {
+    dbfull()->mutex_.Lock();
+    dbfull()->bg_cv_.SignalAll();
+    dbfull()->mutex_.Unlock();
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetStopToken();
+  std::atomic<int> wait_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Wait",
+      [&](void* /*arg*/) {
+        wait_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_slowdown_func);
+          }
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+          // Sleep for 3s to allow the threads to insert themselves into the
+          // write queue
+          env_->SleepForMicroseconds(3000000ULL);
+        }
+        token.reset();
+        threads.emplace_back(wakeup_writer);
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+  token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_GE(wait_count.load(), 1);
+
+  wo.no_slowdown = true;
+  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
 #ifndef ROCKSDB_LITE
 
 TEST_F(DBTest, LevelLimitReopen) {
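
The three tests above share one shape: install a SyncPoint callback at a
chosen point inside DelayWrite, spawn writer threads from inside the callback
so they queue up behind the stalled leader, and count how often each path is
hit. A stripped-down sketch of that harness pattern (SyncPoint is RocksDB's
test-build-only hook mechanism; the callback body here is illustrative):

std::atomic<int> hits(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
    "DBImpl::DelayWrite:Wait", [&](void* /*arg*/) {
      // Runs on the write group leader, right before it blocks on bg_cv_.
      hits.fetch_add(1);
    });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// ... drive writes that trigger the stall ...
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();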

db/write_thread.cc
@@ -24,7 +24,10 @@ WriteThread::WriteThread(const ImmutableDBOptions& db_options)
       enable_pipelined_write_(db_options.enable_pipelined_write),
       newest_writer_(nullptr),
       newest_memtable_writer_(nullptr),
-      last_sequence_(0) {}
+      last_sequence_(0),
+      write_stall_dummy_(),
+      stall_mu_(),
+      stall_cv_(&stall_mu_) {}
 
 uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
   // We're going to block. Lazily create the mutex. We guarantee
@@ -219,6 +222,28 @@ bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
   assert(w->state == STATE_INIT);
   Writer* writers = newest_writer->load(std::memory_order_relaxed);
   while (true) {
+    // If a write stall is in effect and w->no_slowdown is not true,
+    // block here until the stall is cleared. If it's true, then return
+    // immediately
+    if (writers == &write_stall_dummy_) {
+      if (w->no_slowdown) {
+        w->status = Status::Incomplete("Write stall");
+        SetState(w, STATE_COMPLETED);
+        return false;
+      }
+      // Since no_slowdown is false, wait here to be notified of the write
+      // stall clearing
+      {
+        MutexLock lock(&stall_mu_);
+        writers = newest_writer->load(std::memory_order_relaxed);
+        if (writers == &write_stall_dummy_) {
+          stall_cv_.Wait();
+          // Load newest_writers_ again since it may have changed
+          writers = newest_writer->load(std::memory_order_relaxed);
+          continue;
+        }
+      }
+    }
     w->link_older = writers;
     if (newest_writer->compare_exchange_weak(writers, w)) {
       return (writers == nullptr);
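
The mutex-guarded re-check of newest_writer_ before stall_cv_.Wait() is what
prevents a lost wakeup: EndWriteStall() swaps the dummy out and signals while
holding the same mutex, so a writer cannot observe the dummy, get descheduled,
and then sleep through the signal. The same check-then-wait idiom with
standard primitives (a sketch with illustrative function names, not the
port:: classes):

std::mutex mu;
std::condition_variable cv;
std::atomic<bool> stalled{true};

void WaitForStallClear() {
  std::unique_lock<std::mutex> lock(mu);
  while (stalled.load(std::memory_order_relaxed)) {
    cv.wait(lock);  // cannot miss the signal: ClearStall publishes under mu
  }
}

void ClearStall() {
  std::lock_guard<std::mutex> lock(mu);
  stalled.store(false, std::memory_order_relaxed);
  cv.notify_all();
}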
@@ -303,12 +328,44 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
   SetState(w, STATE_COMPLETED);
 }
 
+void WriteThread::BeginWriteStall() {
+  LinkOne(&write_stall_dummy_, &newest_writer_);
+
+  // Walk the writer list until w->write_group != nullptr. The current write
+  // group will not have a mix of slowdown/no_slowdown, so it's ok to stop at
+  // that point
+  Writer* w = write_stall_dummy_.link_older;
+  Writer* prev = &write_stall_dummy_;
+  while (w != nullptr && w->write_group == nullptr) {
+    if (w->no_slowdown) {
+      prev->link_older = w->link_older;
+      w->status = Status::Incomplete("Write stall");
+      SetState(w, STATE_COMPLETED);
+      w = prev->link_older;
+    } else {
+      prev = w;
+      w = w->link_older;
+    }
+  }
+}
+
+void WriteThread::EndWriteStall() {
+  MutexLock lock(&stall_mu_);
+
+  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
+  newest_writer_.exchange(write_stall_dummy_.link_older);
+
+  // Wake up writers
+  stall_cv_.SignalAll();
+}
+
 static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
 void WriteThread::JoinBatchGroup(Writer* w) {
   TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
   assert(w->batch != nullptr);
 
   bool linked_as_leader = LinkOne(w, &newest_writer_);
 
   if (linked_as_leader) {
     SetState(w, STATE_GROUP_LEADER);
   }
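
BeginWriteStall's loop is the classic splice-out walk over a singly linked
list with a sentinel at the head: prev trails w, and a matching node is
unlinked by pointing prev->link_older past it. The same walk in generic form
(a sketch; UnlinkMatching is an invented name, and the RocksDB version
additionally stops at the first writer already in a write group):

template <typename Node, typename Pred>
void UnlinkMatching(Node* sentinel, Pred pred) {
  Node* prev = sentinel;
  Node* w = sentinel->link_older;
  while (w != nullptr) {
    if (pred(w)) {
      prev->link_older = w->link_older;  // splice w out of the list
      w = prev->link_older;              // re-examine the new successor
    } else {
      prev = w;                          // keep w; advance both pointers
      w = w->link_older;
    }
  }
}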

db/write_thread.h
@@ -342,6 +342,13 @@ class WriteThread {
     return last_sequence_;
   }
 
+  // Insert a dummy writer at the tail of the write queue to indicate a write
+  // stall, and fail any writers in the queue with no_slowdown set to true
+  void BeginWriteStall();
+
+  // Remove the dummy writer and wake up waiting writers
+  void EndWriteStall();
+
  private:
   // See AwaitState.
   const uint64_t max_yield_usec_;
@@ -365,6 +372,17 @@ class WriteThread {
   // is not necessarily visible to reads because the writer can be ongoing.
   SequenceNumber last_sequence_;
 
+  // A dummy writer to indicate a write stall condition. This will be inserted
+  // at the tail of the writer queue by the leader, so newer writers can just
+  // check for this and bail
+  Writer write_stall_dummy_;
+
+  // Mutex and condvar for writers to block on during a write stall. Writers
+  // with no_slowdown set to false will wait on this rather than on the
+  // writer queue
+  port::Mutex stall_mu_;
+  port::CondVar stall_cv_;
+
   // Waits for w->state & goal_mask using w->StateMutex(). Returns
   // the state that satisfies goal_mask.
   uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);