From e3a93c9ee1aa3bcf2e6981da30dbf375ed1b2128 Mon Sep 17 00:00:00 2001
From: Tomas Kolda
Date: Tue, 8 Oct 2019 14:18:48 -0700
Subject: [PATCH] Fix crash when background task fails (#5879)

Summary:
Fixing crash. Full story in issue: https://github.com/facebook/rocksdb/issues/5878
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5879

Differential Revision: D17812299

Pulled By: anand1976

fbshipit-source-id: 14e5a4fc502ade974583da9692d0ed6e5014613a
---
 HISTORY.md                             |  1 +
 db/db_impl/db_impl.cc                  | 22 +++++++++++++---------
 db/db_impl/db_impl.h                   |  5 +++--
 db/db_impl/db_impl_compaction_flush.cc | 15 +++++++++------
 db/db_impl/db_impl_open.cc             |  5 +++--
 5 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 2431688f5..41d1078c7 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,7 @@
 * Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
 * Fix a bug when format_version=3, partitioned fitlers, and prefix search are used in conjunction. The bug could result into Seek::(prefix) returning NotFound for an existing prefix.
 * Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strong results when reseek happens with a different iterator upper bound.
+* Fix a bug causing a crash during ingest external file when background compaction cause severe error (file not found).
 ### New Features
 * Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit.
 * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 1c2df575d..d3b001d25 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -2904,8 +2904,11 @@ DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
 }
 
 void DBImpl::ReleaseFileNumberFromPendingOutputs(
-    std::list<uint64_t>::iterator v) {
-  pending_outputs_.erase(v);
+    std::unique_ptr<std::list<uint64_t>::iterator>& v) {
+  if (v.get() != nullptr) {
+    pending_outputs_.erase(*v.get());
+    v.reset();
+  }
 }
 
 #ifndef ROCKSDB_LITE
@@ -3744,7 +3747,7 @@ Status DBImpl::IngestExternalFiles(
 
   // TODO (yanqin) maybe handle the case in which column_families have
   // duplicates
-  std::list<uint64_t>::iterator pending_output_elem;
+  std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
   size_t total = 0;
   for (const auto& arg : args) {
     total += arg.external_files.size();
@@ -3752,7 +3755,7 @@ Status DBImpl::IngestExternalFiles(
   uint64_t next_file_number = 0;
   Status status = ReserveFileNumbersBeforeIngestion(
       static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
-      &pending_output_elem, &next_file_number);
+      pending_output_elem, &next_file_number);
   if (!status.ok()) {
     InstrumentedMutexLock l(&mutex_);
     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
@@ -4026,7 +4029,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
   SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
   VersionEdit dummy_edit;
   uint64_t next_file_number = 0;
-  std::list<uint64_t>::iterator pending_output_elem;
+  std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
   {
     // Lock db mutex
     InstrumentedMutexLock l(&mutex_);
@@ -4036,7 +4039,8 @@ Status DBImpl::CreateColumnFamilyWithImport(
     }
 
     // Make sure that bg cleanup wont delete the files that we are importing
-    pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
+    pending_output_elem.reset(new std::list<uint64_t>::iterator(
+        CaptureCurrentFileNumberInPendingOutputs()));
 
     if (status.ok()) {
       // If crash happen after a hard link established, Recover function may
@@ -4254,18 +4258,18 @@ Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
 
 Status DBImpl::ReserveFileNumbersBeforeIngestion(
     ColumnFamilyData* cfd, uint64_t num,
-    std::list<uint64_t>::iterator* pending_output_elem,
+    std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
     uint64_t* next_file_number) {
   Status s;
   SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
-  assert(nullptr != pending_output_elem);
   assert(nullptr != next_file_number);
   InstrumentedMutexLock l(&mutex_);
   if (error_handler_.IsDBStopped()) {
     // Do not ingest files when there is a bg_error
     return error_handler_.GetBGError();
   }
-  *pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
+  pending_output_elem.reset(new std::list<uint64_t>::iterator(
+      CaptureCurrentFileNumberInPendingOutputs()));
   *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
   auto cf_options = cfd->GetLatestMutableCFOptions();
   VersionEdit dummy_edit;
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 84acc8452..c1f4e66b9 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1313,7 +1313,8 @@ class DBImpl : public DB {
   // created between the calls CaptureCurrentFileNumberInPendingOutputs() and
   // ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live
   // and blocked by any other pending_outputs_ calls)
-  void ReleaseFileNumberFromPendingOutputs(std::list<uint64_t>::iterator v);
+  void ReleaseFileNumberFromPendingOutputs(
+      std::unique_ptr<std::list<uint64_t>::iterator>& v);
 
   Status SyncClosedLogs(JobContext* job_context);
 
@@ -1605,7 +1606,7 @@ class DBImpl : public DB {
   // Write a version edit to the MANIFEST.
   Status ReserveFileNumbersBeforeIngestion(
       ColumnFamilyData* cfd, uint64_t num,
-      std::list<uint64_t>::iterator* pending_output_elem,
+      std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
       uint64_t* next_file_number);
 #endif  // !ROCKSDB_LITE
 
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 2c8be2847..8e4dc411f 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -969,8 +969,9 @@ Status DBImpl::CompactFilesImpl(
   GetSnapshotContext(job_context, &snapshot_seqs,
                      &earliest_write_conflict_snapshot, &snapshot_checker);
 
-  auto pending_outputs_inserted_elem =
-      CaptureCurrentFileNumberInPendingOutputs();
+  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
+      new std::list<uint64_t>::iterator(
+          CaptureCurrentFileNumberInPendingOutputs()));
 
   assert(is_snapshot_supported_ || snapshots_.empty());
   CompactionJobStats compaction_job_stats;
@@ -2216,8 +2217,9 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
     assert(bg_flush_scheduled_);
     num_running_flushes_++;
 
-    auto pending_outputs_inserted_elem =
-        CaptureCurrentFileNumberInPendingOutputs();
+    std::unique_ptr<std::list<uint64_t>::iterator>
+        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
+            CaptureCurrentFileNumberInPendingOutputs()));
     FlushReason reason;
 
     Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
@@ -2298,8 +2300,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
 
     num_running_compactions_++;
 
-    auto pending_outputs_inserted_elem =
-        CaptureCurrentFileNumberInPendingOutputs();
+    std::unique_ptr<std::list<uint64_t>::iterator>
+        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
+            CaptureCurrentFileNumberInPendingOutputs()));
 
     assert((bg_thread_pri == Env::Priority::BOTTOM &&
             bg_bottom_compaction_scheduled_) ||
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index b02676bb4..44f6e6e23 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1136,8 +1136,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
-  auto pending_outputs_inserted_elem =
-      CaptureCurrentFileNumberInPendingOutputs();
+  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
+      new std::list<uint64_t>::iterator(
+          CaptureCurrentFileNumberInPendingOutputs()));
   meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
   ReadOptions ro;
   ro.total_order_seek = true;