diff --git a/db/db_impl.cc b/db/db_impl.cc index 66eac6ad0..857024c38 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2056,6 +2056,10 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); + if (!opened_successfully_) { + // Compaction may introduce data race to DB open + return; + } if (bg_work_gate_closed_) { // gate closed for background work return; @@ -2599,6 +2603,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, mutex_.Unlock(); status = compaction_job.Run(); + TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); mutex_.Lock(); compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); @@ -4355,11 +4360,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } } } - + TEST_SYNC_POINT("DBImpl::Open:Opened"); + if (s.ok()) { + impl->opened_successfully_ = true; + impl->MaybeScheduleFlushOrCompaction(); + } impl->mutex_.Unlock(); if (s.ok()) { - impl->opened_successfully_ = true; Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p", impl); *dbptr = impl; diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 85157c8e6..da269a069 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -25,6 +25,7 @@ #include "util/logging.h" #include "util/mock_env.h" #include "util/mutexlock.h" +#include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" @@ -185,6 +186,13 @@ class FaultInjectionTestEnv : public EnvWrapper { Status NewWritableFile(const std::string& fname, unique_ptr* result, const EnvOptions& soptions) override { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } + // Not allow overwriting files + if (target()->FileExists(fname)) { + return Status::Corruption("File already exists."); + } Status s = target()->NewWritableFile(fname, result, soptions); if (s.ok()) { result->reset(new TestWritableFile(fname, std::move(*result), this)); @@ -201,6 +209,9 @@ class FaultInjectionTestEnv : public EnvWrapper { } virtual Status DeleteFile(const std::string& f) override { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } Status s = EnvWrapper::DeleteFile(f); if (!s.ok()) { fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), @@ -215,6 +226,9 @@ class FaultInjectionTestEnv : public EnvWrapper { virtual Status RenameFile(const std::string& s, const std::string& t) override { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } Status ret = EnvWrapper::RenameFile(s, t); if (ret.ok()) { @@ -373,8 +387,11 @@ TestWritableFile::~TestWritableFile() { } Status TestWritableFile::Append(const Slice& data) { + if (!env_->IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } Status s = target_->Append(data); - if (s.ok() && env_->IsFilesystemActive()) { + if (s.ok()) { state_.pos_ += data.size(); } return s; @@ -544,33 +561,34 @@ class FaultInjectionTest : public testing::Test { ASSERT_OK(s); } - void Build(const WriteOptions& write_options, int start_idx, int num_vals) { + void Build(const WriteOptions& write_options, int start_idx, int num_vals, + bool sequential = true) { std::string key_space, value_space; WriteBatch batch; for (int i = start_idx; i < start_idx + num_vals; i++) { - Slice key = Key(i, &key_space); + Slice key = Key(sequential, i, &key_space); batch.Clear(); batch.Put(key, Value(i, &value_space)); ASSERT_OK(db_->Write(write_options, &batch)); } } - Status ReadValue(int i, std::string* val) const { + Status ReadValue(int i, std::string* val, bool sequential) const { std::string key_space, value_space; - Slice key = Key(i, &key_space); + Slice key = Key(sequential, i, &key_space); Value(i, &value_space); ReadOptions options; return db_->Get(options, key, val); } - Status Verify(int start_idx, int num_vals, - ExpectedVerifResult expected) const { + Status Verify(int start_idx, int num_vals, ExpectedVerifResult expected, + bool seqeuntial = true) const { std::string val; std::string value_space; Status s; for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) { Value(i, &value_space); - s = ReadValue(i, &val); + s = ReadValue(i, &val, seqeuntial); if (s.ok()) { EXPECT_EQ(value_space, val); } @@ -590,9 +608,16 @@ class FaultInjectionTest : public testing::Test { } // Return the ith key - Slice Key(int i, std::string* storage) const { + Slice Key(bool seqeuntial, int i, std::string* storage) const { + int num = i; + if (!seqeuntial) { + // random transfer + const int m = 0x5bd1e995; + num *= m; + num ^= num << 24; + } char buf[100]; - snprintf(buf, sizeof(buf), "%016d", i); + snprintf(buf, sizeof(buf), "%016d", num); storage->assign(buf, strlen(buf)); return Slice(*storage); } @@ -772,14 +797,14 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) { write_options.sync = false; std::string key_space, value_space; - ASSERT_OK( - db_->Put(write_options, Key(1, &key_space), Value(1, &value_space))); + ASSERT_OK(db_->Put(write_options, Key(true, 1, &key_space), + Value(1, &value_space))); FlushOptions flush_options; flush_options.wait = false; ASSERT_OK(db_->Flush(flush_options)); write_options.sync = true; - ASSERT_OK( - db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); + ASSERT_OK(db_->Put(write_options, Key(true, 2, &key_space), + Value(2, &value_space))); env_->SetFilesystemActive(false); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); @@ -788,14 +813,59 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) { ASSERT_OK(OpenDB()); std::string val; Value(2, &value_space); - ASSERT_OK(ReadValue(2, &val)); + ASSERT_OK(ReadValue(2, &val, true)); ASSERT_EQ(value_space, val); Value(1, &value_space); - ASSERT_OK(ReadValue(1, &val)); + ASSERT_OK(ReadValue(1, &val, true)); ASSERT_EQ(value_space, val); } +TEST_F(FaultInjectionTest, UninstalledCompaction) { + options_.target_file_size_base = 32 * 1024; + options_.write_buffer_size = 100 << 10; // 100KB + options_.level0_file_num_compaction_trigger = 6; + options_.level0_stop_writes_trigger = 1 << 10; + options_.level0_slowdown_writes_trigger = 1 << 10; + options_.max_background_compactions = 1; + OpenDB(); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"}, + {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"}, + {"FaultInjectionTest::FaultTest:2", + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + int kNumKeys = 1000; + Build(WriteOptions(), 0, kNumKeys, false); + FlushOptions flush_options; + flush_options.wait = true; + db_->Flush(flush_options); + ASSERT_OK(db_->Put(WriteOptions(), "", "")); + TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0"); + TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1"); + env_->SetFilesystemActive(false); + TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2"); + CloseDB(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + ResetDBState(kResetDropUnsyncedData); + + std::atomic opened(false); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::Open:Opened", [&](void* arg) { opened.store(true); }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BGWorkCompaction", + [&](void* arg) { ASSERT_TRUE(opened.load()); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(OpenDB()); + static_cast(db_)->TEST_WaitForCompact(); + ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound, false)); + ASSERT_OK(db_->Put(WriteOptions(), "", "")); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace rocksdb int main(int argc, char** argv) {