diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 3febded80..3eb5dd704 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -79,7 +79,7 @@ class CompactionJobTest : public testing::Test { shutting_down_(false), preserve_deletes_seqnum_(0), mock_table_factory_(new mock::MockTableFactory()), - error_handler_(db_options_, &mutex_) { + error_handler_(nullptr, db_options_, &mutex_) { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); diff --git a/db/db_impl.cc b/db/db_impl.cc index 7a9e49cc2..a583d3392 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -215,9 +215,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // requires a custom gc for compaction, we use that to set use_custom_gc_ // as well. use_custom_gc_(seq_per_batch), + shutdown_initiated_(false), preserve_deletes_(options.preserve_deletes), closed_(false), - error_handler_(immutable_db_options_, &mutex_) { + error_handler_(this, immutable_db_options_, &mutex_) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); @@ -259,16 +260,62 @@ Status DBImpl::Resume() { return Status::OK(); } - Status s = error_handler_.GetBGError(); - if (s.severity() > Status::Severity::kHardError) { + if (error_handler_.IsRecoveryInProgress()) { + // Don't allow a mix of manual and automatic recovery + return Status::Busy(); + } + + mutex_.Unlock(); + Status s = error_handler_.RecoverFromBGError(true); + mutex_.Lock(); + return s; +} + +// This function implements the guts of recovery from a background error. It +// is eventually called for both manual as well as automatic recovery. It does +// the following - +// 1. Wait for currently scheduled background flush/compaction to exit, in +// order to inadvertently causing an error and thinking recovery failed +// 2. Flush memtables if there's any data for all the CFs. This may result +// another error, which will be saved by error_handler_ and reported later +// as the recovery status +// 3. Find and delete any obsolete files +// 4. Schedule compactions if needed for all the CFs. This is needed as the +// flush in the prior step might have been a no-op for some CFs, which +// means a new super version wouldn't have been installed +Status DBImpl::ResumeImpl() { + mutex_.AssertHeld(); + WaitForBackgroundWork(); + + Status bg_error = error_handler_.GetBGError(); + Status s; + if (shutdown_initiated_) { + // Returning shutdown status to SFM during auto recovery will cause it + // to abort the recovery and allow the shutdown to progress + s = Status::ShutdownInProgress(); + } + if (s.ok() && bg_error.severity() > Status::Severity::kHardError) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB resume requested but failed due to Fatal/Unrecoverable error"); - return s; + s = bg_error; + } + + // We cannot guarantee consistency of the WAL. So force flush Memtables of + // all the column families + if (s.ok()) { + s = FlushAllCFs(FlushReason::kErrorRecovery); + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "DB resume requested but failed due to Flush failure [%s]", + s.ToString().c_str()); + } } JobContext job_context(0); FindObsoleteFiles(&job_context, true); - error_handler_.ClearBGError(); + if (s.ok()) { + s = error_handler_.ClearBGError(); + } mutex_.Unlock(); job_context.manifest_file_number = 1; @@ -277,13 +324,36 @@ Status DBImpl::Resume() { } job_context.Clean(); - ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB"); + if (s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB"); + } mutex_.Lock(); - MaybeScheduleFlushOrCompaction(); + // Check for shutdown again before scheduling further compactions, + // since we released and re-acquired the lock above + if (shutdown_initiated_) { + s = Status::ShutdownInProgress(); + } + if (s.ok()) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + SchedulePendingCompaction(cfd); + } + MaybeScheduleFlushOrCompaction(); + } + + // Wake up any waiters - in this case, it could be the shutdown thread + bg_cv_.SignalAll(); // No need to check BGError again. If something happened, event listener would be // notified and the operation causing it would have failed - return Status::OK(); + return s; +} + +void DBImpl::WaitForBackgroundWork() { + // Wait for background work to finish + while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || + bg_flush_scheduled_) { + bg_cv_.Wait(); + } } // Will lock the mutex_, will wait for completion if wait is true @@ -313,14 +383,20 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { if (!wait) { return; } - // Wait for background work to finish - while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || - bg_flush_scheduled_) { - bg_cv_.Wait(); - } + WaitForBackgroundWork(); } Status DBImpl::CloseHelper() { + // Guarantee that there is no background error recovery in progress before + // continuing with the shutdown + mutex_.Lock(); + shutdown_initiated_ = true; + error_handler_.CancelErrorRecovery(); + while (error_handler_.IsRecoveryInProgress()) { + bg_cv_.Wait(); + } + mutex_.Unlock(); + // CancelAllBackgroundWork called with false means we just set the shutdown // marker. After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) @@ -338,7 +414,8 @@ Status DBImpl::CloseHelper() { // Wait for background work to finish while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || bg_flush_scheduled_ || bg_purge_scheduled_ || - pending_purge_obsolete_files_) { + pending_purge_obsolete_files_ || + error_handler_.IsRecoveryInProgress()) { TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); bg_cv_.Wait(); } diff --git a/db/db_impl.h b/db/db_impl.h index 8983c7895..ed8591990 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -795,6 +795,7 @@ class DBImpl : public DB { private: friend class DB; + friend class ErrorHandler; friend class InternalStats; friend class PessimisticTransaction; friend class TransactionBaseImpl; @@ -845,6 +846,8 @@ class DBImpl : public DB { bool read_only = false, bool error_if_log_file_exist = false, bool error_if_data_exists_in_logs = false); + Status ResumeImpl(); + void MaybeIgnoreError(Status* s) const; const Status CreateArchivalDirectory(); @@ -1046,9 +1049,10 @@ class DBImpl : public DB { LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, - LogBuffer* log_buffer); + LogBuffer* log_buffer, FlushReason* reason); - bool EnoughRoomForCompaction(const std::vector& inputs, + bool EnoughRoomForCompaction(ColumnFamilyData* cfd, + const std::vector& inputs, bool* sfm_bookkeeping, LogBuffer* log_buffer); void PrintStatistics(); @@ -1082,6 +1086,10 @@ class DBImpl : public DB { Status CloseHelper(); + Status FlushAllCFs(FlushReason flush_reason); + + void WaitForBackgroundWork(); + // table_cache_ provides its own synchronization std::shared_ptr table_cache_; @@ -1526,6 +1534,13 @@ class DBImpl : public DB { // flush/compaction and if it is not provided vis SnapshotChecker, we should // disable gc to be safe. const bool use_custom_gc_; + // Flag to indicate that the DB instance shutdown has been initiated. This + // different from shutting_down_ atomic in that it is set at the beginning + // of shutdown sequence, specifically in order to prevent any background + // error recovery from going on in parallel. The latter, shutting_down_, + // is set a little later during the shutdown after scheduling memtable + // flushes + bool shutdown_initiated_; // Clients must periodically call SetPreserveDeletesSequenceNumber() // to advance this seqnum. Default value is 0 which means ALL deletes are diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 9bbbc4734..eef8cf98d 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -26,7 +26,7 @@ namespace rocksdb { bool DBImpl::EnoughRoomForCompaction( - const std::vector& inputs, + ColumnFamilyData* cfd, const std::vector& inputs, bool* sfm_reserved_compact_space, LogBuffer* log_buffer) { // Check if we have enough room to do the compaction bool enough_room = true; @@ -34,12 +34,17 @@ bool DBImpl::EnoughRoomForCompaction( auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); if (sfm) { - enough_room = sfm->EnoughRoomForCompaction(inputs); + // Pass the current bg_error_ to SFM so it can decide what checks to + // perform. If this DB instance hasn't seen any error yet, the SFM can be + // optimistic and not do disk space checks + enough_room = + sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError()); if (enough_room) { *sfm_reserved_compact_space = true; } } #else + (void)cfd; (void)inputs; (void)sfm_reserved_compact_space; #endif // ROCKSDB_LITE @@ -584,7 +589,7 @@ Status DBImpl::CompactFilesImpl( bool sfm_reserved_compact_space = false; // First check if we have enough room to do the compaction bool enough_room = EnoughRoomForCompaction( - input_files, &sfm_reserved_compact_space, log_buffer); + cfd, input_files, &sfm_reserved_compact_space, log_buffer); if (!enough_room) { // m's vars will get set properly at the end of this function, @@ -914,6 +919,68 @@ Status DBImpl::Flush(const FlushOptions& flush_options, return s; } + +Status DBImpl::FlushAllCFs(FlushReason flush_reason) { + Status s; + WriteContext context; + WriteThread::Writer w; + + mutex_.AssertHeld(); + write_thread_.EnterUnbatched(&w, &mutex_); + + FlushRequest flush_req; + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() && + cached_recoverable_state_empty_.load()) { + // Nothing to flush + continue; + } + + // SwitchMemtable() will release and reacquire mutex during execution + s = SwitchMemtable(cfd, &context); + if (!s.ok()) { + break; + } + + cfd->imm()->FlushRequested(); + + flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID()); + } + + // schedule flush + if (s.ok() && !flush_req.empty()) { + SchedulePendingFlush(flush_req, flush_reason); + MaybeScheduleFlushOrCompaction(); + } + + write_thread_.ExitUnbatched(&w); + + if (s.ok()) { + for (auto& flush : flush_req) { + auto cfd = flush.first; + auto flush_memtable_id = flush.second; + while (cfd->imm()->NumNotFlushed() > 0 && + cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) { + if (!error_handler_.GetRecoveryError().ok()) { + break; + } + if (cfd->IsDropped()) { + // FlushJob cannot flush a dropped CF, if we did not break here + // we will loop forever since cfd->imm()->NumNotFlushed() will never + // drop to zero + continue; + } + cfd->Ref(); + bg_cv_.Wait(); + cfd->Unref(); + } + } + } + + flush_req.clear(); + return s; +} + Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, uint32_t max_subcompactions, @@ -1236,6 +1303,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_work_paused_ > 0) { // we paused the background work return; + } else if (error_handler_.IsBGWorkStopped() && + !error_handler_.IsRecoveryInProgress()) { + // There has been a hard error and this call is not part of the recovery + // sequence. Bail out here so we don't get into an endless loop of + // scheduling BG work which will again call this function + return; } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions return; @@ -1263,6 +1336,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_compaction_paused_ > 0) { // we paused the background compaction return; + } else if (error_handler_.IsBGWorkStopped()) { + // Compaction is not part of the recovery sequence from a hard error. We + // might get here because recovery might do a flush and install a new + // super version, which will try to schedule pending compactions. Bail + // out here and let the higher level recovery handle compactions + return; } if (HasExclusiveManualCompaction()) { @@ -1420,15 +1499,18 @@ void DBImpl::UnscheduleCallback(void* arg) { } Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, FlushReason* reason) { mutex_.AssertHeld(); Status status; + *reason = FlushReason::kOthers; + // If BG work is stopped due to an error, but a recovery is in progress, + // that means this flush is part of the recovery. So allow it to go through if (!error_handler_.IsBGWorkStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); } - } else { + } else if (!error_handler_.IsRecoveryInProgress()) { status = error_handler_.GetBGError(); } @@ -1479,6 +1561,9 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, } status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, job_context, log_buffer); + // All the CFDs in the FlushReq must have the same flush reason, so just + // grab the first one + *reason = bg_flush_args[0].cfd_->GetFlushReason(); for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; if (cfd->Unref()) { @@ -1505,9 +1590,12 @@ void DBImpl::BackgroundCallFlush() { auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); + FlushReason reason; - Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer); - if (!s.ok() && !s.IsShutdownInProgress()) { + Status s = + BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason); + if (!s.ok() && !s.IsShutdownInProgress() && + reason != FlushReason::kErrorRecovery) { // Wait a little bit before retrying background flush in // case this is an environmental problem and we do not want to // chew up resources for failed flushes for the duration of @@ -1697,6 +1785,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } else { status = error_handler_.GetBGError(); + // If we get here, it means a hard error happened after this compaction + // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got + // a chance to execute. Since we didn't pop a cfd from the compaction + // queue, increment unscheduled_compactions_ + unscheduled_compactions_++; } if (!status.ok()) { @@ -1732,7 +1825,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else { // First check if we have enough room to do the compaction bool enough_room = EnoughRoomForCompaction( - *(c->inputs()), &sfm_reserved_compact_space, log_buffer); + m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer); if (!enough_room) { // Then don't do the compaction @@ -1795,7 +1888,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (c != nullptr) { bool enough_room = EnoughRoomForCompaction( - *(c->inputs()), &sfm_reserved_compact_space, log_buffer); + cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer); if (!enough_room) { // Then don't do the compaction @@ -2004,8 +2097,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); } - // this will unref its input_version and column_family_data - c.reset(); if (status.ok() || status.IsCompactionTooLarge()) { // Done @@ -2015,7 +2106,26 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", status.ToString().c_str()); error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); + if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) { + // Put this cfd back in the compaction queue so we can retry after some + // time + auto cfd = c->column_family_data(); + assert(cfd != nullptr); + // Since this compaction failed, we need to recompute the score so it + // takes the original input files into account + c->column_family_data() + ->current() + ->storage_info() + ->ComputeCompactionScore(*(c->immutable_cf_options()), + *(c->mutable_cf_options())); + if (!cfd->queued_for_compaction()) { + AddToCompactionQueue(cfd); + ++unscheduled_compactions_; + } + } } + // this will unref its input_version and column_family_data + c.reset(); if (is_manual) { ManualCompactionState* m = manual_compaction; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index f4709b849..6ed358d3b 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -137,7 +137,7 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) { while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || bg_flush_scheduled_ || (wait_unscheduled && unscheduled_compactions_)) && - !error_handler_.IsDBStopped()) { + (error_handler_.GetBGError() == Status::OK())) { bg_cv_.Wait(); } return error_handler_.GetBGError(); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index d4614e7f8..8371b0720 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -134,8 +134,15 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { for (size_t i = 0; i < result.db_paths.size(); i++) { DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path); } -#endif + // Create a default SstFileManager for purposes of tracking compaction size + // and facilitating recovery from out of space errors. + if (result.sst_file_manager.get() == nullptr) { + std::shared_ptr sst_file_manager( + NewSstFileManager(result.env, result.info_log)); + result.sst_file_manager = sst_file_manager; + } +#endif return result; } @@ -1050,6 +1057,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, break; } } + + // For recovery from NoSpace() error, we can only handle + // the case where the database is stored in a single path + if (paths.size() <= 1) { + impl->error_handler_.EnableAutoRecovery(); + } } if (!s.ok()) { @@ -1213,6 +1226,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } } } + + // Reserve some disk buffer space. This is a heuristic - when we run out + // of disk space, this ensures that there is atleast write_buffer_size + // amount of free space before we resume DB writes. In low disk space + // conditions, we want to avoid a lot of small L0 files due to frequent + // WAL write failures and resultant forced flushes + sfm->ReserveDiskBuffer(max_write_buffer_size, + impl->immutable_db_options_.db_paths[0].path); } #endif // !ROCKSDB_LITE diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 61c5f15f6..ff786e113 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -710,6 +710,10 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, assert(write_context != nullptr && need_log_sync != nullptr); Status status; + if (error_handler_.IsDBStopped()) { + status = error_handler_.GetBGError(); + } + PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time); assert(!single_column_family_mode_ || @@ -728,10 +732,6 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = HandleWriteBufferFull(write_context); } - if (UNLIKELY(status.ok())) { - status = error_handler_.GetBGError(); - } - if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { status = ScheduleFlushes(write_context); } @@ -1184,7 +1184,11 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, mutex_.Lock(); } - while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) { + // Don't wait if there's a background error, even if its a soft error. We + // might wait here indefinitely as the background compaction may never + // finish successfully, resulting in the stall condition lasting + // indefinitely + while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) { if (write_options.no_slowdown) { return Status::Incomplete(); } @@ -1200,7 +1204,19 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, RecordTick(stats_, STALL_MICROS, time_delayed); } - return error_handler_.GetBGError(); + // If DB is not in read-only mode and write_controller is not stopping + // writes, we can ignore any background errors and allow the write to + // proceed + Status s; + if (write_controller_.IsStopped()) { + // If writes are still stopped, it means we bailed due to a background + // error + s = Status::Incomplete(error_handler_.GetBGError().ToString()); + } + if (error_handler_.IsDBStopped()) { + s = error_handler_.GetBGError(); + } + return s; } Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, diff --git a/db/error_handler.cc b/db/error_handler.cc index a5b23a4b8..8e297df98 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -4,7 +4,9 @@ // (found in the LICENSE.Apache file in the root directory). // #include "db/error_handler.h" +#include "db/db_impl.h" #include "db/event_helpers.h" +#include "util/sst_file_manager_impl.h" namespace rocksdb { @@ -33,7 +35,7 @@ std::map, // Errors during BG flush {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, Status::SubCode::kNoSpace, true), - Status::Severity::kSoftError}, + Status::Severity::kHardError}, {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, Status::SubCode::kNoSpace, false), Status::Severity::kNoError}, @@ -44,11 +46,11 @@ std::map, {std::make_tuple(BackgroundErrorReason::kWriteCallback, Status::Code::kIOError, Status::SubCode::kNoSpace, true), - Status::Severity::kFatalError}, + Status::Severity::kHardError}, {std::make_tuple(BackgroundErrorReason::kWriteCallback, Status::Code::kIOError, Status::SubCode::kNoSpace, false), - Status::Severity::kFatalError}, + Status::Severity::kHardError}, }; std::map, Status::Severity> @@ -118,6 +120,45 @@ std::map, Status::Severity> Status::Severity::kFatalError}, }; +void ErrorHandler::CancelErrorRecovery() { +#ifndef ROCKSDB_LITE + db_mutex_->AssertHeld(); + + // We'll release the lock before calling sfm, so make sure no new + // recovery gets scheduled at that point + auto_recovery_ = false; + SstFileManagerImpl* sfm = reinterpret_cast( + db_options_.sst_file_manager.get()); + if (sfm) { + // This may or may not cancel a pending recovery + db_mutex_->Unlock(); + bool cancelled = sfm->CancelErrorRecovery(this); + db_mutex_->Lock(); + if (cancelled) { + recovery_in_prog_ = false; + } + } +#endif +} + +// This is the main function for looking at an error during a background +// operation and deciding the severity, and error recovery strategy. The high +// level algorithm is as follows - +// 1. Classify the severity of the error based on the ErrorSeverityMap, +// DefaultErrorSeverityMap and DefaultReasonMap defined earlier +// 2. Call a Status code specific override function to adjust the severity +// if needed. The reason for this is our ability to recover may depend on +// the exact options enabled in DBOptions +// 3. Determine if auto recovery is possible. A listener notification callback +// is called, which can disable the auto recovery even if we decide its +// feasible +// 4. For Status::NoSpace() errors, rely on SstFileManagerImpl to control +// the actual recovery. If no sst file manager is specified in DBOptions, +// a default one is allocated during DB::Open(), so there will always be +// one. +// This can also get called as part of a recovery operation. In that case, we +// also track the error seperately in recovery_error_ so we can tell in the +// end whether recovery succeeded or not Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reason) { db_mutex_->AssertHeld(); @@ -125,6 +166,12 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas return Status::OK(); } + // Check if recovery is currently in progress. If it is, we will save this + // error so we can check it at the end to see if recovery succeeded or not + if (recovery_in_prog_ && recovery_error_.ok()) { + recovery_error_ = bg_err; + } + bool paranoid = db_options_.paranoid_checks; Status::Severity sev = Status::Severity::kFatalError; Status new_bg_err; @@ -156,15 +203,143 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas } new_bg_err = Status(bg_err, sev); + + bool auto_recovery = auto_recovery_; + if (new_bg_err.severity() >= Status::Severity::kFatalError && auto_recovery) { + auto_recovery = false; + ; + } + + // Allow some error specific overrides + if (new_bg_err == Status::NoSpace()) { + new_bg_err = OverrideNoSpaceError(new_bg_err, &auto_recovery); + } + if (!new_bg_err.ok()) { Status s = new_bg_err; - EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_); + EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, + db_mutex_, &auto_recovery); if (!s.ok() && (s.severity() > bg_error_.severity())) { bg_error_ = s; + } else { + // This error is less severe than previously encountered error. Don't + // take any further action + return bg_error_; } } + if (auto_recovery) { + recovery_in_prog_ = true; + + // Kick-off error specific recovery + if (bg_error_ == Status::NoSpace()) { + RecoverFromNoSpace(); + } + } return bg_error_; } +Status ErrorHandler::OverrideNoSpaceError(Status bg_error, + bool* auto_recovery) { +#ifndef ROCKSDB_LITE + if (bg_error.severity() >= Status::Severity::kFatalError) { + return bg_error; + } + + if (db_options_.sst_file_manager.get() == nullptr) { + // We rely on SFM to poll for enough disk space and recover + *auto_recovery = false; + return bg_error; + } + + if (db_options_.allow_2pc && + (bg_error.severity() <= Status::Severity::kSoftError)) { + // Don't know how to recover, as the contents of the current WAL file may + // be inconsistent, and it may be needed for 2PC. If 2PC is not enabled, + // we can just flush the memtable and discard the log + *auto_recovery = false; + return Status(bg_error, Status::Severity::kFatalError); + } + + { + uint64_t free_space; + if (db_options_.env->GetFreeSpace(db_options_.db_paths[0].path, + &free_space) == Status::NotSupported()) { + *auto_recovery = false; + } + } + + return bg_error; +#else + (void)auto_recovery; + return Status(bg_error, Status::Severity::kFatalError); +#endif +} + +void ErrorHandler::RecoverFromNoSpace() { +#ifndef ROCKSDB_LITE + SstFileManagerImpl* sfm = + reinterpret_cast(db_options_.sst_file_manager.get()); + + // Inform SFM of the error, so it can kick-off the recovery + if (sfm) { + sfm->StartErrorRecovery(this, bg_error_); + } +#endif +} + +Status ErrorHandler::ClearBGError() { +#ifndef ROCKSDB_LITE + db_mutex_->AssertHeld(); + + // Signal that recovery succeeded + if (recovery_error_.ok()) { + Status old_bg_error = bg_error_; + bg_error_ = Status::OK(); + recovery_in_prog_ = false; + EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners, + old_bg_error, db_mutex_); + } + return recovery_error_; +#else + return bg_error_; +#endif +} + +Status ErrorHandler::RecoverFromBGError(bool is_manual) { +#ifndef ROCKSDB_LITE + InstrumentedMutexLock l(db_mutex_); + if (is_manual) { + // If its a manual recovery and there's a background recovery in progress + // return busy status + if (recovery_in_prog_) { + return Status::Busy(); + } + recovery_in_prog_ = true; + } + + if (bg_error_.severity() == Status::Severity::kSoftError) { + // Simply clear the background error and return + recovery_error_ = Status::OK(); + return ClearBGError(); + } + + // Reset recovery_error_. We will use this to record any errors that happen + // during the recovery process. While recovering, the only operations that + // can generate background errors should be the flush operations + recovery_error_ = Status::OK(); + Status s = db_->ResumeImpl(); + // For manual recover, shutdown, and fatal error cases, set + // recovery_in_prog_ to false. For automatic background recovery, leave it + // as is regardless of success or failure as it will be retried + if (is_manual || s.IsShutdownInProgress() || + bg_error_.severity() >= Status::Severity::kFatalError) { + recovery_in_prog_ = false; + } + return s; +#else + (void)is_manual; + return bg_error_; +#endif +} } diff --git a/db/error_handler.h b/db/error_handler.h index 8a1a411fc..ce8454da6 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -11,42 +11,65 @@ namespace rocksdb { +class DBImpl; + class ErrorHandler { public: - ErrorHandler(const ImmutableDBOptions& db_options, - InstrumentedMutex* db_mutex) - : db_options_(db_options), - bg_error_(Status::OK()), - db_mutex_(db_mutex) - {} - ~ErrorHandler() {} + ErrorHandler(DBImpl* db, const ImmutableDBOptions& db_options, + InstrumentedMutex* db_mutex) + : db_(db), + db_options_(db_options), + bg_error_(Status::OK()), + recovery_error_(Status::OK()), + db_mutex_(db_mutex), + auto_recovery_(false), + recovery_in_prog_(false) {} + ~ErrorHandler() {} - Status::Severity GetErrorSeverity(BackgroundErrorReason reason, - Status::Code code, Status::SubCode subcode); + void EnableAutoRecovery() { auto_recovery_ = true; } - Status SetBGError(const Status& bg_err, BackgroundErrorReason reason); + Status::Severity GetErrorSeverity(BackgroundErrorReason reason, + Status::Code code, + Status::SubCode subcode); - Status GetBGError() - { - return bg_error_; - } + Status SetBGError(const Status& bg_err, BackgroundErrorReason reason); - void ClearBGError() { - bg_error_ = Status::OK(); - } + Status GetBGError() { return bg_error_; } - bool IsDBStopped() { - return !bg_error_.ok(); + Status GetRecoveryError() { return recovery_error_; } + + Status ClearBGError(); + + bool IsDBStopped() { + return !bg_error_.ok() && + bg_error_.severity() >= Status::Severity::kHardError; } bool IsBGWorkStopped() { - return !bg_error_.ok(); + return !bg_error_.ok() && + (bg_error_.severity() >= Status::Severity::kHardError || + !auto_recovery_); } - private: + bool IsRecoveryInProgress() { return recovery_in_prog_; } + + Status RecoverFromBGError(bool is_manual = false); + void CancelErrorRecovery(); + + private: + DBImpl* db_; const ImmutableDBOptions& db_options_; Status bg_error_; + // A seperate Status variable used to record any errors during the + // recovery process from hard errors + Status recovery_error_; InstrumentedMutex* db_mutex_; + // A flag indicating whether automatic recovery from errors is enabled + bool auto_recovery_; + bool recovery_in_prog_; + + Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery); + void RecoverFromNoSpace(); }; } diff --git a/db/error_handler_test.cc b/db/error_handler_test.cc index abd5663d8..6efba2987 100644 --- a/db/error_handler_test.cc +++ b/db/error_handler_test.cc @@ -6,9 +6,12 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#ifndef ROCKSDB_LITE + #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" +#include "rocksdb/sst_file_manager.h" #include "util/fault_injection_test_env.h" #if !defined(ROCKSDB_LITE) #include "util/sync_point.h" @@ -33,36 +36,137 @@ class DBErrorHandlingEnv : public EnvWrapper { bool trig_io_error; }; +class ErrorHandlerListener : public EventListener { + public: + ErrorHandlerListener() + : mutex_(), + cv_(&mutex_), + no_auto_recovery_(false), + recovery_complete_(false), + file_creation_started_(false), + override_bg_error_(false), + file_count_(0), + fault_env_(nullptr) {} + + void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /*ti*/) { + InstrumentedMutexLock l(&mutex_); + file_creation_started_ = true; + if (file_count_ > 0) { + if (--file_count_ == 0) { + fault_env_->SetFilesystemActive(false, file_creation_error_); + file_creation_error_ = Status::OK(); + } + } + cv_.SignalAll(); + } + + void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, + Status /*bg_error*/, bool* auto_recovery) { + if (*auto_recovery && no_auto_recovery_) { + *auto_recovery = false; + } + } + + void OnErrorRecoveryCompleted(Status /*old_bg_error*/) { + InstrumentedMutexLock l(&mutex_); + recovery_complete_ = true; + cv_.SignalAll(); + } + + bool WaitForRecovery(uint64_t /*abs_time_us*/) { + InstrumentedMutexLock l(&mutex_); + while (!recovery_complete_) { + cv_.Wait(/*abs_time_us*/); + } + if (recovery_complete_) { + recovery_complete_ = false; + return true; + } + return false; + } + + void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) { + InstrumentedMutexLock l(&mutex_); + while (!file_creation_started_) { + cv_.Wait(/*abs_time_us*/); + } + file_creation_started_ = false; + } + + void OnBackgroundError(BackgroundErrorReason /*reason*/, + Status* bg_error) override { + if (override_bg_error_) { + *bg_error = bg_error_; + override_bg_error_ = false; + } + } + + void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; } + + void OverrideBGError(Status bg_err) { + bg_error_ = bg_err; + override_bg_error_ = true; + } + + void InjectFileCreationError(FaultInjectionTestEnv* env, int file_count, + Status s) { + fault_env_ = env; + file_count_ = file_count; + file_creation_error_ = s; + } + + private: + InstrumentedMutex mutex_; + InstrumentedCondVar cv_; + bool no_auto_recovery_; + bool recovery_complete_; + bool file_creation_started_; + bool override_bg_error_; + int file_count_; + Status file_creation_error_; + Status bg_error_; + FaultInjectionTestEnv* fault_env_; +}; + TEST_F(DBErrorHandlingTest, FLushWriteError) { std::unique_ptr fault_env( new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); Options options = GetDefaultOptions(); options.create_if_missing = true; options.env = fault_env.get(); + options.listeners.emplace_back(listener); Status s; + + listener->EnableAutoRecovery(false); DestroyAndReopen(options); - Put(Key(0), "va;"); + Put(Key(0), "val"); SyncPoint::GetInstance()->SetCallBack( "FlushJob::Start", [&](void *) { fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); }); SyncPoint::GetInstance()->EnableProcessing(); s = Flush(); - ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); + SyncPoint::GetInstance()->DisableProcessing(); fault_env->SetFilesystemActive(true); s = dbfull()->Resume(); ASSERT_EQ(s, Status::OK()); + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); Destroy(options); } TEST_F(DBErrorHandlingTest, CompactionWriteError) { std::unique_ptr fault_env( new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); Options options = GetDefaultOptions(); options.create_if_missing = true; options.level0_file_num_compaction_trigger = 2; + options.listeners.emplace_back(listener); options.env = fault_env.get(); Status s; DestroyAndReopen(options); @@ -72,6 +176,10 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) { s = Flush(); ASSERT_EQ(s, Status::OK()); + listener->OverrideBGError( + Status(Status::NoSpace(), Status::Severity::kHardError) + ); + listener->EnableAutoRecovery(false); rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -85,7 +193,7 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) { ASSERT_EQ(s, Status::OK()); s = dbfull()->TEST_WaitForCompact(); - ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); fault_env->SetFilesystemActive(true); s = dbfull()->Resume(); @@ -129,6 +237,453 @@ TEST_F(DBErrorHandlingTest, CorruptionError) { Destroy(options); } +#ifndef TRAVIS +TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + DestroyAndReopen(options); + + Put(Key(0), "val"); + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); + SyncPoint::GetInstance()->DisableProcessing(); + fault_env->SetFilesystemActive(true); + ASSERT_EQ(listener->WaitForRecovery(5000000), true); + + s = Put(Key(1), "val"); + ASSERT_EQ(s, Status::OK()); + + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); + ASSERT_EQ("val", Get(Key(1))); + Destroy(options); +} + +TEST_F(DBErrorHandlingTest, FailRecoverFlushError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + DestroyAndReopen(options); + + Put(Key(0), "val"); + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); + // We should be able to shutdown the database while auto recovery is going + // on in the background + Close(); + DestroyDB(dbname_, options); +} + +TEST_F(DBErrorHandlingTest, WALWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.writable_file_max_buffer_size = 32768; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + DestroyAndReopen(options); + + { + WriteBatch batch; + char val[1024]; + + for (auto i = 0; i<100; ++i) { + sprintf(val, "%d", i); + batch.Put(Key(i), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + { + WriteBatch batch; + char val[1024]; + int write_error = 0; + + for (auto i = 100; i<199; ++i) { + sprintf(val, "%d", i); + batch.Put(Key(i), Slice(val, sizeof(val))); + } + + SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { + write_error++; + if (write_error > 2) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wopts; + wopts.sync = true; + s = dbfull()->Write(wopts, &batch); + ASSERT_EQ(s, s.NoSpace()); + } + SyncPoint::GetInstance()->DisableProcessing(); + fault_env->SetFilesystemActive(true); + ASSERT_EQ(listener->WaitForRecovery(5000000), true); + for (auto i=0; i<199; ++i) { + if (i < 100) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + Reopen(options); + for (auto i=0; i<199; ++i) { + if (i < 100) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + Close(); +} + +TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.writable_file_max_buffer_size = 32768; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + CreateAndReopenWithCF({"one", "two", "three"}, options); + + { + WriteBatch batch; + char val[1024]; + + for (auto i = 1; i < 4; ++i) { + for (auto j = 0; j < 100; ++j) { + sprintf(val, "%d", j); + batch.Put(handles_[i], Key(j), Slice(val, sizeof(val))); + } + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + { + WriteBatch batch; + char val[1024]; + int write_error = 0; + + // Write to one CF + for (auto i = 100; i < 199; ++i) { + sprintf(val, "%d", i); + batch.Put(handles_[2], Key(i), Slice(val, sizeof(val))); + } + + SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { + write_error++; + if (write_error > 2) { + fault_env->SetFilesystemActive(false, + Status::NoSpace("Out of space")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wopts; + wopts.sync = true; + s = dbfull()->Write(wopts, &batch); + ASSERT_EQ(s, s.NoSpace()); + } + SyncPoint::GetInstance()->DisableProcessing(); + fault_env->SetFilesystemActive(true); + ASSERT_EQ(listener->WaitForRecovery(5000000), true); + + for (auto i = 1; i < 4; ++i) { + // Every CF should have been flushed + ASSERT_EQ(NumTableFilesAtLevel(0, i), 1); + } + + for (auto i = 1; i < 4; ++i) { + for (auto j = 0; j < 199; ++j) { + if (j < 100) { + ASSERT_NE(Get(i, Key(j)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND"); + } + } + } + ReopenWithColumnFamilies({"default", "one", "two", "three"}, options); + for (auto i = 1; i < 4; ++i) { + for (auto j = 0; j < 199; ++j) { + if (j < 100) { + ASSERT_NE(Get(i, Key(j)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND"); + } + } + } + Close(); +} + +TEST_F(DBErrorHandlingTest, MultiDBCompactionError) { + FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default()); + std::vector> fault_env; + std::vector options; + std::vector> listener; + std::vector db; + std::shared_ptr sfm(NewSstFileManager(def_env)); + int kNumDbInstances = 3; + + for (auto i = 0; i < kNumDbInstances; ++i) { + listener.emplace_back(new ErrorHandlerListener()); + options.emplace_back(GetDefaultOptions()); + fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default())); + options[i].create_if_missing = true; + options[i].level0_file_num_compaction_trigger = 2; + options[i].writable_file_max_buffer_size = 32768; + options[i].env = fault_env[i].get(); + options[i].listeners.emplace_back(listener[i]); + options[i].sst_file_manager = sfm; + DB* dbptr; + char buf[16]; + + listener[i]->EnableAutoRecovery(); + // Setup for returning error for the 3rd SST, which would be level 1 + listener[i]->InjectFileCreationError(fault_env[i].get(), 3, + Status::NoSpace("Out of space")); + snprintf(buf, sizeof(buf), "_%d", i); + DestroyDB(dbname_ + std::string(buf), options[i]); + ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr), + Status::OK()); + db.emplace_back(dbptr); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + for (auto j = 0; j <= 100; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } + + def_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + // Write to one CF + for (auto j = 100; j < 199; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + Status s = static_cast(db[i])->TEST_WaitForCompact(true); + ASSERT_EQ(s.severity(), Status::Severity::kSoftError); + fault_env[i]->SetFilesystemActive(true); + } + + def_env->SetFilesystemActive(true); + for (auto i = 0; i < kNumDbInstances; ++i) { + std::string prop; + ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true); + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(0), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 0); + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(1), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 1); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + char buf[16]; + snprintf(buf, sizeof(buf), "_%d", i); + delete db[i]; + fault_env[i]->SetFilesystemActive(true); + if (getenv("KEEP_DB")) { + printf("DB is still at %s%s\n", dbname_.c_str(), buf); + } else { + Status s = DestroyDB(dbname_ + std::string(buf), options[i]); + } + } + options.clear(); + sfm.reset(); + delete def_env; +} + +TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) { + FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default()); + std::vector> fault_env; + std::vector options; + std::vector> listener; + std::vector db; + std::shared_ptr sfm(NewSstFileManager(def_env)); + int kNumDbInstances = 3; + + for (auto i = 0; i < kNumDbInstances; ++i) { + listener.emplace_back(new ErrorHandlerListener()); + options.emplace_back(GetDefaultOptions()); + fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default())); + options[i].create_if_missing = true; + options[i].level0_file_num_compaction_trigger = 2; + options[i].writable_file_max_buffer_size = 32768; + options[i].env = fault_env[i].get(); + options[i].listeners.emplace_back(listener[i]); + options[i].sst_file_manager = sfm; + DB* dbptr; + char buf[16]; + + listener[i]->EnableAutoRecovery(); + switch (i) { + case 0: + // Setup for returning error for the 3rd SST, which would be level 1 + listener[i]->InjectFileCreationError(fault_env[i].get(), 3, + Status::NoSpace("Out of space")); + break; + case 1: + // Setup for returning error after the 1st SST, which would result + // in a hard error + listener[i]->InjectFileCreationError(fault_env[i].get(), 2, + Status::NoSpace("Out of space")); + break; + default: + break; + } + snprintf(buf, sizeof(buf), "_%d", i); + DestroyDB(dbname_ + std::string(buf), options[i]); + ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr), + Status::OK()); + db.emplace_back(dbptr); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + for (auto j = 0; j <= 100; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } + + def_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + // Write to one CF + for (auto j = 100; j < 199; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + if (i != 1) { + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } else { + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace()); + } + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + Status s = static_cast(db[i])->TEST_WaitForCompact(true); + switch (i) { + case 0: + ASSERT_EQ(s.severity(), Status::Severity::kSoftError); + break; + case 1: + ASSERT_EQ(s.severity(), Status::Severity::kHardError); + break; + case 2: + ASSERT_EQ(s, Status::OK()); + break; + } + fault_env[i]->SetFilesystemActive(true); + } + + def_env->SetFilesystemActive(true); + for (auto i = 0; i < kNumDbInstances; ++i) { + std::string prop; + if (i < 2) { + ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true); + } + if (i == 1) { + ASSERT_EQ(static_cast(db[i])->TEST_WaitForCompact(true), + Status::OK()); + } + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(0), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 0); + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(1), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 1); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + char buf[16]; + snprintf(buf, sizeof(buf), "_%d", i); + fault_env[i]->SetFilesystemActive(true); + delete db[i]; + if (getenv("KEEP_DB")) { + printf("DB is still at %s%s\n", dbname_.c_str(), buf); + } else { + DestroyDB(dbname_ + std::string(buf), options[i]); + } + } + options.clear(); + delete def_env; +} +#endif + } // namespace rocksdb int main(int argc, char** argv) { @@ -136,3 +691,13 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Cuckoo table is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE diff --git a/db/event_helpers.cc b/db/event_helpers.cc index e4fd64226..c80c5aefb 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -40,8 +40,8 @@ void EventHelpers::NotifyTableFileCreationStarted( void EventHelpers::NotifyOnBackgroundError( const std::vector>& listeners, - BackgroundErrorReason reason, Status* bg_error, - InstrumentedMutex* db_mutex) { + BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex, + bool* auto_recovery) { #ifndef ROCKSDB_LITE if (listeners.size() == 0U) { return; @@ -51,6 +51,9 @@ void EventHelpers::NotifyOnBackgroundError( db_mutex->Unlock(); for (auto& listener : listeners) { listener->OnBackgroundError(reason, bg_error); + if (*auto_recovery) { + listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery); + } } db_mutex->Lock(); #else @@ -58,6 +61,7 @@ void EventHelpers::NotifyOnBackgroundError( (void)reason; (void)bg_error; (void)db_mutex; + (void)auto_recovery; #endif // ROCKSDB_LITE } @@ -167,4 +171,25 @@ void EventHelpers::LogAndNotifyTableFileDeletion( #endif // !ROCKSDB_LITE } +void EventHelpers::NotifyOnErrorRecoveryCompleted( + const std::vector>& listeners, + Status old_bg_error, InstrumentedMutex* db_mutex) { +#ifndef ROCKSDB_LITE + if (listeners.size() == 0U) { + return; + } + db_mutex->AssertHeld(); + // release lock while notifying events + db_mutex->Unlock(); + for (auto& listener : listeners) { + listener->OnErrorRecoveryCompleted(old_bg_error); + } + db_mutex->Lock(); +#else + (void)listeners; + (void)old_bg_error; + (void)db_mutex; +#endif // ROCKSDB_LITE +} + } // namespace rocksdb diff --git a/db/event_helpers.h b/db/event_helpers.h index 674e6c5f6..ea35b4b5b 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -28,7 +28,7 @@ class EventHelpers { static void NotifyOnBackgroundError( const std::vector>& listeners, BackgroundErrorReason reason, Status* bg_error, - InstrumentedMutex* db_mutex); + InstrumentedMutex* db_mutex, bool* auto_recovery); static void LogAndNotifyTableFileCreationFinished( EventLogger* event_logger, const std::vector>& listeners, @@ -41,6 +41,9 @@ class EventHelpers { uint64_t file_number, const std::string& file_path, const Status& status, const std::string& db_name, const std::vector>& listeners); + static void NotifyOnErrorRecoveryCompleted( + const std::vector>& listeners, + Status bg_error, InstrumentedMutex* db_mutex); private: static void LogAndNotifyTableFileCreation( diff --git a/db/flush_job.cc b/db/flush_job.cc index 1fee162ab..b64712fd1 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -78,6 +78,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) { return "Auto Compaction"; case FlushReason::kManualFlush: return "Manual Flush"; + case FlushReason::kErrorRecovery: + return "Error Recovery"; default: return "Invalid"; } diff --git a/db/listener_test.cc b/db/listener_test.cc index 33c916119..77afcd9ed 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -882,10 +882,13 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) { ASSERT_EQ(1, listener->counter()); // trigger flush so compaction is triggered again; this time it succeeds + // The previous failed compaction may get retried automatically, so we may + // be left with 0 or 1 files in level 1, depending on when the retry gets + // scheduled ASSERT_OK(Put("key0", "val")); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); ASSERT_OK(dbfull()->TEST_WaitForCompact()); - ASSERT_EQ(0, NumTableFilesAtLevel(0)); + ASSERT_LE(1, NumTableFilesAtLevel(0)); } } // namespace rocksdb diff --git a/env/env_posix.cc b/env/env_posix.cc index db28a2f1a..2b53fd8ab 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -25,6 +25,7 @@ #include #include #endif +#include #include #include #include @@ -776,6 +777,18 @@ class PosixEnv : public Env { return gettid(pthread_self()); } + virtual Status GetFreeSpace(const std::string& fname, + uint64_t* free_space) override { + struct statvfs sbuf; + + if (statvfs(fname.c_str(), &sbuf) < 0) { + return IOError("While doing statvfs", fname, errno); + } + + *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree); + return Status::OK(); + } + virtual Status NewLogger(const std::string& fname, shared_ptr* result) override { FILE* f; diff --git a/env/posix_logger.h b/env/posix_logger.h index e983ba704..401df6a3f 100644 --- a/env/posix_logger.h +++ b/env/posix_logger.h @@ -165,7 +165,6 @@ class PosixLogger : public Logger { size_t sz = fwrite(base, 1, write_size, file_); flush_pending_ = true; - assert(sz == write_size); if (sz > 0) { log_size_ += write_size; } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 99127f766..755836461 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -477,6 +477,15 @@ class Env { // Returns the ID of the current thread. virtual uint64_t GetThreadID() const; +// This seems to clash with a macro on Windows, so #undef it here +#undef GetFreeSpace + + // Get the amount of free disk space + virtual Status GetFreeSpace(const std::string& /*path*/, + uint64_t* /*diskfree*/) { + return Status::NotSupported(); + } + protected: // The pointer to an internal structure that will update the // status of each thread. diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 601951cd0..46ce712dc 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -27,6 +27,7 @@ enum class TableFileCreationReason { kFlush, kCompaction, kRecovery, + kMisc, }; struct TableFileCreationBriefInfo { @@ -103,6 +104,7 @@ enum class FlushReason : int { kDeleteFiles = 0x08, kAutoCompaction = 0x09, kManualFlush = 0x0a, + kErrorRecovery = 0xb, }; enum class BackgroundErrorReason { @@ -393,6 +395,21 @@ class EventListener { // returns. Otherwise, RocksDB may be blocked. virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {} + // A callback function for RocksDB which will be called just before + // starting the automatic recovery process for recoverable background + // errors, such as NoSpace(). The callback can suppress the automatic + // recovery by setting *auto_recovery to false. The database will then + // have to be transitioned out of read-only mode by calling DB::Resume() + virtual void OnErrorRecoveryBegin(BackgroundErrorReason /* reason */, + Status /* bg_error */, + bool* /* auto_recovery */) {} + + // A callback function for RocksDB which will be called once the database + // is recovered from read-only mode after an error. When this is called, it + // means normal writes to the database can be issued and the user can + // initiate any further recovery actions needed + virtual void OnErrorRecoveryCompleted(Status /* old_bg_error */) {} + virtual ~EventListener() {} }; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index c32822c8f..c8ccfbd22 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1983,6 +1983,52 @@ class Benchmark { bool report_file_operations_; bool use_blob_db_; + class ErrorHandlerListener : public EventListener { + public: + ErrorHandlerListener() + : mutex_(), + cv_(&mutex_), + no_auto_recovery_(false), + recovery_complete_(false) {} + + ~ErrorHandlerListener() {} + + void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, + Status /*bg_error*/, bool* auto_recovery) { + if (*auto_recovery && no_auto_recovery_) { + *auto_recovery = false; + } + } + + void OnErrorRecoveryCompleted(Status /*old_bg_error*/) { + InstrumentedMutexLock l(&mutex_); + recovery_complete_ = true; + cv_.SignalAll(); + } + + bool WaitForRecovery(uint64_t /*abs_time_us*/) { + InstrumentedMutexLock l(&mutex_); + if (!recovery_complete_) { + cv_.Wait(/*abs_time_us*/); + } + if (recovery_complete_) { + recovery_complete_ = false; + return true; + } + return false; + } + + void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; } + + private: + InstrumentedMutex mutex_; + InstrumentedCondVar cv_; + bool no_auto_recovery_; + bool recovery_complete_; + }; + + std::shared_ptr listener_; + bool SanityCheck() { if (FLAGS_compression_ratio > 1) { fprintf(stderr, "compression_ratio should be between 0 and 1\n"); @@ -2318,6 +2364,8 @@ class Benchmark { } } } + + listener_.reset(new ErrorHandlerListener()); } ~Benchmark() { @@ -3500,6 +3548,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { FLAGS_rate_limiter_auto_tuned)); } + options.listeners.emplace_back(listener_); if (FLAGS_num_multi_db <= 1) { OpenDb(options, FLAGS_db, &db_); } else { @@ -3892,6 +3941,10 @@ void VerifyDBFromDB(std::string& truth_db_name) { NewGenericRateLimiter(write_rate)); } } + if (!s.ok()) { + s = listener_->WaitForRecovery(600000000) ? Status::OK() : s; + } + if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc index 6c33f03da..bfd9954de 100644 --- a/util/delete_scheduler_test.cc +++ b/util/delete_scheduler_test.cc @@ -89,7 +89,7 @@ class DeleteSchedulerTest : public testing::Test { std::string data(size, 'A'); EXPECT_OK(f->Append(data)); EXPECT_OK(f->Close()); - sst_file_mgr_->OnAddFile(file_path); + sst_file_mgr_->OnAddFile(file_path, false); return file_path; } diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h index f2d4aaad2..a8a8e38dc 100644 --- a/util/fault_injection_test_env.h +++ b/util/fault_injection_test_env.h @@ -115,6 +115,16 @@ class FaultInjectionTestEnv : public EnvWrapper { virtual Status RenameFile(const std::string& s, const std::string& t) override; + virtual Status GetFreeSpace(const std::string& path, + uint64_t* disk_free) override { + if (!IsFilesystemActive() && error_ == Status::NoSpace()) { + *disk_free = 0; + return Status::OK(); + } else { + return target()->GetFreeSpace(path, disk_free); + } + } + void WritableFileClosed(const FileState& state); // For every file that is not fully synced, make a call to `func` with diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc index 74164e06a..a1280622b 100644 --- a/util/sst_file_manager_impl.cc +++ b/util/sst_file_manager_impl.cc @@ -7,6 +7,7 @@ #include +#include "db/db_impl.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/sst_file_manager.h" @@ -23,20 +24,38 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr logger, : env_(env), logger_(logger), total_files_size_(0), + in_progress_files_size_(0), compaction_buffer_size_(0), cur_compactions_reserved_size_(0), max_allowed_space_(0), delete_scheduler_(env, rate_bytes_per_sec, logger.get(), this, - max_trash_db_ratio, bytes_max_delete_chunk) {} + max_trash_db_ratio, bytes_max_delete_chunk), + cv_(&mu_), + closing_(false), + bg_thread_(nullptr), + reserved_disk_buffer_(0), + free_space_trigger_(0), + cur_instance_(nullptr) { +} -SstFileManagerImpl::~SstFileManagerImpl() {} +SstFileManagerImpl::~SstFileManagerImpl() { + { + MutexLock l(&mu_); + closing_ = true; + cv_.SignalAll(); + } + if (bg_thread_) { + bg_thread_->join(); + } +} -Status SstFileManagerImpl::OnAddFile(const std::string& file_path) { +Status SstFileManagerImpl::OnAddFile(const std::string& file_path, + bool compaction) { uint64_t file_size; Status s = env_->GetFileSize(file_path, &file_size); if (s.ok()) { MutexLock l(&mu_); - OnAddFileImpl(file_path, file_size); + OnAddFileImpl(file_path, file_size, compaction); } TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile"); return s; @@ -61,6 +80,19 @@ void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) { } } cur_compactions_reserved_size_ -= size_added_by_compaction; + + auto new_files = c->edit()->GetNewFiles(); + for (auto& new_file : new_files) { + auto fn = TableFileName(c->immutable_cf_options()->cf_paths, + new_file.second.fd.GetNumber(), + new_file.second.fd.GetPathId()); + if (in_progress_files_.find(fn) != in_progress_files_.end()) { + auto tracked_file = tracked_files_.find(fn); + assert(tracked_file != tracked_files_.end()); + in_progress_files_size_ -= tracked_file->second; + in_progress_files_.erase(fn); + } + } } Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, @@ -71,7 +103,7 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, if (file_size != nullptr) { *file_size = tracked_files_[old_path]; } - OnAddFileImpl(new_path, tracked_files_[old_path]); + OnAddFileImpl(new_path, tracked_files_[old_path], false); OnDeleteFileImpl(old_path); } TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile"); @@ -107,7 +139,8 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() { } bool SstFileManagerImpl::EnoughRoomForCompaction( - const std::vector& inputs) { + ColumnFamilyData* cfd, const std::vector& inputs, + Status bg_error) { MutexLock l(&mu_); uint64_t size_added_by_compaction = 0; // First check if we even have the space to do the compaction @@ -118,15 +151,47 @@ bool SstFileManagerImpl::EnoughRoomForCompaction( } } - if (max_allowed_space_ != 0 && - (size_added_by_compaction + cur_compactions_reserved_size_ + - total_files_size_ + compaction_buffer_size_ > - max_allowed_space_)) { - return false; - } // Update cur_compactions_reserved_size_ so concurrent compaction // don't max out space + size_t needed_headroom = + cur_compactions_reserved_size_ + size_added_by_compaction + + compaction_buffer_size_; + if (max_allowed_space_ != 0 && + (needed_headroom + total_files_size_ > max_allowed_space_)) { + return false; + } + + // Implement more aggressive checks only if this DB instance has already + // seen a NoSpace() error. This is tin order to contain a single potentially + // misbehaving DB instance and prevent it from slowing down compactions of + // other DB instances + if (CheckFreeSpace() && bg_error == Status::NoSpace()) { + auto fn = + TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(), + inputs[0][0]->fd.GetPathId()); + uint64_t free_space = 0; + env_->GetFreeSpace(fn, &free_space); + // needed_headroom is based on current size reserved by compactions, + // minus any files created by running compactions as they would count + // against the reserved size. If user didn't specify any compaction + // buffer, add reserved_disk_buffer_ that's calculated by default so the + // compaction doesn't end up leaving nothing for logs and flush SSTs + if (compaction_buffer_size_ == 0) { + needed_headroom += reserved_disk_buffer_; + } + needed_headroom -= in_progress_files_size_; + if (free_space < needed_headroom + size_added_by_compaction) { + // We hit the condition of not enough disk space + ROCKS_LOG_ERROR(logger_, "free space [%d bytes] is less than " + "needed headroom [%d bytes]\n", free_space, needed_headroom); + return false; + } + } + cur_compactions_reserved_size_ += size_added_by_compaction; + // Take a snapshot of cur_compactions_reserved_size_ for when we encounter + // a NoSpace error. + free_space_trigger_ = cur_compactions_reserved_size_; return true; } @@ -166,6 +231,169 @@ uint64_t SstFileManagerImpl::GetTotalTrashSize() { return delete_scheduler_.GetTotalTrashSize(); } +void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size, + const std::string& path) { + MutexLock l(&mu_); + + reserved_disk_buffer_ += size; + if (path_.empty()) { + path_ = path; + } +} + +void SstFileManagerImpl::ClearError() { + while (true) { + MutexLock l(&mu_); + + if (closing_) { + return; + } + + uint64_t free_space; + Status s = env_->GetFreeSpace(path_, &free_space); + if (s.ok()) { + // In case of multi-DB instances, some of them may have experienced a + // soft error and some a hard error. In the SstFileManagerImpl, a hard + // error will basically override previously reported soft errors. Once + // we clear the hard error, we don't keep track of previous errors for + // now + if (bg_err_.severity() == Status::Severity::kHardError) { + if (free_space < reserved_disk_buffer_) { + ROCKS_LOG_ERROR(logger_, "free space [%d bytes] is less than " + "required disk buffer [%d bytes]\n", free_space, + reserved_disk_buffer_); + ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n"); + s = Status::NoSpace(); + } + } else if (bg_err_.severity() == Status::Severity::kSoftError) { + if (free_space < free_space_trigger_) { + ROCKS_LOG_WARN(logger_, "free space [%d bytes] is less than " + "free space for compaction trigger [%d bytes]\n", free_space, + free_space_trigger_); + ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n"); + s = Status::NoSpace(); + } + } + } + + // Someone could have called CancelErrorRecovery() and the list could have + // become empty, so check again here + if (s.ok() && !error_handler_list_.empty()) { + auto error_handler = error_handler_list_.front(); + // Since we will release the mutex, set cur_instance_ to signal to the + // shutdown thread, if it calls // CancelErrorRecovery() the meantime, + // to indicate that this DB instance is busy. The DB instance is + // guaranteed to not be deleted before RecoverFromBGError() returns, + // since the ErrorHandler::recovery_in_prog_ flag would be true + cur_instance_ = error_handler; + mu_.Unlock(); + s = error_handler->RecoverFromBGError(); + mu_.Lock(); + // The DB instance might have been deleted while we were + // waiting for the mutex, so check cur_instance_ to make sure its + // still non-null + if (cur_instance_) { + // Check for error again, since the instance may have recovered but + // immediately got another error. If that's the case, and the new + // error is also a NoSpace() non-fatal error, leave the instance in + // the list + Status err = cur_instance_->GetBGError(); + if (s.ok() && err == Status::NoSpace() && + err.severity() < Status::Severity::kFatalError) { + s = err; + } + cur_instance_ = nullptr; + } + + if (s.ok() || s.IsShutdownInProgress() || + (!s.ok() && s.severity() >= Status::Severity::kFatalError)) { + // If shutdown is in progress, abandon this handler instance + // and continue with the others + error_handler_list_.pop_front(); + } + } + + if (!error_handler_list_.empty()) { + // If there are more instances to be recovered, reschedule after 5 + // seconds + int64_t wait_until = env_->NowMicros() + 5000000; + cv_.TimedWait(wait_until); + } + + // Check again for error_handler_list_ empty, as a DB instance shutdown + // could have removed it from the queue while we were in timed wait + if (error_handler_list_.empty()) { + ROCKS_LOG_INFO(logger_, "Clearing error\n"); + bg_err_ = Status::OK(); + return; + } + } +} + +void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler, + Status bg_error) { + MutexLock l(&mu_); + if (bg_error.severity() == Status::Severity::kSoftError) { + if (bg_err_.ok()) { + // Setting bg_err_ basically means we're in degraded mode + // Assume that all pending compactions will fail similarly. The trigger + // for clearing this condition is set to current compaction reserved + // size, so we stop checking disk space available in + // EnoughRoomForCompaction once this much free space is available + bg_err_ = bg_error; + } + } else if (bg_error.severity() == Status::Severity::kHardError) { + bg_err_ = bg_error; + } else { + assert(false); + } + + // If this is the first instance of this error, kick of a thread to poll + // and recover from this condition + if (error_handler_list_.empty()) { + error_handler_list_.push_back(handler); + // Release lock before calling join. Its ok to do so because + // error_handler_list_ is now non-empty, so no other invocation of this + // function will execute this piece of code + mu_.Unlock(); + if (bg_thread_) { + bg_thread_->join(); + } + // Start a new thread. The previous one would have exited. + bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this)); + mu_.Lock(); + } else { + // Check if this DB instance is already in the list + for (auto iter = error_handler_list_.begin(); + iter != error_handler_list_.end(); ++iter) { + if ((*iter) == handler) { + return; + } + } + error_handler_list_.push_back(handler); + } +} + +bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) { + MutexLock l(&mu_); + + if (cur_instance_ == handler) { + // This instance is currently busy attempting to recover + // Nullify it so the recovery thread doesn't attempt to access it again + cur_instance_ = nullptr; + return false; + } + + for (auto iter = error_handler_list_.begin(); + iter != error_handler_list_.end(); ++iter) { + if ((*iter) == handler) { + error_handler_list_.erase(iter); + return true; + } + } + return false; +} + Status SstFileManagerImpl::ScheduleFileDeletion( const std::string& file_path, const std::string& path_to_sync) { return delete_scheduler_.DeleteFile(file_path, path_to_sync); @@ -176,14 +404,24 @@ void SstFileManagerImpl::WaitForEmptyTrash() { } void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path, - uint64_t file_size) { + uint64_t file_size, bool compaction) { auto tracked_file = tracked_files_.find(file_path); if (tracked_file != tracked_files_.end()) { // File was added before, we will just update the size + assert(!compaction); total_files_size_ -= tracked_file->second; total_files_size_ += file_size; + cur_compactions_reserved_size_ -= file_size; } else { total_files_size_ += file_size; + if (compaction) { + // Keep track of the size of files created by in-progress compactions. + // When calculating whether there's enough headroom for new compactions, + // this will be subtracted from cur_compactions_reserved_size_. + // Otherwise, compactions will be double counted. + in_progress_files_size_ += file_size; + in_progress_files_.insert(file_path); + } } tracked_files_[file_path] = file_size; } @@ -192,10 +430,16 @@ void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) { auto tracked_file = tracked_files_.find(file_path); if (tracked_file == tracked_files_.end()) { // File is not tracked + assert(in_progress_files_.find(file_path) == in_progress_files_.end()); return; } total_files_size_ -= tracked_file->second; + // Check if it belonged to an in-progress compaction + if (in_progress_files_.find(file_path) != in_progress_files_.end()) { + in_progress_files_size_ -= tracked_file->second; + in_progress_files_.erase(file_path); + } tracked_files_.erase(tracked_file); } diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h index 90815d44f..8c8015310 100644 --- a/util/sst_file_manager_impl.h +++ b/util/sst_file_manager_impl.h @@ -12,6 +12,7 @@ #include "port/port.h" #include "db/compaction.h" +#include "db/error_handler.h" #include "rocksdb/sst_file_manager.h" #include "util/delete_scheduler.h" @@ -33,7 +34,7 @@ class SstFileManagerImpl : public SstFileManager { ~SstFileManagerImpl(); // DB will call OnAddFile whenever a new sst file is added. - Status OnAddFile(const std::string& file_path); + Status OnAddFile(const std::string& file_path, bool compaction = false); // DB will call OnDeleteFile whenever an sst file is deleted. Status OnDeleteFile(const std::string& file_path); @@ -67,7 +68,9 @@ class SstFileManagerImpl : public SstFileManager { // estimates how much space is currently being used by compactions (i.e. // if a compaction has started, this function bumps the used space by // the full compaction size). - bool EnoughRoomForCompaction(const std::vector& inputs); + bool EnoughRoomForCompaction(ColumnFamilyData* cfd, + const std::vector& inputs, + Status bg_error); // Bookkeeping so total_file_sizes_ goes back to normal after compaction // finishes @@ -96,6 +99,18 @@ class SstFileManagerImpl : public SstFileManager { // Return the total size of trash files uint64_t GetTotalTrashSize() override; + // Called by each DB instance using this sst file manager to reserve + // disk buffer space for recovery from out of space errors + void ReserveDiskBuffer(uint64_t buffer, const std::string& path); + + // Set a flag upon encountering disk full. May enqueue the ErrorHandler + // instance for background polling and recovery + void StartErrorRecovery(ErrorHandler* db, Status bg_error); + + // Remove the given Errorhandler instance from the recovery queue. Its + // not guaranteed + bool CancelErrorRecovery(ErrorHandler* db); + // Mark file as trash and schedule it's deletion. virtual Status ScheduleFileDeletion(const std::string& file_path, const std::string& dir_to_sync); @@ -108,16 +123,24 @@ class SstFileManagerImpl : public SstFileManager { private: // REQUIRES: mutex locked - void OnAddFileImpl(const std::string& file_path, uint64_t file_size); + void OnAddFileImpl(const std::string& file_path, uint64_t file_size, + bool compaction); // REQUIRES: mutex locked void OnDeleteFileImpl(const std::string& file_path); + void ClearError(); + bool CheckFreeSpace() { + return bg_err_.severity() == Status::Severity::kSoftError; + } + Env* env_; std::shared_ptr logger_; // Mutex to protect tracked_files_, total_files_size_ port::Mutex mu_; // The summation of the sizes of all files in tracked_files_ map uint64_t total_files_size_; + // The summation of all output files of in-progress compactions + uint64_t in_progress_files_size_; // Compactions should only execute if they can leave at least // this amount of buffer space for logs and flushes uint64_t compaction_buffer_size_; @@ -126,10 +149,32 @@ class SstFileManagerImpl : public SstFileManager { // A map containing all tracked files and there sizes // file_path => file_size std::unordered_map tracked_files_; + // A set of files belonging to in-progress compactions + std::unordered_set in_progress_files_; // The maximum allowed space (in bytes) for sst files. uint64_t max_allowed_space_; // DeleteScheduler used to throttle file deletition. DeleteScheduler delete_scheduler_; + port::CondVar cv_; + // Flag to force error recovery thread to exit + bool closing_; + // Background error recovery thread + std::unique_ptr bg_thread_; + // A path in the filesystem corresponding to this SFM. This is used for + // calling Env::GetFreeSpace. Posix requires a path in the filesystem + std::string path_; + // Save the current background error + Status bg_err_; + // Amount of free disk headroom before allowing recovery from hard errors + uint64_t reserved_disk_buffer_; + // For soft errors, amount of free disk space before we can allow + // compactions to run full throttle. If disk space is below this trigger, + // compactions will be gated by free disk space > input size + uint64_t free_space_trigger_; + // List of database error handler instances tracked by this sst file manager + std::list error_handler_list_; + // Pointer to ErrorHandler instance that is currently processing recovery + ErrorHandler* cur_instance_; }; } // namespace rocksdb