// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_test_util.h" #include "port/stack_trace.h" #include "util/fault_injection_test_env.h" #include "util/sync_point.h" namespace rocksdb { class DBFlushTest : public DBTestBase { public: DBFlushTest() : DBTestBase("/db_flush_test") {} }; class DBFlushDirectIOTest : public DBFlushTest, public ::testing::WithParamInterface { public: DBFlushDirectIOTest() : DBFlushTest() {} }; // We had issue when two background threads trying to flush at the same time, // only one of them get committed. The test verifies the issue is fixed. TEST_F(DBFlushTest, FlushWhileWritingManifest) { Options options; options.disable_auto_compactions = true; options.max_background_flushes = 2; options.env = env_; Reopen(options); FlushOptions no_wait; no_wait.wait = false; SyncPoint::GetInstance()->LoadDependency( {{"VersionSet::LogAndApply:WriteManifest", "DBFlushTest::FlushWhileWritingManifest:1"}, {"MemTableList::InstallMemtableFlushResults:InProgress", "VersionSet::LogAndApply:WriteManifestDone"}}); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Put("foo", "v")); ASSERT_OK(dbfull()->Flush(no_wait)); TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1"); ASSERT_OK(Put("bar", "v")); ASSERT_OK(dbfull()->Flush(no_wait)); // If the issue is hit we will wait here forever. dbfull()->TEST_WaitForFlushMemTable(); #ifndef ROCKSDB_LITE ASSERT_EQ(2, TotalTableFiles()); #endif // ROCKSDB_LITE } TEST_F(DBFlushTest, SyncFail) { std::unique_ptr fault_injection_env( new FaultInjectionTestEnv(env_)); Options options; options.disable_auto_compactions = true; options.env = fault_injection_env.get(); SyncPoint::GetInstance()->LoadDependency( {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"}, {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}}); SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); Put("key", "value"); auto* cfd = reinterpret_cast(db_->DefaultColumnFamily()) ->cfd(); int refs_before = cfd->current()->TEST_refs(); FlushOptions flush_options; flush_options.wait = false; ASSERT_OK(dbfull()->Flush(flush_options)); fault_injection_env->SetFilesystemActive(false); TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); fault_injection_env->SetFilesystemActive(true); dbfull()->TEST_WaitForFlushMemTable(); #ifndef ROCKSDB_LITE ASSERT_EQ("", FilesPerLevel()); // flush failed. #endif // ROCKSDB_LITE // Flush job should release ref count to current version. ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); Destroy(options); } TEST_P(DBFlushDirectIOTest, DirectIO) { Options options; options.create_if_missing = true; options.disable_auto_compactions = true; options.max_background_flushes = 2; options.use_direct_io_for_flush_and_compaction = GetParam(); options.env = new MockEnv(Env::Default()); SyncPoint::GetInstance()->SetCallBack( "BuildTable:create_file", [&](void* arg) { bool* use_direct_writes = static_cast(arg); ASSERT_EQ(*use_direct_writes, options.use_direct_io_for_flush_and_compaction); }); SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); ASSERT_OK(Put("foo", "v")); FlushOptions flush_options; flush_options.wait = true; ASSERT_OK(dbfull()->Flush(flush_options)); Destroy(options); delete options.env; } INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); } // namespace rocksdb int main(int argc, char** argv) { rocksdb::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }