diff --git a/HISTORY.md b/HISTORY.md index 1d3fe1f58..8822461fc 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -31,7 +31,7 @@ ### Behavior Changes * File abstraction `FSRandomAccessFile.Prefetch()` default return status is changed from `OK` to `NotSupported`. If the user inherited file doesn't implement prefetch, RocksDB will create internal prefetch buffer to improve read performance. - +* When retryabel IO error happens during Flush (manifest write error is excluded) and WAL is disabled, originally it is mapped to kHardError. Now,it is mapped to soft error. So DB will not stall the writes unless the memtable is full. At the same time, when auto resume is triggered to recover the retryable IO error during Flush, SwitchMemtable is not called to avoid generating to many small immutable memtables. If WAL is enabled, no behavior changes. ### Others * Error in prefetching partitioned index blocks will not be swallowed. It will fail the query and return the IOError users. diff --git a/db/column_family_test.cc b/db/column_family_test.cc index db8acfc3f..73cc23c65 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2427,7 +2427,7 @@ TEST_P(ColumnFamilyTest, FlushAndDropRaceCondition) { // Make sure the task is sleeping. Otherwise, it might start to execute // after sleeping_task.WaitUntilDone() and cause TSAN warning. sleeping_task.WaitUntilSleeping(); - + // 1MB should create ~10 files for each CF int kKeysNum = 10000; PutRandomData(1, kKeysNum, 100); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 87fdf9f92..567a04242 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -302,7 +302,7 @@ Status DBImpl::Resume() { // 4. Schedule compactions if needed for all the CFs. This is needed as the // flush in the prior step might have been a no-op for some CFs, which // means a new super version wouldn't have been installed -Status DBImpl::ResumeImpl() { +Status DBImpl::ResumeImpl(DBRecoverContext context) { mutex_.AssertHeld(); WaitForBackgroundWork(); @@ -364,7 +364,7 @@ Status DBImpl::ResumeImpl() { autovector cfds; SelectColumnFamiliesForAtomicFlush(&cfds); mutex_.Unlock(); - s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery); + s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason); mutex_.Lock(); } else { for (auto cfd : *versions_->GetColumnFamilySet()) { @@ -373,7 +373,7 @@ Status DBImpl::ResumeImpl() { } cfd->Ref(); mutex_.Unlock(); - s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery); + s = FlushMemTable(cfd, flush_opts, context.flush_reason); mutex_.Lock(); cfd->UnrefAndTryDelete(); if (!s.ok()) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 48607c7b1..d903964a2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1379,7 +1379,7 @@ class DBImpl : public DB { // Required: DB mutex held Status PersistentStatsProcessFormatVersion(); - Status ResumeImpl(); + Status ResumeImpl(DBRecoverContext context); void MaybeIgnoreError(Status* s) const; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index d5dea4323..2cc1de095 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -125,7 +125,12 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { // "number < current_log_number". MarkLogsSynced(current_log_number - 1, true, io_s); if (!io_s.ok()) { - error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); + if (total_log_size_ > 0) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); + } else { + // If the WAL is empty, we use different error reason + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL); + } TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); return io_s; } @@ -178,6 +183,10 @@ Status DBImpl::FlushMemTableToOutputFile( // SyncClosedLogs() may unlock and re-lock the db_mutex. io_s = SyncClosedLogs(job_context); s = io_s; + if (!io_s.ok() && !io_s.IsShutdownInProgress() && + !io_s.IsColumnFamilyDropped()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); + } } else { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip"); } @@ -217,10 +226,14 @@ Status DBImpl::FlushMemTableToOutputFile( // CURRENT file. With current code, it's just difficult to tell. So just // be pessimistic and try write to a new MANIFEST. // TODO: distinguish between MANIFEST write and CURRENT renaming - auto err_reason = versions_->io_status().ok() - ? BackgroundErrorReason::kFlush - : BackgroundErrorReason::kManifestWrite; - error_handler_.SetBGError(io_s, err_reason); + if (!versions_->io_status().ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite); + } else if (total_log_size_ > 0) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); + } else { + // If the WAL is empty, we use different error reason + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL); + } } else { Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); @@ -593,10 +606,14 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // CURRENT file. With current code, it's just difficult to tell. So just // be pessimistic and try write to a new MANIFEST. // TODO: distinguish between MANIFEST write and CURRENT renaming - auto err_reason = versions_->io_status().ok() - ? BackgroundErrorReason::kFlush - : BackgroundErrorReason::kManifestWrite; - error_handler_.SetBGError(io_s, err_reason); + if (!versions_->io_status().ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite); + } else if (total_log_size_ > 0) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); + } else { + // If the WAL is empty, we use different error reason + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL); + } } else { Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); @@ -1598,6 +1615,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, return s; } } + FlushRequest flush_req; { WriteContext context; @@ -1614,7 +1632,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, WaitForPendingWrites(); if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { - s = SwitchMemtable(cfd, &context); + if (flush_reason != FlushReason::kErrorRecoveryRetryFlush) { + s = SwitchMemtable(cfd, &context); + } else { + assert(cfd->imm()->NumNotFlushed() > 0); + } } if (s.ok()) { if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || @@ -1622,7 +1644,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, flush_memtable_id = cfd->imm()->GetLatestMemTableID(); flush_req.emplace_back(cfd, flush_memtable_id); } - if (immutable_db_options_.persist_stats_to_disk) { + if (immutable_db_options_.persist_stats_to_disk && + flush_reason != FlushReason::kErrorRecoveryRetryFlush) { ColumnFamilyData* cfd_stats = versions_->GetColumnFamilySet()->GetColumnFamily( kPersistentStatsColumnFamilyName); @@ -1651,7 +1674,6 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } } } - if (s.ok() && !flush_req.empty()) { for (auto& elem : flush_req) { ColumnFamilyData* loop_cfd = elem.first; @@ -1687,8 +1709,10 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, cfds.push_back(iter.first); flush_memtable_ids.push_back(&(iter.second)); } - s = WaitForFlushMemTables(cfds, flush_memtable_ids, - (flush_reason == FlushReason::kErrorRecovery)); + s = WaitForFlushMemTables( + cfds, flush_memtable_ids, + (flush_reason == FlushReason::kErrorRecovery || + flush_reason == FlushReason::kErrorRecoveryRetryFlush)); InstrumentedMutexLock lock_guard(&mutex_); for (auto* tmp_cfd : cfds) { tmp_cfd->UnrefAndTryDelete(); @@ -1746,7 +1770,8 @@ Status DBImpl::AtomicFlushMemTables( } } for (auto cfd : cfds) { - if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) { + if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) || + flush_reason == FlushReason::kErrorRecoveryRetryFlush) { continue; } cfd->Ref(); @@ -1789,8 +1814,10 @@ Status DBImpl::AtomicFlushMemTables( for (auto& iter : flush_req) { flush_memtable_ids.push_back(&(iter.second)); } - s = WaitForFlushMemTables(cfds, flush_memtable_ids, - (flush_reason == FlushReason::kErrorRecovery)); + s = WaitForFlushMemTables( + cfds, flush_memtable_ids, + (flush_reason == FlushReason::kErrorRecovery || + flush_reason == FlushReason::kErrorRecoveryRetryFlush)); InstrumentedMutexLock lock_guard(&mutex_); for (auto* cfd : cfds) { cfd->UnrefAndTryDelete(); @@ -1900,6 +1927,13 @@ Status DBImpl::WaitForFlushMemTables( if (!error_handler_.GetRecoveryError().ok()) { break; } + // If BGWorkStopped, which indicate that there is a BG error and + // 1) soft error but requires no BG work, 2) no in auto_recovery_ + if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() && + error_handler_.GetBGError().severity() < Status::Severity::kHardError) { + return error_handler_.GetBGError(); + } + // Number of column families that have been dropped. int num_dropped = 0; // Number of column families that have finished flush. diff --git a/db/error_handler.cc b/db/error_handler.cc index ae4fe7805..7dae0a638 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -90,6 +90,28 @@ std::map, Status::Code::kIOError, Status::SubCode::kIOFenced, false), Status::Severity::kFatalError}, + // Errors during BG flush with WAL disabled + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kIOError, Status::SubCode::kNoSpace, + true), + Status::Severity::kHardError}, + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kIOError, Status::SubCode::kNoSpace, + false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kIOError, Status::SubCode::kSpaceLimit, + true), + Status::Severity::kHardError}, + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kIOError, Status::SubCode::kIOFenced, + true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kIOError, Status::SubCode::kIOFenced, + false), + Status::Severity::kFatalError}, + }; std::map, @@ -140,6 +162,19 @@ std::map, {std::make_tuple(BackgroundErrorReason::kManifestWrite, Status::Code::kIOError, false), Status::Severity::kFatalError}, + // Errors during BG flush with WAL disabled + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kCorruption, true), + Status::Severity::kUnrecoverableError}, + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kCorruption, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kIOError, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kFlushNoWAL, + Status::Code::kIOError, false), + Status::Severity::kNoError}, }; std::map, Status::Severity> @@ -218,6 +253,7 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas bool paranoid = db_options_.paranoid_checks; Status::Severity sev = Status::Severity::kFatalError; Status new_bg_err; + DBRecoverContext context; bool found = false; { @@ -276,6 +312,7 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas } } + recover_context_ = context; if (auto_recovery) { recovery_in_prog_ = true; @@ -303,8 +340,10 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, // Always returns ok db_->DisableFileDeletionsWithLock(); } + Status new_bg_io_err = bg_io_err; Status s; + DBRecoverContext context; if (bg_io_err.GetDataLoss()) { // FIrst, data loss is treated as unrecoverable error. So it can directly // overwrite any existing bg_error_. @@ -316,6 +355,7 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, } EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_, &auto_recovery); + recover_context_ = context; return bg_error_; } else if (bg_io_err.GetRetryable()) { // Second, check if the error is a retryable IO error or not. if it is @@ -332,7 +372,27 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, if (bg_err.severity() > bg_error_.severity()) { bg_error_ = bg_err; } + recover_context_ = context; return bg_error_; + } else if (BackgroundErrorReason::kFlushNoWAL == reason) { + // When the BG Retryable IO error reason is flush without WAL, + // We map it to a soft error. At the same time, all the background work + // should be stopped except the BG work from recovery. Therefore, we + // set the soft_error_no_bg_work_ to true. At the same time, since DB + // continues to receive writes when BG error is soft error, to avoid + // to many small memtable being generated during auto resume, the flush + // reason is set to kErrorRecoveryRetryFlush. + Status bg_err(new_bg_io_err, Status::Severity::kSoftError); + if (recovery_in_prog_ && recovery_error_.ok()) { + recovery_error_ = bg_err; + } + if (bg_err.severity() > bg_error_.severity()) { + bg_error_ = bg_err; + } + soft_error_no_bg_work_ = true; + context.flush_reason = FlushReason::kErrorRecoveryRetryFlush; + recover_context_ = context; + return StartRecoverFromRetryableBGIOError(bg_io_err); } else { Status bg_err(new_bg_io_err, Status::Severity::kHardError); if (recovery_in_prog_ && recovery_error_.ok()) { @@ -341,6 +401,7 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, if (bg_err.severity() > bg_error_.severity()) { bg_error_ = bg_err; } + recover_context_ = context; return StartRecoverFromRetryableBGIOError(bg_io_err); } } else { @@ -407,6 +468,7 @@ Status ErrorHandler::ClearBGError() { Status old_bg_error = bg_error_; bg_error_ = Status::OK(); recovery_in_prog_ = false; + soft_error_no_bg_work_ = false; EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners, old_bg_error, db_mutex_); } @@ -419,6 +481,7 @@ Status ErrorHandler::ClearBGError() { Status ErrorHandler::RecoverFromBGError(bool is_manual) { #ifndef ROCKSDB_LITE InstrumentedMutexLock l(db_mutex_); + bool no_bg_work_original_flag = soft_error_no_bg_work_; if (is_manual) { // If its a manual recovery and there's a background recovery in progress // return busy status @@ -426,9 +489,24 @@ Status ErrorHandler::RecoverFromBGError(bool is_manual) { return Status::Busy(); } recovery_in_prog_ = true; + + // In manual resume, we allow the bg work to run. If it is a auto resume, + // the bg work should follow this tag. + soft_error_no_bg_work_ = false; + + // In manual resume, if the bg error is a soft error and also requires + // no bg work, the error must be recovered by call the flush with + // flush reason: kErrorRecoveryRetryFlush. In other case, the flush + // reason is set to kErrorRecovery. + if (no_bg_work_original_flag) { + recover_context_.flush_reason = FlushReason::kErrorRecoveryRetryFlush; + } else { + recover_context_.flush_reason = FlushReason::kErrorRecovery; + } } - if (bg_error_.severity() == Status::Severity::kSoftError) { + if (bg_error_.severity() == Status::Severity::kSoftError && + recover_context_.flush_reason == FlushReason::kErrorRecovery) { // Simply clear the background error and return recovery_error_ = Status::OK(); return ClearBGError(); @@ -438,7 +516,13 @@ Status ErrorHandler::RecoverFromBGError(bool is_manual) { // during the recovery process. While recovering, the only operations that // can generate background errors should be the flush operations recovery_error_ = Status::OK(); - Status s = db_->ResumeImpl(); + Status s = db_->ResumeImpl(recover_context_); + if (s.ok()) { + soft_error_no_bg_work_ = false; + } else { + soft_error_no_bg_work_ = no_bg_work_original_flag; + } + // For manual recover, shutdown, and fatal error cases, set // recovery_in_prog_ to false. For automatic background recovery, leave it // as is regardless of success or failure as it will be retried @@ -491,6 +575,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { if (end_recovery_) { return; } + DBRecoverContext context = recover_context_; int resume_count = db_options_.max_bgerror_resume_count; uint64_t wait_interval = db_options_.bgerror_resume_retry_interval; // Recover from the retryable error. Create a separate thread to do it. @@ -502,7 +587,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeResume1"); recovery_io_error_ = IOStatus::OK(); recovery_error_ = Status::OK(); - Status s = db_->ResumeImpl(); + Status s = db_->ResumeImpl(context); TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterResume0"); TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterResume1"); if (s.IsShutdownInProgress() || @@ -537,6 +622,9 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners, old_bg_error, db_mutex_); recovery_in_prog_ = false; + if (soft_error_no_bg_work_) { + soft_error_no_bg_work_ = false; + } return; } else { TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverFail1"); diff --git a/db/error_handler.h b/db/error_handler.h index 6ede5559e..084434101 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -14,6 +14,17 @@ namespace ROCKSDB_NAMESPACE { class DBImpl; +// This structure is used to store the DB recovery context. The context is +// the information that related to the recover actions. For example, it contains +// FlushReason, which tells the flush job why this flush is called. +struct DBRecoverContext { + FlushReason flush_reason; + + DBRecoverContext() : flush_reason(FlushReason::kErrorRecovery) {} + + DBRecoverContext(FlushReason reason) : flush_reason(reason) {} +}; + class ErrorHandler { public: ErrorHandler(DBImpl* db, const ImmutableDBOptions& db_options, @@ -28,7 +39,8 @@ class ErrorHandler { recovery_thread_(nullptr), db_mutex_(db_mutex), auto_recovery_(false), - recovery_in_prog_(false) {} + recovery_in_prog_(false), + soft_error_no_bg_work_(false) {} ~ErrorHandler() { bg_error_.PermitUncheckedError(); recovery_error_.PermitUncheckedError(); @@ -59,9 +71,11 @@ class ErrorHandler { bool IsBGWorkStopped() { return !bg_error_.ok() && (bg_error_.severity() >= Status::Severity::kHardError || - !auto_recovery_); + !auto_recovery_ || soft_error_no_bg_work_); } + bool IsSoftErrorNoBGWork() { return soft_error_no_bg_work_; } + bool IsRecoveryInProgress() { return recovery_in_prog_; } Status RecoverFromBGError(bool is_manual = false); @@ -89,6 +103,12 @@ class ErrorHandler { // A flag indicating whether automatic recovery from errors is enabled bool auto_recovery_; bool recovery_in_prog_; + // A flag to indicate that for the soft error, we should not allow any + // backrgound work execpt the work is from recovery. + bool soft_error_no_bg_work_; + + // Used to store the context for recover, such as flush reason. + DBRecoverContext recover_context_; Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery); void RecoverFromNoSpace(); diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc index ba3ac4494..24291cca1 100644 --- a/db/error_handler_fs_test.cc +++ b/db/error_handler_fs_test.cc @@ -183,7 +183,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) { Destroy(options); } -TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) { +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) { std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); @@ -247,6 +247,92 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) { Destroy(options); } +TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 0; + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + WriteOptions wo = WriteOptions(); + wo.disableWAL = true; + Put(Key(1), "val1", wo); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + Put(Key(2), "val2", wo); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); + ASSERT_EQ("val2", Get(Key(2))); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val1", Get(Key(1))); + ASSERT_EQ("val2", Get(Key(2))); + Put(Key(3), "val3", wo); + ASSERT_EQ("val3", Get(Key(3))); + s = Flush(); + ASSERT_OK(s); + ASSERT_EQ("val3", Get(Key(3))); + + Put(Key(4), "val4", wo); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeCloseTableFile", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + Put(Key(5), "val5", wo); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); + ASSERT_EQ("val5", Get(Key(5))); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val4", Get(Key(4))); + ASSERT_EQ("val5", Get(Key(5))); + Put(Key(6), "val6", wo); + ASSERT_EQ("val6", Get(Key(6))); + s = Flush(); + ASSERT_OK(s); + ASSERT_EQ("val6", Get(Key(6))); + + Put(Key(7), "val7", wo); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeSyncTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + Put(Key(8), "val8", wo); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); + ASSERT_EQ("val8", Get(Key(8))); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val7", Get(Key(7))); + ASSERT_EQ("val8", Get(Key(8))); + Put(Key(9), "val9", wo); + ASSERT_EQ("val9", Get(Key(9))); + s = Flush(); + ASSERT_OK(s); + ASSERT_EQ("val9", Get(Key(9))); + Destroy(options); +} + TEST_F(DBErrorHandlingFSTest, ManifestWriteError) { std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); @@ -1213,6 +1299,114 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) { delete def_env; } +// When Put the KV-pair, the write option is set to disable WAL. +// If retryable error happens in this condition, map the bg error +// to soft error and trigger auto resume. During auto resume, SwitchMemtable +// is disabled to avoid small SST tables. Write can still be applied before +// the bg error is cleaned unless the memtable is full. +TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) { + // Activate the FS before the first resume + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + WriteOptions wo = WriteOptions(); + wo.disableWAL = true; + Put(Key(1), "val1", wo); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:LoopOut", + "FLushWritNoWALRetryableeErrorAutoRecover1:1"}}); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ("val1", Get(Key(1))); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); + TEST_SYNC_POINT("FLushWritNoWALRetryableeErrorAutoRecover1:1"); + ASSERT_EQ("val1", Get(Key(1))); + ASSERT_EQ("val1", Get(Key(1))); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + Put(Key(2), "val2", wo); + s = Flush(); + // Since auto resume fails, the bg error is not cleand, flush will + // return the bg_error set before. + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); + ASSERT_EQ("val2", Get(Key(2))); + + // call auto resume + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + Put(Key(3), "val3", wo); + s = Flush(); + // After resume is successful, the flush should be ok. + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val3", Get(Key(3))); + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) { + // Activate the FS before the first resume + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + WriteOptions wo = WriteOptions(); + wo.disableWAL = true; + Put(Key(1), "val1", wo); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ("val1", Get(Key(1))); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + ASSERT_EQ(listener->WaitForRecovery(5000000), true); + ASSERT_EQ("val1", Get(Key(1))); + Put(Key(2), "val2", wo); + s = Flush(); + // Since auto resume is successful, the bg error is cleaned, flush will + // be successful. + ASSERT_OK(s); + ASSERT_EQ("val2", Get(Key(2))); + Destroy(options); +} + TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover1) { // Fail the first resume and make the second resume successful std::shared_ptr fault_fs( diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 1c15fc0fe..861488c10 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -115,6 +115,9 @@ enum class FlushReason : int { kAutoCompaction = 0x09, kManualFlush = 0x0a, kErrorRecovery = 0xb, + // When set the flush reason to kErrorRecoveryRetryFlush, SwitchMemtable + // will not be called to avoid many small immutable memtables. + kErrorRecoveryRetryFlush = 0xc, }; enum class BackgroundErrorReason { @@ -123,6 +126,7 @@ enum class BackgroundErrorReason { kWriteCallback, kMemTable, kManifestWrite, + kFlushNoWAL, }; enum class WriteStallCondition {