From a27fce408e197f68d4d4a613aefc1d84b9a57058 Mon Sep 17 00:00:00 2001 From: Anand Ananthabhotla Date: Sat, 15 Sep 2018 13:36:19 -0700 Subject: [PATCH] Auto recovery from out of space errors (#4164) Summary: This commit implements automatic recovery from a Status::NoSpace() error during background operations such as write callback, flush and compaction. The broad design is as follows - 1. Compaction errors are treated as soft errors and don't put the database in read-only mode. A compaction is delayed until enough free disk space is available to accomodate the compaction outputs, which is estimated based on the input size. This means that users can continue to write, and we rely on the WriteController to delay or stop writes if the compaction debt becomes too high due to persistent low disk space condition 2. Errors during write callback and flush are treated as hard errors, i.e the database is put in read-only mode and goes back to read-write only fater certain recovery actions are taken. 3. Both types of recovery rely on the SstFileManagerImpl to poll for sufficient disk space. We assume that there is a 1-1 mapping between an SFM and the underlying OS storage container. For cases where multiple DBs are hosted on a single storage container, the user is expected to allocate a single SFM instance and use the same one for all the DBs. If no SFM is specified by the user, DBImpl::Open() will allocate one, but this will be one per DB and each DB will recover independently. The recovery implemented by SFM is as follows - a) On the first occurance of an out of space error during compaction, subsequent compactions will be delayed until the disk free space check indicates enough available space. The required space is computed as the sum of input sizes. b) The free space check requirement will be removed once the amount of free space is greater than the size reserved by in progress compactions when the first error occured c) If the out of space error is a hard error, a background thread in SFM will poll for sufficient headroom before triggering the recovery of the database and putting it in write-only mode. The headroom is calculated as the sum of the write_buffer_size of all the DB instances associated with the SFM 4. EventListener callbacks will be called at the start and completion of automatic recovery. Users can disable the auto recov ery in the start callback, and later initiate it manually by calling DB::Resume() Todo: 1. More extensive testing 2. Add disk full condition to db_stress (follow-on PR) Pull Request resolved: https://github.com/facebook/rocksdb/pull/4164 Differential Revision: D9846378 Pulled By: anand1976 fbshipit-source-id: 80ea875dbd7f00205e19c82215ff6e37da10da4a --- db/compaction_job_test.cc | 2 +- db/db_impl.cc | 105 +++++- db/db_impl.h | 19 +- db/db_impl_compaction_flush.cc | 132 +++++++- db/db_impl_debug.cc | 2 +- db/db_impl_open.cc | 23 +- db/db_impl_write.cc | 28 +- db/error_handler.cc | 183 +++++++++- db/error_handler.h | 65 ++-- db/error_handler_test.cc | 571 +++++++++++++++++++++++++++++++- db/event_helpers.cc | 29 +- db/event_helpers.h | 5 +- db/flush_job.cc | 2 + db/listener_test.cc | 5 +- env/env_posix.cc | 13 + env/posix_logger.h | 1 - include/rocksdb/env.h | 9 + include/rocksdb/listener.h | 17 + tools/db_bench_tool.cc | 53 +++ util/delete_scheduler_test.cc | 2 +- util/fault_injection_test_env.h | 10 + util/sst_file_manager_impl.cc | 270 ++++++++++++++- util/sst_file_manager_impl.h | 51 ++- 23 files changed, 1511 insertions(+), 86 deletions(-) diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 3febded80..3eb5dd704 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -79,7 +79,7 @@ class CompactionJobTest : public testing::Test { shutting_down_(false), preserve_deletes_seqnum_(0), mock_table_factory_(new mock::MockTableFactory()), - error_handler_(db_options_, &mutex_) { + error_handler_(nullptr, db_options_, &mutex_) { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); diff --git a/db/db_impl.cc b/db/db_impl.cc index 7a9e49cc2..a583d3392 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -215,9 +215,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // requires a custom gc for compaction, we use that to set use_custom_gc_ // as well. use_custom_gc_(seq_per_batch), + shutdown_initiated_(false), preserve_deletes_(options.preserve_deletes), closed_(false), - error_handler_(immutable_db_options_, &mutex_) { + error_handler_(this, immutable_db_options_, &mutex_) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); @@ -259,16 +260,62 @@ Status DBImpl::Resume() { return Status::OK(); } - Status s = error_handler_.GetBGError(); - if (s.severity() > Status::Severity::kHardError) { + if (error_handler_.IsRecoveryInProgress()) { + // Don't allow a mix of manual and automatic recovery + return Status::Busy(); + } + + mutex_.Unlock(); + Status s = error_handler_.RecoverFromBGError(true); + mutex_.Lock(); + return s; +} + +// This function implements the guts of recovery from a background error. It +// is eventually called for both manual as well as automatic recovery. It does +// the following - +// 1. Wait for currently scheduled background flush/compaction to exit, in +// order to inadvertently causing an error and thinking recovery failed +// 2. Flush memtables if there's any data for all the CFs. This may result +// another error, which will be saved by error_handler_ and reported later +// as the recovery status +// 3. Find and delete any obsolete files +// 4. Schedule compactions if needed for all the CFs. This is needed as the +// flush in the prior step might have been a no-op for some CFs, which +// means a new super version wouldn't have been installed +Status DBImpl::ResumeImpl() { + mutex_.AssertHeld(); + WaitForBackgroundWork(); + + Status bg_error = error_handler_.GetBGError(); + Status s; + if (shutdown_initiated_) { + // Returning shutdown status to SFM during auto recovery will cause it + // to abort the recovery and allow the shutdown to progress + s = Status::ShutdownInProgress(); + } + if (s.ok() && bg_error.severity() > Status::Severity::kHardError) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB resume requested but failed due to Fatal/Unrecoverable error"); - return s; + s = bg_error; + } + + // We cannot guarantee consistency of the WAL. So force flush Memtables of + // all the column families + if (s.ok()) { + s = FlushAllCFs(FlushReason::kErrorRecovery); + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "DB resume requested but failed due to Flush failure [%s]", + s.ToString().c_str()); + } } JobContext job_context(0); FindObsoleteFiles(&job_context, true); - error_handler_.ClearBGError(); + if (s.ok()) { + s = error_handler_.ClearBGError(); + } mutex_.Unlock(); job_context.manifest_file_number = 1; @@ -277,13 +324,36 @@ Status DBImpl::Resume() { } job_context.Clean(); - ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB"); + if (s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB"); + } mutex_.Lock(); - MaybeScheduleFlushOrCompaction(); + // Check for shutdown again before scheduling further compactions, + // since we released and re-acquired the lock above + if (shutdown_initiated_) { + s = Status::ShutdownInProgress(); + } + if (s.ok()) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + SchedulePendingCompaction(cfd); + } + MaybeScheduleFlushOrCompaction(); + } + + // Wake up any waiters - in this case, it could be the shutdown thread + bg_cv_.SignalAll(); // No need to check BGError again. If something happened, event listener would be // notified and the operation causing it would have failed - return Status::OK(); + return s; +} + +void DBImpl::WaitForBackgroundWork() { + // Wait for background work to finish + while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || + bg_flush_scheduled_) { + bg_cv_.Wait(); + } } // Will lock the mutex_, will wait for completion if wait is true @@ -313,14 +383,20 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { if (!wait) { return; } - // Wait for background work to finish - while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || - bg_flush_scheduled_) { - bg_cv_.Wait(); - } + WaitForBackgroundWork(); } Status DBImpl::CloseHelper() { + // Guarantee that there is no background error recovery in progress before + // continuing with the shutdown + mutex_.Lock(); + shutdown_initiated_ = true; + error_handler_.CancelErrorRecovery(); + while (error_handler_.IsRecoveryInProgress()) { + bg_cv_.Wait(); + } + mutex_.Unlock(); + // CancelAllBackgroundWork called with false means we just set the shutdown // marker. After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) @@ -338,7 +414,8 @@ Status DBImpl::CloseHelper() { // Wait for background work to finish while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || bg_flush_scheduled_ || bg_purge_scheduled_ || - pending_purge_obsolete_files_) { + pending_purge_obsolete_files_ || + error_handler_.IsRecoveryInProgress()) { TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); bg_cv_.Wait(); } diff --git a/db/db_impl.h b/db/db_impl.h index 8983c7895..ed8591990 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -795,6 +795,7 @@ class DBImpl : public DB { private: friend class DB; + friend class ErrorHandler; friend class InternalStats; friend class PessimisticTransaction; friend class TransactionBaseImpl; @@ -845,6 +846,8 @@ class DBImpl : public DB { bool read_only = false, bool error_if_log_file_exist = false, bool error_if_data_exists_in_logs = false); + Status ResumeImpl(); + void MaybeIgnoreError(Status* s) const; const Status CreateArchivalDirectory(); @@ -1046,9 +1049,10 @@ class DBImpl : public DB { LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, - LogBuffer* log_buffer); + LogBuffer* log_buffer, FlushReason* reason); - bool EnoughRoomForCompaction(const std::vector& inputs, + bool EnoughRoomForCompaction(ColumnFamilyData* cfd, + const std::vector& inputs, bool* sfm_bookkeeping, LogBuffer* log_buffer); void PrintStatistics(); @@ -1082,6 +1086,10 @@ class DBImpl : public DB { Status CloseHelper(); + Status FlushAllCFs(FlushReason flush_reason); + + void WaitForBackgroundWork(); + // table_cache_ provides its own synchronization std::shared_ptr table_cache_; @@ -1526,6 +1534,13 @@ class DBImpl : public DB { // flush/compaction and if it is not provided vis SnapshotChecker, we should // disable gc to be safe. const bool use_custom_gc_; + // Flag to indicate that the DB instance shutdown has been initiated. This + // different from shutting_down_ atomic in that it is set at the beginning + // of shutdown sequence, specifically in order to prevent any background + // error recovery from going on in parallel. The latter, shutting_down_, + // is set a little later during the shutdown after scheduling memtable + // flushes + bool shutdown_initiated_; // Clients must periodically call SetPreserveDeletesSequenceNumber() // to advance this seqnum. Default value is 0 which means ALL deletes are diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 9bbbc4734..eef8cf98d 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -26,7 +26,7 @@ namespace rocksdb { bool DBImpl::EnoughRoomForCompaction( - const std::vector& inputs, + ColumnFamilyData* cfd, const std::vector& inputs, bool* sfm_reserved_compact_space, LogBuffer* log_buffer) { // Check if we have enough room to do the compaction bool enough_room = true; @@ -34,12 +34,17 @@ bool DBImpl::EnoughRoomForCompaction( auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); if (sfm) { - enough_room = sfm->EnoughRoomForCompaction(inputs); + // Pass the current bg_error_ to SFM so it can decide what checks to + // perform. If this DB instance hasn't seen any error yet, the SFM can be + // optimistic and not do disk space checks + enough_room = + sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError()); if (enough_room) { *sfm_reserved_compact_space = true; } } #else + (void)cfd; (void)inputs; (void)sfm_reserved_compact_space; #endif // ROCKSDB_LITE @@ -584,7 +589,7 @@ Status DBImpl::CompactFilesImpl( bool sfm_reserved_compact_space = false; // First check if we have enough room to do the compaction bool enough_room = EnoughRoomForCompaction( - input_files, &sfm_reserved_compact_space, log_buffer); + cfd, input_files, &sfm_reserved_compact_space, log_buffer); if (!enough_room) { // m's vars will get set properly at the end of this function, @@ -914,6 +919,68 @@ Status DBImpl::Flush(const FlushOptions& flush_options, return s; } + +Status DBImpl::FlushAllCFs(FlushReason flush_reason) { + Status s; + WriteContext context; + WriteThread::Writer w; + + mutex_.AssertHeld(); + write_thread_.EnterUnbatched(&w, &mutex_); + + FlushRequest flush_req; + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() && + cached_recoverable_state_empty_.load()) { + // Nothing to flush + continue; + } + + // SwitchMemtable() will release and reacquire mutex during execution + s = SwitchMemtable(cfd, &context); + if (!s.ok()) { + break; + } + + cfd->imm()->FlushRequested(); + + flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID()); + } + + // schedule flush + if (s.ok() && !flush_req.empty()) { + SchedulePendingFlush(flush_req, flush_reason); + MaybeScheduleFlushOrCompaction(); + } + + write_thread_.ExitUnbatched(&w); + + if (s.ok()) { + for (auto& flush : flush_req) { + auto cfd = flush.first; + auto flush_memtable_id = flush.second; + while (cfd->imm()->NumNotFlushed() > 0 && + cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) { + if (!error_handler_.GetRecoveryError().ok()) { + break; + } + if (cfd->IsDropped()) { + // FlushJob cannot flush a dropped CF, if we did not break here + // we will loop forever since cfd->imm()->NumNotFlushed() will never + // drop to zero + continue; + } + cfd->Ref(); + bg_cv_.Wait(); + cfd->Unref(); + } + } + } + + flush_req.clear(); + return s; +} + Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, uint32_t max_subcompactions, @@ -1236,6 +1303,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_work_paused_ > 0) { // we paused the background work return; + } else if (error_handler_.IsBGWorkStopped() && + !error_handler_.IsRecoveryInProgress()) { + // There has been a hard error and this call is not part of the recovery + // sequence. Bail out here so we don't get into an endless loop of + // scheduling BG work which will again call this function + return; } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions return; @@ -1263,6 +1336,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_compaction_paused_ > 0) { // we paused the background compaction return; + } else if (error_handler_.IsBGWorkStopped()) { + // Compaction is not part of the recovery sequence from a hard error. We + // might get here because recovery might do a flush and install a new + // super version, which will try to schedule pending compactions. Bail + // out here and let the higher level recovery handle compactions + return; } if (HasExclusiveManualCompaction()) { @@ -1420,15 +1499,18 @@ void DBImpl::UnscheduleCallback(void* arg) { } Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, FlushReason* reason) { mutex_.AssertHeld(); Status status; + *reason = FlushReason::kOthers; + // If BG work is stopped due to an error, but a recovery is in progress, + // that means this flush is part of the recovery. So allow it to go through if (!error_handler_.IsBGWorkStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); } - } else { + } else if (!error_handler_.IsRecoveryInProgress()) { status = error_handler_.GetBGError(); } @@ -1479,6 +1561,9 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, } status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, job_context, log_buffer); + // All the CFDs in the FlushReq must have the same flush reason, so just + // grab the first one + *reason = bg_flush_args[0].cfd_->GetFlushReason(); for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; if (cfd->Unref()) { @@ -1505,9 +1590,12 @@ void DBImpl::BackgroundCallFlush() { auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); + FlushReason reason; - Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer); - if (!s.ok() && !s.IsShutdownInProgress()) { + Status s = + BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason); + if (!s.ok() && !s.IsShutdownInProgress() && + reason != FlushReason::kErrorRecovery) { // Wait a little bit before retrying background flush in // case this is an environmental problem and we do not want to // chew up resources for failed flushes for the duration of @@ -1697,6 +1785,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } else { status = error_handler_.GetBGError(); + // If we get here, it means a hard error happened after this compaction + // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got + // a chance to execute. Since we didn't pop a cfd from the compaction + // queue, increment unscheduled_compactions_ + unscheduled_compactions_++; } if (!status.ok()) { @@ -1732,7 +1825,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else { // First check if we have enough room to do the compaction bool enough_room = EnoughRoomForCompaction( - *(c->inputs()), &sfm_reserved_compact_space, log_buffer); + m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer); if (!enough_room) { // Then don't do the compaction @@ -1795,7 +1888,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (c != nullptr) { bool enough_room = EnoughRoomForCompaction( - *(c->inputs()), &sfm_reserved_compact_space, log_buffer); + cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer); if (!enough_room) { // Then don't do the compaction @@ -2004,8 +2097,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); } - // this will unref its input_version and column_family_data - c.reset(); if (status.ok() || status.IsCompactionTooLarge()) { // Done @@ -2015,7 +2106,26 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", status.ToString().c_str()); error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); + if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) { + // Put this cfd back in the compaction queue so we can retry after some + // time + auto cfd = c->column_family_data(); + assert(cfd != nullptr); + // Since this compaction failed, we need to recompute the score so it + // takes the original input files into account + c->column_family_data() + ->current() + ->storage_info() + ->ComputeCompactionScore(*(c->immutable_cf_options()), + *(c->mutable_cf_options())); + if (!cfd->queued_for_compaction()) { + AddToCompactionQueue(cfd); + ++unscheduled_compactions_; + } + } } + // this will unref its input_version and column_family_data + c.reset(); if (is_manual) { ManualCompactionState* m = manual_compaction; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index f4709b849..6ed358d3b 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -137,7 +137,7 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) { while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || bg_flush_scheduled_ || (wait_unscheduled && unscheduled_compactions_)) && - !error_handler_.IsDBStopped()) { + (error_handler_.GetBGError() == Status::OK())) { bg_cv_.Wait(); } return error_handler_.GetBGError(); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index d4614e7f8..8371b0720 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -134,8 +134,15 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { for (size_t i = 0; i < result.db_paths.size(); i++) { DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path); } -#endif + // Create a default SstFileManager for purposes of tracking compaction size + // and facilitating recovery from out of space errors. + if (result.sst_file_manager.get() == nullptr) { + std::shared_ptr sst_file_manager( + NewSstFileManager(result.env, result.info_log)); + result.sst_file_manager = sst_file_manager; + } +#endif return result; } @@ -1050,6 +1057,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, break; } } + + // For recovery from NoSpace() error, we can only handle + // the case where the database is stored in a single path + if (paths.size() <= 1) { + impl->error_handler_.EnableAutoRecovery(); + } } if (!s.ok()) { @@ -1213,6 +1226,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } } } + + // Reserve some disk buffer space. This is a heuristic - when we run out + // of disk space, this ensures that there is atleast write_buffer_size + // amount of free space before we resume DB writes. In low disk space + // conditions, we want to avoid a lot of small L0 files due to frequent + // WAL write failures and resultant forced flushes + sfm->ReserveDiskBuffer(max_write_buffer_size, + impl->immutable_db_options_.db_paths[0].path); } #endif // !ROCKSDB_LITE diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 61c5f15f6..ff786e113 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -710,6 +710,10 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, assert(write_context != nullptr && need_log_sync != nullptr); Status status; + if (error_handler_.IsDBStopped()) { + status = error_handler_.GetBGError(); + } + PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time); assert(!single_column_family_mode_ || @@ -728,10 +732,6 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = HandleWriteBufferFull(write_context); } - if (UNLIKELY(status.ok())) { - status = error_handler_.GetBGError(); - } - if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { status = ScheduleFlushes(write_context); } @@ -1184,7 +1184,11 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, mutex_.Lock(); } - while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) { + // Don't wait if there's a background error, even if its a soft error. We + // might wait here indefinitely as the background compaction may never + // finish successfully, resulting in the stall condition lasting + // indefinitely + while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) { if (write_options.no_slowdown) { return Status::Incomplete(); } @@ -1200,7 +1204,19 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, RecordTick(stats_, STALL_MICROS, time_delayed); } - return error_handler_.GetBGError(); + // If DB is not in read-only mode and write_controller is not stopping + // writes, we can ignore any background errors and allow the write to + // proceed + Status s; + if (write_controller_.IsStopped()) { + // If writes are still stopped, it means we bailed due to a background + // error + s = Status::Incomplete(error_handler_.GetBGError().ToString()); + } + if (error_handler_.IsDBStopped()) { + s = error_handler_.GetBGError(); + } + return s; } Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, diff --git a/db/error_handler.cc b/db/error_handler.cc index a5b23a4b8..8e297df98 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -4,7 +4,9 @@ // (found in the LICENSE.Apache file in the root directory). // #include "db/error_handler.h" +#include "db/db_impl.h" #include "db/event_helpers.h" +#include "util/sst_file_manager_impl.h" namespace rocksdb { @@ -33,7 +35,7 @@ std::map, // Errors during BG flush {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, Status::SubCode::kNoSpace, true), - Status::Severity::kSoftError}, + Status::Severity::kHardError}, {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, Status::SubCode::kNoSpace, false), Status::Severity::kNoError}, @@ -44,11 +46,11 @@ std::map, {std::make_tuple(BackgroundErrorReason::kWriteCallback, Status::Code::kIOError, Status::SubCode::kNoSpace, true), - Status::Severity::kFatalError}, + Status::Severity::kHardError}, {std::make_tuple(BackgroundErrorReason::kWriteCallback, Status::Code::kIOError, Status::SubCode::kNoSpace, false), - Status::Severity::kFatalError}, + Status::Severity::kHardError}, }; std::map, Status::Severity> @@ -118,6 +120,45 @@ std::map, Status::Severity> Status::Severity::kFatalError}, }; +void ErrorHandler::CancelErrorRecovery() { +#ifndef ROCKSDB_LITE + db_mutex_->AssertHeld(); + + // We'll release the lock before calling sfm, so make sure no new + // recovery gets scheduled at that point + auto_recovery_ = false; + SstFileManagerImpl* sfm = reinterpret_cast( + db_options_.sst_file_manager.get()); + if (sfm) { + // This may or may not cancel a pending recovery + db_mutex_->Unlock(); + bool cancelled = sfm->CancelErrorRecovery(this); + db_mutex_->Lock(); + if (cancelled) { + recovery_in_prog_ = false; + } + } +#endif +} + +// This is the main function for looking at an error during a background +// operation and deciding the severity, and error recovery strategy. The high +// level algorithm is as follows - +// 1. Classify the severity of the error based on the ErrorSeverityMap, +// DefaultErrorSeverityMap and DefaultReasonMap defined earlier +// 2. Call a Status code specific override function to adjust the severity +// if needed. The reason for this is our ability to recover may depend on +// the exact options enabled in DBOptions +// 3. Determine if auto recovery is possible. A listener notification callback +// is called, which can disable the auto recovery even if we decide its +// feasible +// 4. For Status::NoSpace() errors, rely on SstFileManagerImpl to control +// the actual recovery. If no sst file manager is specified in DBOptions, +// a default one is allocated during DB::Open(), so there will always be +// one. +// This can also get called as part of a recovery operation. In that case, we +// also track the error seperately in recovery_error_ so we can tell in the +// end whether recovery succeeded or not Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reason) { db_mutex_->AssertHeld(); @@ -125,6 +166,12 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas return Status::OK(); } + // Check if recovery is currently in progress. If it is, we will save this + // error so we can check it at the end to see if recovery succeeded or not + if (recovery_in_prog_ && recovery_error_.ok()) { + recovery_error_ = bg_err; + } + bool paranoid = db_options_.paranoid_checks; Status::Severity sev = Status::Severity::kFatalError; Status new_bg_err; @@ -156,15 +203,143 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas } new_bg_err = Status(bg_err, sev); + + bool auto_recovery = auto_recovery_; + if (new_bg_err.severity() >= Status::Severity::kFatalError && auto_recovery) { + auto_recovery = false; + ; + } + + // Allow some error specific overrides + if (new_bg_err == Status::NoSpace()) { + new_bg_err = OverrideNoSpaceError(new_bg_err, &auto_recovery); + } + if (!new_bg_err.ok()) { Status s = new_bg_err; - EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_); + EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, + db_mutex_, &auto_recovery); if (!s.ok() && (s.severity() > bg_error_.severity())) { bg_error_ = s; + } else { + // This error is less severe than previously encountered error. Don't + // take any further action + return bg_error_; } } + if (auto_recovery) { + recovery_in_prog_ = true; + + // Kick-off error specific recovery + if (bg_error_ == Status::NoSpace()) { + RecoverFromNoSpace(); + } + } return bg_error_; } +Status ErrorHandler::OverrideNoSpaceError(Status bg_error, + bool* auto_recovery) { +#ifndef ROCKSDB_LITE + if (bg_error.severity() >= Status::Severity::kFatalError) { + return bg_error; + } + + if (db_options_.sst_file_manager.get() == nullptr) { + // We rely on SFM to poll for enough disk space and recover + *auto_recovery = false; + return bg_error; + } + + if (db_options_.allow_2pc && + (bg_error.severity() <= Status::Severity::kSoftError)) { + // Don't know how to recover, as the contents of the current WAL file may + // be inconsistent, and it may be needed for 2PC. If 2PC is not enabled, + // we can just flush the memtable and discard the log + *auto_recovery = false; + return Status(bg_error, Status::Severity::kFatalError); + } + + { + uint64_t free_space; + if (db_options_.env->GetFreeSpace(db_options_.db_paths[0].path, + &free_space) == Status::NotSupported()) { + *auto_recovery = false; + } + } + + return bg_error; +#else + (void)auto_recovery; + return Status(bg_error, Status::Severity::kFatalError); +#endif +} + +void ErrorHandler::RecoverFromNoSpace() { +#ifndef ROCKSDB_LITE + SstFileManagerImpl* sfm = + reinterpret_cast(db_options_.sst_file_manager.get()); + + // Inform SFM of the error, so it can kick-off the recovery + if (sfm) { + sfm->StartErrorRecovery(this, bg_error_); + } +#endif +} + +Status ErrorHandler::ClearBGError() { +#ifndef ROCKSDB_LITE + db_mutex_->AssertHeld(); + + // Signal that recovery succeeded + if (recovery_error_.ok()) { + Status old_bg_error = bg_error_; + bg_error_ = Status::OK(); + recovery_in_prog_ = false; + EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners, + old_bg_error, db_mutex_); + } + return recovery_error_; +#else + return bg_error_; +#endif +} + +Status ErrorHandler::RecoverFromBGError(bool is_manual) { +#ifndef ROCKSDB_LITE + InstrumentedMutexLock l(db_mutex_); + if (is_manual) { + // If its a manual recovery and there's a background recovery in progress + // return busy status + if (recovery_in_prog_) { + return Status::Busy(); + } + recovery_in_prog_ = true; + } + + if (bg_error_.severity() == Status::Severity::kSoftError) { + // Simply clear the background error and return + recovery_error_ = Status::OK(); + return ClearBGError(); + } + + // Reset recovery_error_. We will use this to record any errors that happen + // during the recovery process. While recovering, the only operations that + // can generate background errors should be the flush operations + recovery_error_ = Status::OK(); + Status s = db_->ResumeImpl(); + // For manual recover, shutdown, and fatal error cases, set + // recovery_in_prog_ to false. For automatic background recovery, leave it + // as is regardless of success or failure as it will be retried + if (is_manual || s.IsShutdownInProgress() || + bg_error_.severity() >= Status::Severity::kFatalError) { + recovery_in_prog_ = false; + } + return s; +#else + (void)is_manual; + return bg_error_; +#endif +} } diff --git a/db/error_handler.h b/db/error_handler.h index 8a1a411fc..ce8454da6 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -11,42 +11,65 @@ namespace rocksdb { +class DBImpl; + class ErrorHandler { public: - ErrorHandler(const ImmutableDBOptions& db_options, - InstrumentedMutex* db_mutex) - : db_options_(db_options), - bg_error_(Status::OK()), - db_mutex_(db_mutex) - {} - ~ErrorHandler() {} + ErrorHandler(DBImpl* db, const ImmutableDBOptions& db_options, + InstrumentedMutex* db_mutex) + : db_(db), + db_options_(db_options), + bg_error_(Status::OK()), + recovery_error_(Status::OK()), + db_mutex_(db_mutex), + auto_recovery_(false), + recovery_in_prog_(false) {} + ~ErrorHandler() {} - Status::Severity GetErrorSeverity(BackgroundErrorReason reason, - Status::Code code, Status::SubCode subcode); + void EnableAutoRecovery() { auto_recovery_ = true; } - Status SetBGError(const Status& bg_err, BackgroundErrorReason reason); + Status::Severity GetErrorSeverity(BackgroundErrorReason reason, + Status::Code code, + Status::SubCode subcode); - Status GetBGError() - { - return bg_error_; - } + Status SetBGError(const Status& bg_err, BackgroundErrorReason reason); - void ClearBGError() { - bg_error_ = Status::OK(); - } + Status GetBGError() { return bg_error_; } - bool IsDBStopped() { - return !bg_error_.ok(); + Status GetRecoveryError() { return recovery_error_; } + + Status ClearBGError(); + + bool IsDBStopped() { + return !bg_error_.ok() && + bg_error_.severity() >= Status::Severity::kHardError; } bool IsBGWorkStopped() { - return !bg_error_.ok(); + return !bg_error_.ok() && + (bg_error_.severity() >= Status::Severity::kHardError || + !auto_recovery_); } - private: + bool IsRecoveryInProgress() { return recovery_in_prog_; } + + Status RecoverFromBGError(bool is_manual = false); + void CancelErrorRecovery(); + + private: + DBImpl* db_; const ImmutableDBOptions& db_options_; Status bg_error_; + // A seperate Status variable used to record any errors during the + // recovery process from hard errors + Status recovery_error_; InstrumentedMutex* db_mutex_; + // A flag indicating whether automatic recovery from errors is enabled + bool auto_recovery_; + bool recovery_in_prog_; + + Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery); + void RecoverFromNoSpace(); }; } diff --git a/db/error_handler_test.cc b/db/error_handler_test.cc index abd5663d8..6efba2987 100644 --- a/db/error_handler_test.cc +++ b/db/error_handler_test.cc @@ -6,9 +6,12 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#ifndef ROCKSDB_LITE + #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" +#include "rocksdb/sst_file_manager.h" #include "util/fault_injection_test_env.h" #if !defined(ROCKSDB_LITE) #include "util/sync_point.h" @@ -33,36 +36,137 @@ class DBErrorHandlingEnv : public EnvWrapper { bool trig_io_error; }; +class ErrorHandlerListener : public EventListener { + public: + ErrorHandlerListener() + : mutex_(), + cv_(&mutex_), + no_auto_recovery_(false), + recovery_complete_(false), + file_creation_started_(false), + override_bg_error_(false), + file_count_(0), + fault_env_(nullptr) {} + + void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /*ti*/) { + InstrumentedMutexLock l(&mutex_); + file_creation_started_ = true; + if (file_count_ > 0) { + if (--file_count_ == 0) { + fault_env_->SetFilesystemActive(false, file_creation_error_); + file_creation_error_ = Status::OK(); + } + } + cv_.SignalAll(); + } + + void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, + Status /*bg_error*/, bool* auto_recovery) { + if (*auto_recovery && no_auto_recovery_) { + *auto_recovery = false; + } + } + + void OnErrorRecoveryCompleted(Status /*old_bg_error*/) { + InstrumentedMutexLock l(&mutex_); + recovery_complete_ = true; + cv_.SignalAll(); + } + + bool WaitForRecovery(uint64_t /*abs_time_us*/) { + InstrumentedMutexLock l(&mutex_); + while (!recovery_complete_) { + cv_.Wait(/*abs_time_us*/); + } + if (recovery_complete_) { + recovery_complete_ = false; + return true; + } + return false; + } + + void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) { + InstrumentedMutexLock l(&mutex_); + while (!file_creation_started_) { + cv_.Wait(/*abs_time_us*/); + } + file_creation_started_ = false; + } + + void OnBackgroundError(BackgroundErrorReason /*reason*/, + Status* bg_error) override { + if (override_bg_error_) { + *bg_error = bg_error_; + override_bg_error_ = false; + } + } + + void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; } + + void OverrideBGError(Status bg_err) { + bg_error_ = bg_err; + override_bg_error_ = true; + } + + void InjectFileCreationError(FaultInjectionTestEnv* env, int file_count, + Status s) { + fault_env_ = env; + file_count_ = file_count; + file_creation_error_ = s; + } + + private: + InstrumentedMutex mutex_; + InstrumentedCondVar cv_; + bool no_auto_recovery_; + bool recovery_complete_; + bool file_creation_started_; + bool override_bg_error_; + int file_count_; + Status file_creation_error_; + Status bg_error_; + FaultInjectionTestEnv* fault_env_; +}; + TEST_F(DBErrorHandlingTest, FLushWriteError) { std::unique_ptr fault_env( new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); Options options = GetDefaultOptions(); options.create_if_missing = true; options.env = fault_env.get(); + options.listeners.emplace_back(listener); Status s; + + listener->EnableAutoRecovery(false); DestroyAndReopen(options); - Put(Key(0), "va;"); + Put(Key(0), "val"); SyncPoint::GetInstance()->SetCallBack( "FlushJob::Start", [&](void *) { fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); }); SyncPoint::GetInstance()->EnableProcessing(); s = Flush(); - ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); + SyncPoint::GetInstance()->DisableProcessing(); fault_env->SetFilesystemActive(true); s = dbfull()->Resume(); ASSERT_EQ(s, Status::OK()); + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); Destroy(options); } TEST_F(DBErrorHandlingTest, CompactionWriteError) { std::unique_ptr fault_env( new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); Options options = GetDefaultOptions(); options.create_if_missing = true; options.level0_file_num_compaction_trigger = 2; + options.listeners.emplace_back(listener); options.env = fault_env.get(); Status s; DestroyAndReopen(options); @@ -72,6 +176,10 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) { s = Flush(); ASSERT_EQ(s, Status::OK()); + listener->OverrideBGError( + Status(Status::NoSpace(), Status::Severity::kHardError) + ); + listener->EnableAutoRecovery(false); rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -85,7 +193,7 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) { ASSERT_EQ(s, Status::OK()); s = dbfull()->TEST_WaitForCompact(); - ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); fault_env->SetFilesystemActive(true); s = dbfull()->Resume(); @@ -129,6 +237,453 @@ TEST_F(DBErrorHandlingTest, CorruptionError) { Destroy(options); } +#ifndef TRAVIS +TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + DestroyAndReopen(options); + + Put(Key(0), "val"); + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); + SyncPoint::GetInstance()->DisableProcessing(); + fault_env->SetFilesystemActive(true); + ASSERT_EQ(listener->WaitForRecovery(5000000), true); + + s = Put(Key(1), "val"); + ASSERT_EQ(s, Status::OK()); + + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); + ASSERT_EQ("val", Get(Key(1))); + Destroy(options); +} + +TEST_F(DBErrorHandlingTest, FailRecoverFlushError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + DestroyAndReopen(options); + + Put(Key(0), "val"); + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError); + // We should be able to shutdown the database while auto recovery is going + // on in the background + Close(); + DestroyDB(dbname_, options); +} + +TEST_F(DBErrorHandlingTest, WALWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.writable_file_max_buffer_size = 32768; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + DestroyAndReopen(options); + + { + WriteBatch batch; + char val[1024]; + + for (auto i = 0; i<100; ++i) { + sprintf(val, "%d", i); + batch.Put(Key(i), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + { + WriteBatch batch; + char val[1024]; + int write_error = 0; + + for (auto i = 100; i<199; ++i) { + sprintf(val, "%d", i); + batch.Put(Key(i), Slice(val, sizeof(val))); + } + + SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { + write_error++; + if (write_error > 2) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wopts; + wopts.sync = true; + s = dbfull()->Write(wopts, &batch); + ASSERT_EQ(s, s.NoSpace()); + } + SyncPoint::GetInstance()->DisableProcessing(); + fault_env->SetFilesystemActive(true); + ASSERT_EQ(listener->WaitForRecovery(5000000), true); + for (auto i=0; i<199; ++i) { + if (i < 100) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + Reopen(options); + for (auto i=0; i<199; ++i) { + if (i < 100) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + Close(); +} + +TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + std::shared_ptr listener(new ErrorHandlerListener()); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.writable_file_max_buffer_size = 32768; + options.env = fault_env.get(); + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(); + CreateAndReopenWithCF({"one", "two", "three"}, options); + + { + WriteBatch batch; + char val[1024]; + + for (auto i = 1; i < 4; ++i) { + for (auto j = 0; j < 100; ++j) { + sprintf(val, "%d", j); + batch.Put(handles_[i], Key(j), Slice(val, sizeof(val))); + } + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + { + WriteBatch batch; + char val[1024]; + int write_error = 0; + + // Write to one CF + for (auto i = 100; i < 199; ++i) { + sprintf(val, "%d", i); + batch.Put(handles_[2], Key(i), Slice(val, sizeof(val))); + } + + SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { + write_error++; + if (write_error > 2) { + fault_env->SetFilesystemActive(false, + Status::NoSpace("Out of space")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wopts; + wopts.sync = true; + s = dbfull()->Write(wopts, &batch); + ASSERT_EQ(s, s.NoSpace()); + } + SyncPoint::GetInstance()->DisableProcessing(); + fault_env->SetFilesystemActive(true); + ASSERT_EQ(listener->WaitForRecovery(5000000), true); + + for (auto i = 1; i < 4; ++i) { + // Every CF should have been flushed + ASSERT_EQ(NumTableFilesAtLevel(0, i), 1); + } + + for (auto i = 1; i < 4; ++i) { + for (auto j = 0; j < 199; ++j) { + if (j < 100) { + ASSERT_NE(Get(i, Key(j)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND"); + } + } + } + ReopenWithColumnFamilies({"default", "one", "two", "three"}, options); + for (auto i = 1; i < 4; ++i) { + for (auto j = 0; j < 199; ++j) { + if (j < 100) { + ASSERT_NE(Get(i, Key(j)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND"); + } + } + } + Close(); +} + +TEST_F(DBErrorHandlingTest, MultiDBCompactionError) { + FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default()); + std::vector> fault_env; + std::vector options; + std::vector> listener; + std::vector db; + std::shared_ptr sfm(NewSstFileManager(def_env)); + int kNumDbInstances = 3; + + for (auto i = 0; i < kNumDbInstances; ++i) { + listener.emplace_back(new ErrorHandlerListener()); + options.emplace_back(GetDefaultOptions()); + fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default())); + options[i].create_if_missing = true; + options[i].level0_file_num_compaction_trigger = 2; + options[i].writable_file_max_buffer_size = 32768; + options[i].env = fault_env[i].get(); + options[i].listeners.emplace_back(listener[i]); + options[i].sst_file_manager = sfm; + DB* dbptr; + char buf[16]; + + listener[i]->EnableAutoRecovery(); + // Setup for returning error for the 3rd SST, which would be level 1 + listener[i]->InjectFileCreationError(fault_env[i].get(), 3, + Status::NoSpace("Out of space")); + snprintf(buf, sizeof(buf), "_%d", i); + DestroyDB(dbname_ + std::string(buf), options[i]); + ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr), + Status::OK()); + db.emplace_back(dbptr); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + for (auto j = 0; j <= 100; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } + + def_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + // Write to one CF + for (auto j = 100; j < 199; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + Status s = static_cast(db[i])->TEST_WaitForCompact(true); + ASSERT_EQ(s.severity(), Status::Severity::kSoftError); + fault_env[i]->SetFilesystemActive(true); + } + + def_env->SetFilesystemActive(true); + for (auto i = 0; i < kNumDbInstances; ++i) { + std::string prop; + ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true); + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(0), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 0); + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(1), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 1); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + char buf[16]; + snprintf(buf, sizeof(buf), "_%d", i); + delete db[i]; + fault_env[i]->SetFilesystemActive(true); + if (getenv("KEEP_DB")) { + printf("DB is still at %s%s\n", dbname_.c_str(), buf); + } else { + Status s = DestroyDB(dbname_ + std::string(buf), options[i]); + } + } + options.clear(); + sfm.reset(); + delete def_env; +} + +TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) { + FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default()); + std::vector> fault_env; + std::vector options; + std::vector> listener; + std::vector db; + std::shared_ptr sfm(NewSstFileManager(def_env)); + int kNumDbInstances = 3; + + for (auto i = 0; i < kNumDbInstances; ++i) { + listener.emplace_back(new ErrorHandlerListener()); + options.emplace_back(GetDefaultOptions()); + fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default())); + options[i].create_if_missing = true; + options[i].level0_file_num_compaction_trigger = 2; + options[i].writable_file_max_buffer_size = 32768; + options[i].env = fault_env[i].get(); + options[i].listeners.emplace_back(listener[i]); + options[i].sst_file_manager = sfm; + DB* dbptr; + char buf[16]; + + listener[i]->EnableAutoRecovery(); + switch (i) { + case 0: + // Setup for returning error for the 3rd SST, which would be level 1 + listener[i]->InjectFileCreationError(fault_env[i].get(), 3, + Status::NoSpace("Out of space")); + break; + case 1: + // Setup for returning error after the 1st SST, which would result + // in a hard error + listener[i]->InjectFileCreationError(fault_env[i].get(), 2, + Status::NoSpace("Out of space")); + break; + default: + break; + } + snprintf(buf, sizeof(buf), "_%d", i); + DestroyDB(dbname_ + std::string(buf), options[i]); + ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr), + Status::OK()); + db.emplace_back(dbptr); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + for (auto j = 0; j <= 100; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } + + def_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + for (auto i = 0; i < kNumDbInstances; ++i) { + WriteBatch batch; + char val[1024]; + + // Write to one CF + for (auto j = 100; j < 199; ++j) { + sprintf(val, "%d", j); + batch.Put(Key(j), Slice(val, sizeof(val))); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK()); + if (i != 1) { + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK()); + } else { + ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace()); + } + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + Status s = static_cast(db[i])->TEST_WaitForCompact(true); + switch (i) { + case 0: + ASSERT_EQ(s.severity(), Status::Severity::kSoftError); + break; + case 1: + ASSERT_EQ(s.severity(), Status::Severity::kHardError); + break; + case 2: + ASSERT_EQ(s, Status::OK()); + break; + } + fault_env[i]->SetFilesystemActive(true); + } + + def_env->SetFilesystemActive(true); + for (auto i = 0; i < kNumDbInstances; ++i) { + std::string prop; + if (i < 2) { + ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true); + } + if (i == 1) { + ASSERT_EQ(static_cast(db[i])->TEST_WaitForCompact(true), + Status::OK()); + } + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(0), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 0); + EXPECT_TRUE(db[i]->GetProperty( + "rocksdb.num-files-at-level" + NumberToString(1), &prop)); + EXPECT_EQ(atoi(prop.c_str()), 1); + } + + for (auto i = 0; i < kNumDbInstances; ++i) { + char buf[16]; + snprintf(buf, sizeof(buf), "_%d", i); + fault_env[i]->SetFilesystemActive(true); + delete db[i]; + if (getenv("KEEP_DB")) { + printf("DB is still at %s%s\n", dbname_.c_str(), buf); + } else { + DestroyDB(dbname_ + std::string(buf), options[i]); + } + } + options.clear(); + delete def_env; +} +#endif + } // namespace rocksdb int main(int argc, char** argv) { @@ -136,3 +691,13 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Cuckoo table is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE diff --git a/db/event_helpers.cc b/db/event_helpers.cc index e4fd64226..c80c5aefb 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -40,8 +40,8 @@ void EventHelpers::NotifyTableFileCreationStarted( void EventHelpers::NotifyOnBackgroundError( const std::vector>& listeners, - BackgroundErrorReason reason, Status* bg_error, - InstrumentedMutex* db_mutex) { + BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex, + bool* auto_recovery) { #ifndef ROCKSDB_LITE if (listeners.size() == 0U) { return; @@ -51,6 +51,9 @@ void EventHelpers::NotifyOnBackgroundError( db_mutex->Unlock(); for (auto& listener : listeners) { listener->OnBackgroundError(reason, bg_error); + if (*auto_recovery) { + listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery); + } } db_mutex->Lock(); #else @@ -58,6 +61,7 @@ void EventHelpers::NotifyOnBackgroundError( (void)reason; (void)bg_error; (void)db_mutex; + (void)auto_recovery; #endif // ROCKSDB_LITE } @@ -167,4 +171,25 @@ void EventHelpers::LogAndNotifyTableFileDeletion( #endif // !ROCKSDB_LITE } +void EventHelpers::NotifyOnErrorRecoveryCompleted( + const std::vector>& listeners, + Status old_bg_error, InstrumentedMutex* db_mutex) { +#ifndef ROCKSDB_LITE + if (listeners.size() == 0U) { + return; + } + db_mutex->AssertHeld(); + // release lock while notifying events + db_mutex->Unlock(); + for (auto& listener : listeners) { + listener->OnErrorRecoveryCompleted(old_bg_error); + } + db_mutex->Lock(); +#else + (void)listeners; + (void)old_bg_error; + (void)db_mutex; +#endif // ROCKSDB_LITE +} + } // namespace rocksdb diff --git a/db/event_helpers.h b/db/event_helpers.h index 674e6c5f6..ea35b4b5b 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -28,7 +28,7 @@ class EventHelpers { static void NotifyOnBackgroundError( const std::vector>& listeners, BackgroundErrorReason reason, Status* bg_error, - InstrumentedMutex* db_mutex); + InstrumentedMutex* db_mutex, bool* auto_recovery); static void LogAndNotifyTableFileCreationFinished( EventLogger* event_logger, const std::vector>& listeners, @@ -41,6 +41,9 @@ class EventHelpers { uint64_t file_number, const std::string& file_path, const Status& status, const std::string& db_name, const std::vector>& listeners); + static void NotifyOnErrorRecoveryCompleted( + const std::vector>& listeners, + Status bg_error, InstrumentedMutex* db_mutex); private: static void LogAndNotifyTableFileCreation( diff --git a/db/flush_job.cc b/db/flush_job.cc index 1fee162ab..b64712fd1 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -78,6 +78,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) { return "Auto Compaction"; case FlushReason::kManualFlush: return "Manual Flush"; + case FlushReason::kErrorRecovery: + return "Error Recovery"; default: return "Invalid"; } diff --git a/db/listener_test.cc b/db/listener_test.cc index 33c916119..77afcd9ed 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -882,10 +882,13 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) { ASSERT_EQ(1, listener->counter()); // trigger flush so compaction is triggered again; this time it succeeds + // The previous failed compaction may get retried automatically, so we may + // be left with 0 or 1 files in level 1, depending on when the retry gets + // scheduled ASSERT_OK(Put("key0", "val")); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); ASSERT_OK(dbfull()->TEST_WaitForCompact()); - ASSERT_EQ(0, NumTableFilesAtLevel(0)); + ASSERT_LE(1, NumTableFilesAtLevel(0)); } } // namespace rocksdb diff --git a/env/env_posix.cc b/env/env_posix.cc index db28a2f1a..2b53fd8ab 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -25,6 +25,7 @@ #include #include #endif +#include #include #include #include @@ -776,6 +777,18 @@ class PosixEnv : public Env { return gettid(pthread_self()); } + virtual Status GetFreeSpace(const std::string& fname, + uint64_t* free_space) override { + struct statvfs sbuf; + + if (statvfs(fname.c_str(), &sbuf) < 0) { + return IOError("While doing statvfs", fname, errno); + } + + *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree); + return Status::OK(); + } + virtual Status NewLogger(const std::string& fname, shared_ptr* result) override { FILE* f; diff --git a/env/posix_logger.h b/env/posix_logger.h index e983ba704..401df6a3f 100644 --- a/env/posix_logger.h +++ b/env/posix_logger.h @@ -165,7 +165,6 @@ class PosixLogger : public Logger { size_t sz = fwrite(base, 1, write_size, file_); flush_pending_ = true; - assert(sz == write_size); if (sz > 0) { log_size_ += write_size; } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 99127f766..755836461 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -477,6 +477,15 @@ class Env { // Returns the ID of the current thread. virtual uint64_t GetThreadID() const; +// This seems to clash with a macro on Windows, so #undef it here +#undef GetFreeSpace + + // Get the amount of free disk space + virtual Status GetFreeSpace(const std::string& /*path*/, + uint64_t* /*diskfree*/) { + return Status::NotSupported(); + } + protected: // The pointer to an internal structure that will update the // status of each thread. diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 601951cd0..46ce712dc 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -27,6 +27,7 @@ enum class TableFileCreationReason { kFlush, kCompaction, kRecovery, + kMisc, }; struct TableFileCreationBriefInfo { @@ -103,6 +104,7 @@ enum class FlushReason : int { kDeleteFiles = 0x08, kAutoCompaction = 0x09, kManualFlush = 0x0a, + kErrorRecovery = 0xb, }; enum class BackgroundErrorReason { @@ -393,6 +395,21 @@ class EventListener { // returns. Otherwise, RocksDB may be blocked. virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {} + // A callback function for RocksDB which will be called just before + // starting the automatic recovery process for recoverable background + // errors, such as NoSpace(). The callback can suppress the automatic + // recovery by setting *auto_recovery to false. The database will then + // have to be transitioned out of read-only mode by calling DB::Resume() + virtual void OnErrorRecoveryBegin(BackgroundErrorReason /* reason */, + Status /* bg_error */, + bool* /* auto_recovery */) {} + + // A callback function for RocksDB which will be called once the database + // is recovered from read-only mode after an error. When this is called, it + // means normal writes to the database can be issued and the user can + // initiate any further recovery actions needed + virtual void OnErrorRecoveryCompleted(Status /* old_bg_error */) {} + virtual ~EventListener() {} }; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index c32822c8f..c8ccfbd22 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1983,6 +1983,52 @@ class Benchmark { bool report_file_operations_; bool use_blob_db_; + class ErrorHandlerListener : public EventListener { + public: + ErrorHandlerListener() + : mutex_(), + cv_(&mutex_), + no_auto_recovery_(false), + recovery_complete_(false) {} + + ~ErrorHandlerListener() {} + + void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, + Status /*bg_error*/, bool* auto_recovery) { + if (*auto_recovery && no_auto_recovery_) { + *auto_recovery = false; + } + } + + void OnErrorRecoveryCompleted(Status /*old_bg_error*/) { + InstrumentedMutexLock l(&mutex_); + recovery_complete_ = true; + cv_.SignalAll(); + } + + bool WaitForRecovery(uint64_t /*abs_time_us*/) { + InstrumentedMutexLock l(&mutex_); + if (!recovery_complete_) { + cv_.Wait(/*abs_time_us*/); + } + if (recovery_complete_) { + recovery_complete_ = false; + return true; + } + return false; + } + + void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; } + + private: + InstrumentedMutex mutex_; + InstrumentedCondVar cv_; + bool no_auto_recovery_; + bool recovery_complete_; + }; + + std::shared_ptr listener_; + bool SanityCheck() { if (FLAGS_compression_ratio > 1) { fprintf(stderr, "compression_ratio should be between 0 and 1\n"); @@ -2318,6 +2364,8 @@ class Benchmark { } } } + + listener_.reset(new ErrorHandlerListener()); } ~Benchmark() { @@ -3500,6 +3548,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { FLAGS_rate_limiter_auto_tuned)); } + options.listeners.emplace_back(listener_); if (FLAGS_num_multi_db <= 1) { OpenDb(options, FLAGS_db, &db_); } else { @@ -3892,6 +3941,10 @@ void VerifyDBFromDB(std::string& truth_db_name) { NewGenericRateLimiter(write_rate)); } } + if (!s.ok()) { + s = listener_->WaitForRecovery(600000000) ? Status::OK() : s; + } + if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc index 6c33f03da..bfd9954de 100644 --- a/util/delete_scheduler_test.cc +++ b/util/delete_scheduler_test.cc @@ -89,7 +89,7 @@ class DeleteSchedulerTest : public testing::Test { std::string data(size, 'A'); EXPECT_OK(f->Append(data)); EXPECT_OK(f->Close()); - sst_file_mgr_->OnAddFile(file_path); + sst_file_mgr_->OnAddFile(file_path, false); return file_path; } diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h index f2d4aaad2..a8a8e38dc 100644 --- a/util/fault_injection_test_env.h +++ b/util/fault_injection_test_env.h @@ -115,6 +115,16 @@ class FaultInjectionTestEnv : public EnvWrapper { virtual Status RenameFile(const std::string& s, const std::string& t) override; + virtual Status GetFreeSpace(const std::string& path, + uint64_t* disk_free) override { + if (!IsFilesystemActive() && error_ == Status::NoSpace()) { + *disk_free = 0; + return Status::OK(); + } else { + return target()->GetFreeSpace(path, disk_free); + } + } + void WritableFileClosed(const FileState& state); // For every file that is not fully synced, make a call to `func` with diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc index 74164e06a..a1280622b 100644 --- a/util/sst_file_manager_impl.cc +++ b/util/sst_file_manager_impl.cc @@ -7,6 +7,7 @@ #include +#include "db/db_impl.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/sst_file_manager.h" @@ -23,20 +24,38 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr logger, : env_(env), logger_(logger), total_files_size_(0), + in_progress_files_size_(0), compaction_buffer_size_(0), cur_compactions_reserved_size_(0), max_allowed_space_(0), delete_scheduler_(env, rate_bytes_per_sec, logger.get(), this, - max_trash_db_ratio, bytes_max_delete_chunk) {} + max_trash_db_ratio, bytes_max_delete_chunk), + cv_(&mu_), + closing_(false), + bg_thread_(nullptr), + reserved_disk_buffer_(0), + free_space_trigger_(0), + cur_instance_(nullptr) { +} -SstFileManagerImpl::~SstFileManagerImpl() {} +SstFileManagerImpl::~SstFileManagerImpl() { + { + MutexLock l(&mu_); + closing_ = true; + cv_.SignalAll(); + } + if (bg_thread_) { + bg_thread_->join(); + } +} -Status SstFileManagerImpl::OnAddFile(const std::string& file_path) { +Status SstFileManagerImpl::OnAddFile(const std::string& file_path, + bool compaction) { uint64_t file_size; Status s = env_->GetFileSize(file_path, &file_size); if (s.ok()) { MutexLock l(&mu_); - OnAddFileImpl(file_path, file_size); + OnAddFileImpl(file_path, file_size, compaction); } TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile"); return s; @@ -61,6 +80,19 @@ void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) { } } cur_compactions_reserved_size_ -= size_added_by_compaction; + + auto new_files = c->edit()->GetNewFiles(); + for (auto& new_file : new_files) { + auto fn = TableFileName(c->immutable_cf_options()->cf_paths, + new_file.second.fd.GetNumber(), + new_file.second.fd.GetPathId()); + if (in_progress_files_.find(fn) != in_progress_files_.end()) { + auto tracked_file = tracked_files_.find(fn); + assert(tracked_file != tracked_files_.end()); + in_progress_files_size_ -= tracked_file->second; + in_progress_files_.erase(fn); + } + } } Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, @@ -71,7 +103,7 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, if (file_size != nullptr) { *file_size = tracked_files_[old_path]; } - OnAddFileImpl(new_path, tracked_files_[old_path]); + OnAddFileImpl(new_path, tracked_files_[old_path], false); OnDeleteFileImpl(old_path); } TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile"); @@ -107,7 +139,8 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() { } bool SstFileManagerImpl::EnoughRoomForCompaction( - const std::vector& inputs) { + ColumnFamilyData* cfd, const std::vector& inputs, + Status bg_error) { MutexLock l(&mu_); uint64_t size_added_by_compaction = 0; // First check if we even have the space to do the compaction @@ -118,15 +151,47 @@ bool SstFileManagerImpl::EnoughRoomForCompaction( } } - if (max_allowed_space_ != 0 && - (size_added_by_compaction + cur_compactions_reserved_size_ + - total_files_size_ + compaction_buffer_size_ > - max_allowed_space_)) { - return false; - } // Update cur_compactions_reserved_size_ so concurrent compaction // don't max out space + size_t needed_headroom = + cur_compactions_reserved_size_ + size_added_by_compaction + + compaction_buffer_size_; + if (max_allowed_space_ != 0 && + (needed_headroom + total_files_size_ > max_allowed_space_)) { + return false; + } + + // Implement more aggressive checks only if this DB instance has already + // seen a NoSpace() error. This is tin order to contain a single potentially + // misbehaving DB instance and prevent it from slowing down compactions of + // other DB instances + if (CheckFreeSpace() && bg_error == Status::NoSpace()) { + auto fn = + TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(), + inputs[0][0]->fd.GetPathId()); + uint64_t free_space = 0; + env_->GetFreeSpace(fn, &free_space); + // needed_headroom is based on current size reserved by compactions, + // minus any files created by running compactions as they would count + // against the reserved size. If user didn't specify any compaction + // buffer, add reserved_disk_buffer_ that's calculated by default so the + // compaction doesn't end up leaving nothing for logs and flush SSTs + if (compaction_buffer_size_ == 0) { + needed_headroom += reserved_disk_buffer_; + } + needed_headroom -= in_progress_files_size_; + if (free_space < needed_headroom + size_added_by_compaction) { + // We hit the condition of not enough disk space + ROCKS_LOG_ERROR(logger_, "free space [%d bytes] is less than " + "needed headroom [%d bytes]\n", free_space, needed_headroom); + return false; + } + } + cur_compactions_reserved_size_ += size_added_by_compaction; + // Take a snapshot of cur_compactions_reserved_size_ for when we encounter + // a NoSpace error. + free_space_trigger_ = cur_compactions_reserved_size_; return true; } @@ -166,6 +231,169 @@ uint64_t SstFileManagerImpl::GetTotalTrashSize() { return delete_scheduler_.GetTotalTrashSize(); } +void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size, + const std::string& path) { + MutexLock l(&mu_); + + reserved_disk_buffer_ += size; + if (path_.empty()) { + path_ = path; + } +} + +void SstFileManagerImpl::ClearError() { + while (true) { + MutexLock l(&mu_); + + if (closing_) { + return; + } + + uint64_t free_space; + Status s = env_->GetFreeSpace(path_, &free_space); + if (s.ok()) { + // In case of multi-DB instances, some of them may have experienced a + // soft error and some a hard error. In the SstFileManagerImpl, a hard + // error will basically override previously reported soft errors. Once + // we clear the hard error, we don't keep track of previous errors for + // now + if (bg_err_.severity() == Status::Severity::kHardError) { + if (free_space < reserved_disk_buffer_) { + ROCKS_LOG_ERROR(logger_, "free space [%d bytes] is less than " + "required disk buffer [%d bytes]\n", free_space, + reserved_disk_buffer_); + ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n"); + s = Status::NoSpace(); + } + } else if (bg_err_.severity() == Status::Severity::kSoftError) { + if (free_space < free_space_trigger_) { + ROCKS_LOG_WARN(logger_, "free space [%d bytes] is less than " + "free space for compaction trigger [%d bytes]\n", free_space, + free_space_trigger_); + ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n"); + s = Status::NoSpace(); + } + } + } + + // Someone could have called CancelErrorRecovery() and the list could have + // become empty, so check again here + if (s.ok() && !error_handler_list_.empty()) { + auto error_handler = error_handler_list_.front(); + // Since we will release the mutex, set cur_instance_ to signal to the + // shutdown thread, if it calls // CancelErrorRecovery() the meantime, + // to indicate that this DB instance is busy. The DB instance is + // guaranteed to not be deleted before RecoverFromBGError() returns, + // since the ErrorHandler::recovery_in_prog_ flag would be true + cur_instance_ = error_handler; + mu_.Unlock(); + s = error_handler->RecoverFromBGError(); + mu_.Lock(); + // The DB instance might have been deleted while we were + // waiting for the mutex, so check cur_instance_ to make sure its + // still non-null + if (cur_instance_) { + // Check for error again, since the instance may have recovered but + // immediately got another error. If that's the case, and the new + // error is also a NoSpace() non-fatal error, leave the instance in + // the list + Status err = cur_instance_->GetBGError(); + if (s.ok() && err == Status::NoSpace() && + err.severity() < Status::Severity::kFatalError) { + s = err; + } + cur_instance_ = nullptr; + } + + if (s.ok() || s.IsShutdownInProgress() || + (!s.ok() && s.severity() >= Status::Severity::kFatalError)) { + // If shutdown is in progress, abandon this handler instance + // and continue with the others + error_handler_list_.pop_front(); + } + } + + if (!error_handler_list_.empty()) { + // If there are more instances to be recovered, reschedule after 5 + // seconds + int64_t wait_until = env_->NowMicros() + 5000000; + cv_.TimedWait(wait_until); + } + + // Check again for error_handler_list_ empty, as a DB instance shutdown + // could have removed it from the queue while we were in timed wait + if (error_handler_list_.empty()) { + ROCKS_LOG_INFO(logger_, "Clearing error\n"); + bg_err_ = Status::OK(); + return; + } + } +} + +void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler, + Status bg_error) { + MutexLock l(&mu_); + if (bg_error.severity() == Status::Severity::kSoftError) { + if (bg_err_.ok()) { + // Setting bg_err_ basically means we're in degraded mode + // Assume that all pending compactions will fail similarly. The trigger + // for clearing this condition is set to current compaction reserved + // size, so we stop checking disk space available in + // EnoughRoomForCompaction once this much free space is available + bg_err_ = bg_error; + } + } else if (bg_error.severity() == Status::Severity::kHardError) { + bg_err_ = bg_error; + } else { + assert(false); + } + + // If this is the first instance of this error, kick of a thread to poll + // and recover from this condition + if (error_handler_list_.empty()) { + error_handler_list_.push_back(handler); + // Release lock before calling join. Its ok to do so because + // error_handler_list_ is now non-empty, so no other invocation of this + // function will execute this piece of code + mu_.Unlock(); + if (bg_thread_) { + bg_thread_->join(); + } + // Start a new thread. The previous one would have exited. + bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this)); + mu_.Lock(); + } else { + // Check if this DB instance is already in the list + for (auto iter = error_handler_list_.begin(); + iter != error_handler_list_.end(); ++iter) { + if ((*iter) == handler) { + return; + } + } + error_handler_list_.push_back(handler); + } +} + +bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) { + MutexLock l(&mu_); + + if (cur_instance_ == handler) { + // This instance is currently busy attempting to recover + // Nullify it so the recovery thread doesn't attempt to access it again + cur_instance_ = nullptr; + return false; + } + + for (auto iter = error_handler_list_.begin(); + iter != error_handler_list_.end(); ++iter) { + if ((*iter) == handler) { + error_handler_list_.erase(iter); + return true; + } + } + return false; +} + Status SstFileManagerImpl::ScheduleFileDeletion( const std::string& file_path, const std::string& path_to_sync) { return delete_scheduler_.DeleteFile(file_path, path_to_sync); @@ -176,14 +404,24 @@ void SstFileManagerImpl::WaitForEmptyTrash() { } void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path, - uint64_t file_size) { + uint64_t file_size, bool compaction) { auto tracked_file = tracked_files_.find(file_path); if (tracked_file != tracked_files_.end()) { // File was added before, we will just update the size + assert(!compaction); total_files_size_ -= tracked_file->second; total_files_size_ += file_size; + cur_compactions_reserved_size_ -= file_size; } else { total_files_size_ += file_size; + if (compaction) { + // Keep track of the size of files created by in-progress compactions. + // When calculating whether there's enough headroom for new compactions, + // this will be subtracted from cur_compactions_reserved_size_. + // Otherwise, compactions will be double counted. + in_progress_files_size_ += file_size; + in_progress_files_.insert(file_path); + } } tracked_files_[file_path] = file_size; } @@ -192,10 +430,16 @@ void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) { auto tracked_file = tracked_files_.find(file_path); if (tracked_file == tracked_files_.end()) { // File is not tracked + assert(in_progress_files_.find(file_path) == in_progress_files_.end()); return; } total_files_size_ -= tracked_file->second; + // Check if it belonged to an in-progress compaction + if (in_progress_files_.find(file_path) != in_progress_files_.end()) { + in_progress_files_size_ -= tracked_file->second; + in_progress_files_.erase(file_path); + } tracked_files_.erase(tracked_file); } diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h index 90815d44f..8c8015310 100644 --- a/util/sst_file_manager_impl.h +++ b/util/sst_file_manager_impl.h @@ -12,6 +12,7 @@ #include "port/port.h" #include "db/compaction.h" +#include "db/error_handler.h" #include "rocksdb/sst_file_manager.h" #include "util/delete_scheduler.h" @@ -33,7 +34,7 @@ class SstFileManagerImpl : public SstFileManager { ~SstFileManagerImpl(); // DB will call OnAddFile whenever a new sst file is added. - Status OnAddFile(const std::string& file_path); + Status OnAddFile(const std::string& file_path, bool compaction = false); // DB will call OnDeleteFile whenever an sst file is deleted. Status OnDeleteFile(const std::string& file_path); @@ -67,7 +68,9 @@ class SstFileManagerImpl : public SstFileManager { // estimates how much space is currently being used by compactions (i.e. // if a compaction has started, this function bumps the used space by // the full compaction size). - bool EnoughRoomForCompaction(const std::vector& inputs); + bool EnoughRoomForCompaction(ColumnFamilyData* cfd, + const std::vector& inputs, + Status bg_error); // Bookkeeping so total_file_sizes_ goes back to normal after compaction // finishes @@ -96,6 +99,18 @@ class SstFileManagerImpl : public SstFileManager { // Return the total size of trash files uint64_t GetTotalTrashSize() override; + // Called by each DB instance using this sst file manager to reserve + // disk buffer space for recovery from out of space errors + void ReserveDiskBuffer(uint64_t buffer, const std::string& path); + + // Set a flag upon encountering disk full. May enqueue the ErrorHandler + // instance for background polling and recovery + void StartErrorRecovery(ErrorHandler* db, Status bg_error); + + // Remove the given Errorhandler instance from the recovery queue. Its + // not guaranteed + bool CancelErrorRecovery(ErrorHandler* db); + // Mark file as trash and schedule it's deletion. virtual Status ScheduleFileDeletion(const std::string& file_path, const std::string& dir_to_sync); @@ -108,16 +123,24 @@ class SstFileManagerImpl : public SstFileManager { private: // REQUIRES: mutex locked - void OnAddFileImpl(const std::string& file_path, uint64_t file_size); + void OnAddFileImpl(const std::string& file_path, uint64_t file_size, + bool compaction); // REQUIRES: mutex locked void OnDeleteFileImpl(const std::string& file_path); + void ClearError(); + bool CheckFreeSpace() { + return bg_err_.severity() == Status::Severity::kSoftError; + } + Env* env_; std::shared_ptr logger_; // Mutex to protect tracked_files_, total_files_size_ port::Mutex mu_; // The summation of the sizes of all files in tracked_files_ map uint64_t total_files_size_; + // The summation of all output files of in-progress compactions + uint64_t in_progress_files_size_; // Compactions should only execute if they can leave at least // this amount of buffer space for logs and flushes uint64_t compaction_buffer_size_; @@ -126,10 +149,32 @@ class SstFileManagerImpl : public SstFileManager { // A map containing all tracked files and there sizes // file_path => file_size std::unordered_map tracked_files_; + // A set of files belonging to in-progress compactions + std::unordered_set in_progress_files_; // The maximum allowed space (in bytes) for sst files. uint64_t max_allowed_space_; // DeleteScheduler used to throttle file deletition. DeleteScheduler delete_scheduler_; + port::CondVar cv_; + // Flag to force error recovery thread to exit + bool closing_; + // Background error recovery thread + std::unique_ptr bg_thread_; + // A path in the filesystem corresponding to this SFM. This is used for + // calling Env::GetFreeSpace. Posix requires a path in the filesystem + std::string path_; + // Save the current background error + Status bg_err_; + // Amount of free disk headroom before allowing recovery from hard errors + uint64_t reserved_disk_buffer_; + // For soft errors, amount of free disk space before we can allow + // compactions to run full throttle. If disk space is below this trigger, + // compactions will be gated by free disk space > input size + uint64_t free_space_trigger_; + // List of database error handler instances tracked by this sst file manager + std::list error_handler_list_; + // Pointer to ErrorHandler instance that is currently processing recovery + ErrorHandler* cur_instance_; }; } // namespace rocksdb