From 3514cf51fa4e2941d731e4425c86276de4be7795 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Sat, 27 Jun 2020 08:55:49 -0700 Subject: [PATCH] Fix data race to VersionSet::io_status_ (#7034) Summary: After https://github.com/facebook/rocksdb/issues/6949 , VersionSet::io_status_ can be concurrently accessed by multiple threads without lock, causing tsan test to fail. For example, a bg flush thread resets io_status_ before calling LogAndApply(), while another thread already in the process of LogAndApply() reads io_status_. This is a bug. We do not have to reset io_status_ each time we call LogAndApply(). io_status_ is part of the state of VersionSet, and it indicates the outcome of preceding MANIFEST/CURRENT files IO operations. Its value should be updated only when: 1. MANIFEST/CURRENT files IO fail for the first time. 2. MANIFEST/CURRENT files IO succeed as part of recovering from a prior failure without process restart, e.g. calling Resume(). Test Plan (devserver): COMPILE_WITH_TSAN=1 make check COMPILE_WITH_TSAN=1 make db_test2 ./db_test2 --gtest_filter=DBTest2.CompactionStall Pull Request resolved: https://github.com/facebook/rocksdb/pull/7034 Reviewed By: zhichao-cao Differential Revision: D22247137 Pulled By: riversand963 fbshipit-source-id: 77b83e05390f3ee3cd2d96d3fdd6fe4f225e3216 --- db/compaction/compaction_job.cc | 1 - db/db_impl/db_impl_compaction_flush.cc | 2 - db/memtable_list.cc | 1 - db/version_set.cc | 51 +++++++++++++------------- db/version_set.h | 7 +--- 5 files changed, 28 insertions(+), 34 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 46e685abd..9908bb180 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -721,7 +721,6 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), thread_pri_, compaction_stats_); - versions_->SetIOStatus(IOStatus::OK()); if (status.ok()) { status = InstallCompactionResults(mutable_cf_options); } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 1ab4cf523..f93d70551 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2703,7 +2703,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, for (const auto& f : *c->inputs(0)) { c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); } - versions_->SetIOStatus(IOStatus::OK()); status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); @@ -2761,7 +2760,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } - versions_->SetIOStatus(IOStatus::OK()); status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 89de07f3d..f38ead559 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -470,7 +470,6 @@ Status MemTableList::TryInstallMemtableFlushResults( } // this can release and reacquire the mutex. - vset->SetIOStatus(IOStatus::OK()); s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory); *io_s = vset->io_status(); diff --git a/db/version_set.cc b/db/version_set.cc index 8696b57f3..db282a51e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3878,10 +3878,6 @@ Status VersionSet::ProcessManifestWrites( } #endif // NDEBUG - uint64_t new_manifest_file_size = 0; - Status s; - IOStatus io_s; - assert(pending_manifest_file_number_ == 0); if (!descriptor_log_ || manifest_file_size_ > db_options_->max_manifest_file_size) { @@ -3911,6 +3907,9 @@ Status VersionSet::ProcessManifestWrites( } } + uint64_t new_manifest_file_size = 0; + Status s; + IOStatus io_s; { FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_); mu->Unlock(); @@ -3947,9 +3946,9 @@ Status VersionSet::ProcessManifestWrites( std::string descriptor_fname = DescriptorFileName(dbname_, pending_manifest_file_number_); std::unique_ptr descriptor_file; - s = NewWritableFile(fs_, descriptor_fname, &descriptor_file, - opt_file_opts); - if (s.ok()) { + io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file, + opt_file_opts); + if (io_s.ok()) { descriptor_file->SetPreallocationBlockSize( db_options_->manifest_preallocation_size); @@ -3958,7 +3957,10 @@ Status VersionSet::ProcessManifestWrites( nullptr, db_options_->listeners)); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); - s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get()); + s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(), + io_s); + } else { + s = io_s; } } @@ -3994,7 +3996,6 @@ Status VersionSet::ProcessManifestWrites( #endif /* !NDEBUG */ io_s = descriptor_log_->AddRecord(record); if (!io_s.ok()) { - io_status_ = io_s; s = io_s; break; } @@ -4005,12 +4006,9 @@ Status VersionSet::ProcessManifestWrites( "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s); } if (!io_s.ok()) { - io_status_ = io_s; s = io_s; ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n", s.ToString().c_str()); - } else if (io_status_.IsIOError()) { - io_status_ = io_s; } } @@ -4020,10 +4018,7 @@ Status VersionSet::ProcessManifestWrites( io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_, db_directory); if (!io_s.ok()) { - io_status_ = io_s; s = io_s; - } else if (io_status_.IsIOError()) { - io_status_ = io_s; } TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); } @@ -4044,6 +4039,14 @@ Status VersionSet::ProcessManifestWrites( mu->Lock(); } + if (!io_s.ok()) { + if (io_status_.ok()) { + io_status_ = io_s; + } + } else if (!io_status_.ok()) { + io_status_ = io_s; + } + // Append the old manifest file to the obsolete_manifest_ list to be deleted // by PurgeObsoleteFiles later. if (s.ok() && new_descriptor_log) { @@ -5297,7 +5300,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { Status VersionSet::WriteCurrentStateToManifest( const std::unordered_map& curr_state, - log::Writer* log) { + log::Writer* log, IOStatus& io_s) { // TODO: Break up into multiple records to reduce memory usage on recovery? // WARNING: This method doesn't hold a mutex!! @@ -5306,6 +5309,7 @@ Status VersionSet::WriteCurrentStateToManifest( // LogAndApply. Column family manipulations can only happen within LogAndApply // (the same single thread), so we're safe to iterate. + assert(io_s.ok()); if (db_options_->write_dbid_to_manifest) { VersionEdit edit_for_db_id; assert(!db_id_.empty()); @@ -5315,10 +5319,9 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption("Unable to Encode VersionEdit:" + edit_for_db_id.DebugString(true)); } - IOStatus io_s = log->AddRecord(db_id_record); + io_s = log->AddRecord(db_id_record); if (!io_s.ok()) { - io_status_ = io_s; - return std::move(io_s); + return io_s; } } @@ -5345,10 +5348,9 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption( "Unable to Encode VersionEdit:" + edit.DebugString(true)); } - IOStatus io_s = log->AddRecord(record); + io_s = log->AddRecord(record); if (!io_s.ok()) { - io_status_ = io_s; - return std::move(io_s); + return io_s; } } @@ -5398,10 +5400,9 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption( "Unable to Encode VersionEdit:" + edit.DebugString(true)); } - IOStatus io_s = log->AddRecord(record); + io_s = log->AddRecord(record); if (!io_s.ok()) { - io_status_ = io_s; - return std::move(io_s); + return io_s; } } } diff --git a/db/version_set.h b/db/version_set.h index 16661e097..829a7d0cf 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1159,10 +1159,7 @@ class VersionSet { static uint64_t GetTotalSstFilesSize(Version* dummy_versions); // Get the IO Status returned by written Manifest. - IOStatus io_status() const { return io_status_; } - - // Set the IO Status to OK. Called before Manifest write if needed. - void SetIOStatus(const IOStatus& s) { io_status_ = s; } + const IOStatus& io_status() const { return io_status_; } protected: using VersionBuilderMap = @@ -1205,7 +1202,7 @@ class VersionSet { // Save current contents to *log Status WriteCurrentStateToManifest( const std::unordered_map& curr_state, - log::Writer* log); + log::Writer* log, IOStatus& io_s); void AppendVersion(ColumnFamilyData* column_family_data, Version* v);