// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <atomic>
#include <fstream>
#include <memory>
#include <thread>
#include <vector>

#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"

namespace ROCKSDB_NAMESPACE {

// Test variations of WriteImpl.
class DBWriteTest : public DBTestBase,
                    public testing::WithParamInterface<int> {
 public:
  DBWriteTest() : DBTestBase("/db_write_test", /*env_do_fsync=*/true) {}

  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }

  void Open() { DBTestBase::Reopen(GetOptions()); }
};

// It is invalid to do a sync write while the WAL is disabled.
TEST_P(DBWriteTest, SyncAndDisableWAL) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = true;
  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "bar"));
  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}

TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule 4th without waiting
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file.
  // We need to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  dbfull()->Flush(fopt);

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }

  // Second leader. Will stall writes
  // Build a writers list with no slowdown in the middle:
  //  +-------------+
  //  |  slowdown   +<----+ newest
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  | no slowdown |
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  |  slowdown   +
  //  +-------------+
  threads.emplace_back(write_slowdown_func);
  while (writers != 2) {
    cv.Wait();
  }
  threads.emplace_back(write_no_slowdown_func);
  while (writers != 3) {
    cv.Wait();
  }
  threads.emplace_back(write_slowdown_func);
  while (writers != 4) {
    cv.Wait();
  }
  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
  dbfull()->TEST_WaitForFlushMemTable(nullptr);
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
  for (auto& t : threads) {
    t.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule 4th without waiting
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
        "DBImpl::WriteImpl:BeforeLeaderEnters"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file.
  // We need to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  dbfull()->Flush(fopt);

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  // Second leader. Will stall writes
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_no_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_no_slowdown_func);
  cv.Wait();
  threads.emplace_back(write_slowdown_func);
  cv.Wait();
  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  dbfull()->TEST_WaitForFlushMemTable(nullptr);
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  for (auto& t : threads) {
    t.join();
  }
}

TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> ready_count{0};
  std::atomic<int> leader_count{0};
  std::vector<port::Thread> threads;
  mock_env->SetFilesystemActive(false);

  // Wait until all threads are linked to the write thread, to make sure
  // all threads join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        ready_count++;
        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          leader_count++;
          while (ready_count < kNumThreads) {
            // busy waiting
          }
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < kNumThreads; i++) {
    threads.push_back(port::Thread(
        [&](int index) {
          // All threads should fail.
          auto res = Put("key" + ToString(index), "value");
          if (options.manual_wal_flush) {
            ASSERT_TRUE(res.ok());
            // we should see fs error when we do the flush

            // TSAN reports a false alarm for lock-order-inversion but Open and
            // FlushWAL are not run concurrently. Disabling this until TSAN is
            // fixed.
            // res = dbfull()->FlushWAL(false);
            // ASSERT_FALSE(res.ok());
          } else {
            ASSERT_FALSE(res.ok());
          }
        },
        i));
  }
  for (int i = 0; i < kNumThreads; i++) {
    threads[i].join();
  }
  ASSERT_EQ(1, leader_count);
  // Close before mock_env destruct.
  Close();
}

TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  // try the 2nd wal created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}

TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  for (int i = 0; i < 2; i++) {
    // Forcibly fail WAL write for the first Put only.
    // Subsequent Puts should fail due to read-only mode
    mock_env->SetFilesystemActive(i != 0);
    auto res = Put("key" + ToString(i), "value");
    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // even with manual_wal_flush the 2nd Put should return error because of
      // the read-only mode
      ASSERT_TRUE(res.ok());
      // we should see fs error when we do the flush
      res = dbfull()->FlushWAL(false);
    }
    */
    if (!options.manual_wal_flush) {
      ASSERT_FALSE(res.ok());
    }
  }
  // Close before mock_env destruct.
  Close();
}

TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = mock_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);
  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
  Status s;
  for (int i = 0; i < 4 * 512; ++i) {
    s = Put(Key(i), rnd.RandomString(1024));
    if (!s.ok()) {
      break;
    }
  }
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);

  mock_env->SetFilesystemActive(true);
  // Close before mock_env destruct.
  Close();
}

// Test that db->LockWAL() flushes the WAL after locking.
TEST_P(DBWriteTest, LockWalInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
  // try the 2nd wal created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
}

TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  std::string wal_key_prefix = "WAL_KEY_";
  std::string no_wal_key_prefix = "K_";
  // 100 KB value each for NO-WAL operation
  std::string no_wal_value(1024 * 100, 'X');
  // 1B value each for WAL operation
  std::string wal_value = "0";
  std::thread threads[10];
  for (int t = 0; t < 10; t++) {
    threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
                              no_wal_value, this] {
      for (int i = 0; i < 10; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        std::string no_wal_key =
            no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
        this->Put(no_wal_key, no_wal_value, write_option_disable);
        std::string wal_key =
            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
        this->Put(wal_key, wal_value, write_option_default);
        dbfull()->SyncWAL();
      }
      return 0;
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // Written WAL size should be less than 100 KB (even including HEADER &
  // FOOTER overhead)
  ASSERT_LE(bytes_num, 1024 * 100);
}

INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                        testing::Values(DBTestBase::kDefault,
                                        DBTestBase::kConcurrentWALWrites,
                                        DBTestBase::kPipelinedWrite));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}