diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 1da1bf1da..aeefd4195 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -833,6 +833,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
   // Caller must hold mutex_.
+  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
   mutex_.AssertHeld();
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
       !status.IsBusy() && !status.IsIncomplete()) {
@@ -843,6 +844,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
 void DBImpl::WriteStatusCheck(const Status& status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
+  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
       !status.IsBusy() && !status.IsIncomplete()) {
     mutex_.Lock();
@@ -854,8 +856,9 @@ void DBImpl::WriteStatusCheck(const Status& status) {
 void DBImpl::IOStatusCheck(const IOStatus& io_status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
-  if (immutable_db_options_.paranoid_checks && !io_status.ok() &&
-      !io_status.IsBusy() && !io_status.IsIncomplete()) {
+  if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
+       !io_status.IsBusy() && !io_status.IsIncomplete()) ||
+      io_status.IsIOFenced()) {
     mutex_.Lock();
     error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
     mutex_.Unlock();
diff --git a/db/error_handler.cc b/db/error_handler.cc
index e344e9931..ae4fe7805 100644
--- a/db/error_handler.cc
+++ b/db/error_handler.cc
@@ -32,6 +32,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
                          Status::Code::kIOError, Status::SubCode::kSpaceLimit,
                          true),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kCompaction,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kCompaction,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         false),
+         Status::Severity::kFatalError},
         // Errors during BG flush
         {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
                          Status::SubCode::kNoSpace, true),
@@ -42,6 +50,12 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
         {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
                          Status::SubCode::kSpaceLimit, true),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
+                         Status::SubCode::kIOFenced, true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
+                         Status::SubCode::kIOFenced, false),
+         Status::Severity::kFatalError},
         // Errors during Write
         {std::make_tuple(BackgroundErrorReason::kWriteCallback,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
@@ -51,6 +65,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
                          false),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kWriteCallback,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kWriteCallback,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         false),
+         Status::Severity::kFatalError},
         // Errors during MANIFEST write
         {std::make_tuple(BackgroundErrorReason::kManifestWrite,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
@@ -60,6 +82,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
                          false),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         false),
+         Status::Severity::kFatalError},
 };
 
 std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc
index 980daee9a..ba3ac4494 100644
--- a/db/error_handler_fs_test.cc
+++ b/db/error_handler_fs_test.cc
@@ -1962,6 +1962,194 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
   Close();
 }
 
+class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
+                                   public testing::WithParamInterface<bool> {};
+
+TEST_P(DBErrorHandlingFencingTest, FLushWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+
+  listener->EnableAutoRecovery(true);
+  DestroyAndReopen(options);
+
+  Put(Key(0), "val");
+  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
+    fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  ASSERT_TRUE(s.IsIOFenced());
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_TRUE(s.IsIOFenced());
+  Destroy(options);
+}
+
+TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+  std::string old_manifest;
+  std::string new_manifest;
+
+  listener->EnableAutoRecovery(true);
+  DestroyAndReopen(options);
+  old_manifest = GetManifestNameFromLiveFiles();
+
+  Put(Key(0), "val");
+  Flush();
+  Put(Key(1), "val");
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
+        fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  ASSERT_TRUE(s.IsIOFenced());
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_TRUE(s.IsIOFenced());
+  Close();
+}
+
+TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+  DestroyAndReopen(options);
+
+  Put(Key(0), "va;");
+  Put(Key(2), "va;");
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+
+  listener->EnableAutoRecovery(true);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0", [&](void*) {
+        fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Put(Key(1), "val");
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  ASSERT_TRUE(s.IsIOFenced());
+
+  fault_fs->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_TRUE(s.IsIOFenced());
+  Destroy(options);
+}
+
+TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.writable_file_max_buffer_size = 32768;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+  Random rnd(301);
+
+  listener->EnableAutoRecovery(true);
+  DestroyAndReopen(options);
+
+  {
+    WriteBatch batch;
+
+    for (auto i = 0; i < 100; ++i) {
+      batch.Put(Key(i), rnd.RandomString(1024));
+    }
+
+    WriteOptions wopts;
+    wopts.sync = true;
+    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+  };
+
+  {
+    WriteBatch batch;
+    int write_error = 0;
+
+    for (auto i = 100; i < 199; ++i) {
+      batch.Put(Key(i), rnd.RandomString(1024));
+    }
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
+          write_error++;
+          if (write_error > 2) {
+            fault_fs->SetFilesystemActive(false,
+                                          IOStatus::IOFenced("IO fenced"));
+          }
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+    WriteOptions wopts;
+    wopts.sync = true;
+    s = dbfull()->Write(wopts, &batch);
+    ASSERT_TRUE(s.IsIOFenced());
+  }
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs->SetFilesystemActive(true);
+  {
+    WriteBatch batch;
+
+    for (auto i = 0; i < 100; ++i) {
+      batch.Put(Key(i), rnd.RandomString(1024));
+    }
+
+    WriteOptions wopts;
+    wopts.sync = true;
+    s = dbfull()->Write(wopts, &batch);
+    ASSERT_TRUE(s.IsIOFenced());
+  }
+  Close();
+}
+
+INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
+                        ::testing::Bool());
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/include/rocksdb/io_status.h b/include/rocksdb/io_status.h
index 8846753b9..ea13d3bc5 100644
--- a/include/rocksdb/io_status.h
+++ b/include/rocksdb/io_status.h
@@ -126,6 +126,11 @@ class IOStatus : public Status {
     return IOStatus(kIOError, kPathNotFound, msg, msg2);
   }
 
+  static IOStatus IOFenced() { return IOStatus(kIOError, kIOFenced); }
+  static IOStatus IOFenced(const Slice& msg, const Slice& msg2 = Slice()) {
+    return IOStatus(kIOError, kIOFenced, msg, msg2);
+  }
+
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.
   // std::string ToString() const;
diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h
index 6006fb16b..b9151471a 100644
--- a/include/rocksdb/status.h
+++ b/include/rocksdb/status.h
@@ -113,6 +113,7 @@ class Status {
     kManualCompactionPaused = 11,
     kOverwritten = 12,
     kTxnNotPrepared = 13,
+    kIOFenced = 14,
     kMaxSubCode
   };
 
@@ -482,6 +483,14 @@ class Status {
     return (code() == kInvalidArgument) && (subcode() == kTxnNotPrepared);
   }
 
+  // Returns true iff the status indicates a IOFenced error.
+  bool IsIOFenced() const {
+#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
+    checked_ = true;
+#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
+    return (code() == kIOError) && (subcode() == kIOFenced);
+  }
+
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.
   std::string ToString() const;
diff --git a/util/status.cc b/util/status.cc
index c9fa2659b..ad948f017 100644
--- a/util/status.cc
+++ b/util/status.cc
@@ -52,7 +52,9 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
     "Insufficient capacity for merge operands",
     // kManualCompactionPaused
     "Manual compaction paused",
-    " (overwritten)",  // kOverwritten, subcode of OK
+    " (overwritten)",    // kOverwritten, subcode of OK
+    "Txn not prepared",  // kTxnNotPrepared
+    "IO fenced off",     // kIOFenced
 };
 
 Status::Status(Code _code, SubCode _subcode, const Slice& msg,
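
Not part of the patch: a minimal sketch of how a custom FileSystem plugin might surface the new subcode once the embedding application decides this node has been fenced off from shared storage. FencedAwareWritableFile and the fenced_ flag are hypothetical names introduced here for illustration; only FSWritableFileWrapper, the Append() signature, and the IOStatus::IOFenced() factory added above come from RocksDB itself.

#include <atomic>

#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical wrapper: once the embedder flips the (assumed) fencing flag,
// every write fails with the kIOFenced subcode, which the error-severity maps
// above classify as kFatalError for all BackgroundErrorReasons.
class FencedAwareWritableFile : public FSWritableFileWrapper {
 public:
  FencedAwareWritableFile(FSWritableFile* target, std::atomic<bool>* fenced)
      : FSWritableFileWrapper(target), fenced_(fenced) {}

  IOStatus Append(const Slice& data, const IOOptions& opts,
                  IODebugContext* dbg) override {
    if (fenced_->load()) {
      // Reported to the DB as fenced; not cleared by automatic recovery.
      return IOStatus::IOFenced("this node lost its storage lease");
    }
    // Otherwise forward to the wrapped file as usual.
    return FSWritableFileWrapper::Append(data, opts, dbg);
  }

 private:
  std::atomic<bool>* fenced_;  // fencing state owned by the embedder (assumed)
};

}  // namespace ROCKSDB_NAMESPACE

Because kIOFenced maps to Status::Severity::kFatalError for every reason, Resume() keeps returning a status for which IsIOFenced() is true, which is exactly what the new DBErrorHandlingFencingTest cases assert.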