0672a6db64
Summary: When there is a write stall, the active write group leader calls ```BeginWriteStall()``` to walk the queue of writers and remove any with the ```no_slowdown``` option set. There was a bug in the code which updated the back pointer but not the forward pointer (```link_newer```), corrupting the list and causing some threads to wait forever. This PR fixes it. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6322 Test Plan: Add a unit test in db_write_test Differential Revision: D19538313 Pulled By: anand1976 fbshipit-source-id: 6fbed819e594913f435886606f5d36f74f235c3a
327 lines
11 KiB
C++
327 lines
11 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include <atomic>
|
|
#include <memory>
|
|
#include <thread>
|
|
#include <vector>
|
|
#include <fstream>
|
|
#include "db/db_test_util.h"
|
|
#include "db/write_batch_internal.h"
|
|
#include "db/write_thread.h"
|
|
#include "port/port.h"
|
|
#include "port/stack_trace.h"
|
|
#include "test_util/fault_injection_test_env.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/string_util.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
// Test variations of WriteImpl.
|
|
class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
|
|
public:
|
|
DBWriteTest() : DBTestBase("/db_write_test") {}
|
|
|
|
Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
|
|
|
|
void Open() { DBTestBase::Reopen(GetOptions()); }
|
|
};
|
|
|
|
// It is invalid to do sync write while disabling WAL.
|
|
TEST_P(DBWriteTest, SyncAndDisableWAL) {
|
|
WriteOptions write_options;
|
|
write_options.sync = true;
|
|
write_options.disableWAL = true;
|
|
ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
|
|
WriteBatch batch;
|
|
ASSERT_OK(batch.Put("foo", "bar"));
|
|
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
|
|
}
|
|
|
|
// Regression test for a hang in the write thread during a write stall.
//
// When writes are stalled, the write group leader walks the queue of pending
// writers and removes those that set no_slowdown. This test queues a mix of
// slowdown and no_slowdown writers behind a leader, triggers a stall by
// producing a 4th L0 file (the slowdown/stop trigger is set to 4), and then
// requires every writer thread to finish. A corrupted writer list shows up as
// a thread that never returns, i.e. the joins at the end never complete.
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);

  Reopen(options);

  // Writer that is willing to wait out the stall; its write must succeed.
  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  // Writer that refuses to wait. Depending on whether it is caught by the
  // stall it either succeeds or is failed with Status::Incomplete(); any
  // other status is an error.
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    Status s = dbfull()->Put(wo, key, "bar");
    ASSERT_TRUE(s.ok() || s.IsIncomplete());
  };
  // Sync point callback: wakes the main thread once a writer has reached
  // JoinBatchGroup, so writers are enqueued one at a time in a known order.
  std::function<void(void *)> unblock_main_thread_func = [&](void *) {
    mutex.Lock();
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule 4th without waiting
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
        "DBImpl::WriteImpl:BeforeLeaderEnters"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
        "BackgroundCallCompaction:0"}});
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  ASSERT_OK(dbfull()->Flush(fopt));

  // Create a mix of slowdown/no_slowdown write threads. After starting each
  // thread, wait for the JoinBatchGroup callback before starting the next.
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  // Second leader. Will stall writes
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_no_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_no_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  // Non-fatal check: returning from the test body here would destroy
  // still-joinable writer threads.
  EXPECT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  // The leader is going to create missing newer links. When the leader finishes,
  // the next leader is going to delay writes and fail writers with no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  for (auto& t : threads) {
    t.join();
  }

  // The registered callback refers to the stack-local mutex and condition
  // variable; unregister it before they go out of scope.
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
|
|
|
|
// Checks that a WAL write failure is reported to every writer of a write
// group, not only to the group leader.
//
// The file system of the fault-injection env is deactivated, then
// kNumThreads writers issue a Put each. A sync point callback holds the group
// leader back until all writers have reached JoinBatchGroup, so that they all
// end up in the same group. Exactly one leader is expected.
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> ready_count{0};
  std::atomic<int> leader_count{0};
  std::vector<port::Thread> threads;
  threads.reserve(kNumThreads);
  mock_env->SetFilesystemActive(false);

  // Wait until all threads linked to write threads, to make sure
  // all threads join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        ready_count++;
        // The sync point hands over the writer as an untyped pointer.
        auto* w = static_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          leader_count++;
          while (ready_count < kNumThreads) {
            // busy waiting
          }
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < kNumThreads; i++) {
    threads.emplace_back(
        [&](int index) {
          // All threads should fail.
          auto res = Put("key" + ToString(index), "value");
          if (options.manual_wal_flush) {
            ASSERT_OK(res);
            // we should see fs error when we do the flush

            // TSAN reports a false alarm for lock-order-inversion but Open and
            // FlushWAL are not run concurrently. Disabling this until TSAN is
            // fixed.
            // res = dbfull()->FlushWAL(false);
            // ASSERT_FALSE(res.ok());
          } else {
            ASSERT_FALSE(res.ok());
          }
        },
        i);
  }
  for (auto& t : threads) {
    t.join();
  }
  // The callback refers to the stack-local counters; unregister it before
  // anything below can return from the test body.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_EQ(1, leader_count);
  // Close before mock_env destruct.
  Close();
}
|
|
|
|
// Checks the WAL buffer state around FlushWAL(): right after a Put the buffer
// is empty exactly when manual_wal_flush is off, and after FlushWAL(false) it
// is always empty. The check is done for the WAL created at open and again
// for the WAL created by TEST_SwitchWAL().
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  // ASSERT_OK (rather than ASSERT_TRUE on ok()) reports the failing status.
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  // try the 2nd wal created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}
|
|
|
|
// Issues two Puts through a fault-injection env. The file system is inactive
// for the first Put only; unless manual_wal_flush is set, both Puts are
// expected to fail (the second one because of read-only mode).
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = fault_env.get();
  Reopen(options);

  constexpr int kNumPuts = 2;
  for (int attempt = 0; attempt < kNumPuts; ++attempt) {
    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
    // fail due to read-only mode
    const bool fs_active = (attempt != 0);
    fault_env->SetFilesystemActive(fs_active);
    const Status res = Put("key" + ToString(attempt), "value");

    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // even with manual_wal_flush the 2nd Put should return error because of
      // the read-only mode
      ASSERT_TRUE(res.ok());
      // we should see fs error when we do the flush
      res = dbfull()->FlushWAL(false);
    }
    */
    if (options.manual_wal_flush) {
      continue;
    }
    ASSERT_FALSE(res.ok());
  }

  // Close before mock_env destruct.
  Close();
}
|
|
|
|
// With the file system deactivated and manual_wal_flush enabled, keeps
// writing 1 KiB values until a Put fails (at most 4 * 512 Puts) and expects
// the last returned status to carry fatal severity.
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));

  Options options = GetOptions();
  options.env = fault_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);

  fault_env->SetFilesystemActive(false, Status::IOError("Not active"));

  constexpr int kMaxPuts = 4 * 512;
  Status s;
  int next_key = 0;
  // Stop at the first failing Put, or once kMaxPuts writes were issued.
  do {
    s = Put(Key(next_key), RandomString(&rnd, 1024));
    ++next_key;
  } while (s.ok() && next_key < kMaxPuts);
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);

  fault_env->SetFilesystemActive(true);
  // Close before mock_env destruct.
  Close();
}
|
|
|
|
// Test that db->LockWAL() flushes the WAL after locking.
|
|
TEST_P(DBWriteTest, LockWalInEffect) {
|
|
Options options = GetOptions();
|
|
Reopen(options);
|
|
// try the 1st WAL created during open
|
|
ASSERT_OK(Put("key" + ToString(0), "value"));
|
|
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
|
|
ASSERT_OK(dbfull()->LockWAL());
|
|
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
|
|
ASSERT_OK(dbfull()->UnlockWAL());
|
|
// try the 2nd wal created during SwitchWAL
|
|
dbfull()->TEST_SwitchWAL();
|
|
ASSERT_OK(Put("key" + ToString(0), "value"));
|
|
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
|
|
ASSERT_OK(dbfull()->LockWAL());
|
|
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
|
|
ASSERT_OK(dbfull()->UnlockWAL());
|
|
}
|
|
|
|
// Ten threads each interleave large (100 KB) writes that disable the WAL with
// tiny writes that use it, syncing the WAL after every pair. Afterwards the
// WAL_FILE_BYTES ticker must stay below 100 KB, i.e. none of the large
// WAL-disabled values may have been written to the WAL.
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = rocksdb::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  const std::string wal_key_prefix = "WAL_KEY_";
  const std::string no_wal_key_prefix = "K_";
  // 100 KB value each for NO-WAL operation
  const std::string no_wal_value(1024 * 100, 'X');
  // 1B value each for WAL operation
  const std::string wal_value = "0";
  std::thread threads[10];
  for (int t = 0; t < 10; t++) {
    // The strings are only read and outlive the threads (joined below), so
    // they are captured by reference instead of copying 100 KB per thread.
    threads[t] = std::thread([t, &wal_key_prefix, &wal_value,
                              &no_wal_key_prefix, &no_wal_value, this] {
      for (int i = 0; i < 10; i++) {
        rocksdb::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        rocksdb::WriteOptions write_option_default;
        std::string no_wal_key =
            no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
        // Check every status: if the writes silently failed, the byte-count
        // assertion at the end would pass without testing anything.
        ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
        // NOTE(review): this key uses `i` twice, so all threads write the
        // same ten WAL keys; `t` may have been intended for the first index.
        // Left unchanged because it does not affect the WAL byte count.
        std::string wal_key =
            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
        ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
        ASSERT_OK(dbfull()->SyncWAL());
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num =
      options.statistics->getTickerCount(rocksdb::Tickers::WAL_FILE_BYTES);
  // written WAL size should less than 100KB (even included HEADER & FOOTER overhead)
  const uint64_t kMaxWalBytes = 1024 * 100;
  ASSERT_LE(bytes_num, kMaxWalBytes);
}
|
|
|
|
// Instantiate every DBWriteTest case once per listed option configuration.
INSTANTIATE_TEST_CASE_P(
    DBWriteTestInstance, DBWriteTest,
    testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites,
                    DBTestBase::kPipelinedWrite));
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
rocksdb::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|