From fe1147db1c4a0e60156e45be81117cff946bac23 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 2 Dec 2019 17:43:37 -0800 Subject: [PATCH] Let DBSecondary close files after catch up (#6114) Summary: After secondary instance replays the logs from primary, certain files become obsolete. The secondary should find these files, evict their table readers from table cache and close them. If this is not done, the secondary will hold on to these files and prevent their space from being freed. Test plan (devserver): ``` $./db_secondary_test --gtest_filter=DBSecondaryTest.SecondaryCloseFiles $make check $./db_stress -ops_per_thread=100000 -enable_secondary=true -threads=32 -secondary_catch_up_one_in=10000 -clear_column_family_one_in=1000 -reopen=100 ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/6114 Differential Revision: D18769998 Pulled By: riversand963 fbshipit-source-id: 5d1f151567247196164e1b79d8402fa2045b9120 --- db/db_impl/db_impl.h | 2 + db/db_impl/db_impl_files.cc | 8 +++- db/db_impl/db_impl_secondary.cc | 85 ++++++++++++++++++++------------- db/db_impl/db_impl_secondary.h | 26 ++++++++++ db/db_impl/db_secondary_test.cc | 84 ++++++++++++++++++++++++++++++++ 5 files changed, 170 insertions(+), 35 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index fe97e08be..67a81259d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1109,6 +1109,8 @@ class DBImpl : public DB { bool read_only = false, bool error_if_log_file_exist = false, bool error_if_data_exists_in_logs = false); + virtual bool OwnTablesAndLogs() const { return true; } + private: friend class DB; friend class ErrorHandler; diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 1fa288406..d6053e8be 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -385,6 +385,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { w->Close(); } + bool own_files = OwnTablesAndLogs(); std::unordered_set files_to_del; for (const auto& candidate_file : candidate_files) { const std::string& to_delete = candidate_file.file_name; @@ -484,6 +485,12 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { } #endif // !ROCKSDB_LITE + // If I do not own these files, e.g. secondary instance with max_open_files + // = -1, then no need to delete or schedule delete these files since they + // will be removed by their owner, e.g. the primary instance. + if (!own_files) { + continue; + } Status file_deletion_status; if (schedule_only) { InstrumentedMutexLock guard_lock(&mutex_); @@ -495,7 +502,6 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { { // After purging obsolete files, remove them from files_grabbed_for_purge_. - // Use a temporary vector to perform bulk deletion via swap. InstrumentedMutexLock guard_lock(&mutex_); autovector to_be_removed; for (auto fn : files_grabbed_for_purge_) { diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 8eac41ded..a113019a9 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -11,6 +11,7 @@ #include "db/merge_context.h" #include "logging/auto_roll_logger.h" #include "monitoring/perf_context_imp.h" +#include "util/cast_util.h" namespace rocksdb { @@ -497,45 +498,61 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { // read the manifest and apply new changes to the secondary instance std::unordered_set cfds_changed; JobContext job_context(0, true /*create_superversion*/); - InstrumentedMutexLock lock_guard(&mutex_); - s = static_cast(versions_.get()) - ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); + { + InstrumentedMutexLock lock_guard(&mutex_); + s = static_cast_with_check(versions_.get()) + ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); - ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, - static_cast(versions_->LastSequence())); - for (ColumnFamilyData* cfd : cfds_changed) { - if (cfd->IsDropped()) { - ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", - cfd->GetName().c_str()); - continue; + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, + "[%s] Level summary: %s\n", cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); } - VersionStorageInfo::LevelSummaryStorage tmp; - ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n", - cfd->GetName().c_str(), - cfd->current()->storage_info()->LevelSummary(&tmp)); - } - // list wal_dir to discover new WALs and apply new changes to the secondary - // instance - if (s.ok()) { - s = FindAndRecoverLogFiles(&cfds_changed, &job_context); - } - if (s.IsPathNotFound()) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Secondary tries to read WAL, but WAL file(s) have already " - "been purged by primary."); - s = Status::OK(); - } - if (s.ok()) { - for (auto cfd : cfds_changed) { - cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), - &job_context.memtables_to_free); - auto& sv_context = job_context.superversion_contexts.back(); - cfd->InstallSuperVersion(&sv_context, &mutex_); - sv_context.NewSuperVersion(); + // list wal_dir to discover new WALs and apply new changes to the secondary + // instance + if (s.ok()) { + s = FindAndRecoverLogFiles(&cfds_changed, &job_context); + } + if (s.IsPathNotFound()) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Secondary tries to read WAL, but WAL file(s) have already " + "been purged by primary."); + s = Status::OK(); + } + if (s.ok()) { + for (auto cfd : cfds_changed) { + cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), + &job_context.memtables_to_free); + auto& sv_context = job_context.superversion_contexts.back(); + cfd->InstallSuperVersion(&sv_context, &mutex_); + sv_context.NewSuperVersion(); + } } - job_context.Clean(); } + job_context.Clean(); + + // Cleanup unused, obsolete files. + JobContext purge_files_job_context(0); + { + InstrumentedMutexLock lock_guard(&mutex_); + // Currently, secondary instance does not own the database files, thus it + // is unnecessary for the secondary to force full scan. + FindObsoleteFiles(&purge_files_job_context, /*force=*/false); + } + if (purge_files_job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(purge_files_job_context); + } + purge_files_job_context.Clean(); return s; } diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index ca853e258..99887f55b 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -172,6 +172,24 @@ class DBImplSecondary : public DBImpl { return Status::NotSupported("Not supported operation in secondary mode."); } + using DBImpl::SetDBOptions; + Status SetDBOptions(const std::unordered_map& + /*options_map*/) override { + // Currently not supported because changing certain options may cause + // flush/compaction. + return Status::NotSupported("Not supported operation in secondary mode."); + } + + using DBImpl::SetOptions; + Status SetOptions( + ColumnFamilyHandle* /*cfd*/, + const std::unordered_map& /*options_map*/) + override { + // Currently not supported because changing certain options may cause + // flush/compaction and/or write to MANIFEST. + return Status::NotSupported("Not supported operation in secondary mode."); + } + using DBImpl::SyncWAL; Status SyncWAL() override { return Status::NotSupported("Not supported operation in secondary mode."); @@ -269,6 +287,14 @@ class DBImplSecondary : public DBImpl { return s; } + bool OwnTablesAndLogs() const override { + // Currently, the secondary instance does not own the database files. It + // simply opens the files of the primary instance and tracks their file + // descriptors until they become obsolete. In the future, the secondary may + // create links to database files. OwnTablesAndLogs will return true then. + return false; + } + private: friend class DB; diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index 6caff005e..aabe61b41 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -195,6 +195,90 @@ TEST_F(DBSecondaryTest, OpenAsSecondary) { verify_db_func("new_foo_value", "new_bar_value"); } +namespace { +class TraceFileEnv : public EnvWrapper { + public: + explicit TraceFileEnv(Env* target) : EnvWrapper(target) {} + Status NewRandomAccessFile(const std::string& f, + std::unique_ptr* r, + const EnvOptions& env_options) override { + class TracedRandomAccessFile : public RandomAccessFile { + public: + TracedRandomAccessFile(std::unique_ptr&& target, + std::atomic& counter) + : target_(std::move(target)), files_closed_(counter) {} + ~TracedRandomAccessFile() override { + files_closed_.fetch_add(1, std::memory_order_relaxed); + } + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return target_->Read(offset, n, result, scratch); + } + + private: + std::unique_ptr target_; + std::atomic& files_closed_; + }; + Status s = target()->NewRandomAccessFile(f, r, env_options); + if (s.ok()) { + r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_)); + } + return s; + } + + int files_closed() const { + return files_closed_.load(std::memory_order_relaxed); + } + + private: + std::atomic files_closed_{0}; +}; +} // namespace + +TEST_F(DBSecondaryTest, SecondaryCloseFiles) { + Options options; + options.env = env_; + options.max_open_files = 1; + options.disable_auto_compactions = true; + Reopen(options); + Options options1; + std::unique_ptr traced_env(new TraceFileEnv(env_)); + options1.env = traced_env.get(); + OpenSecondary(options1); + + static const auto verify_db = [&]() { + std::unique_ptr iter1(dbfull()->NewIterator(ReadOptions())); + std::unique_ptr iter2(db_secondary_->NewIterator(ReadOptions())); + for (iter1->SeekToFirst(), iter2->SeekToFirst(); + iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { + ASSERT_EQ(iter1->key(), iter2->key()); + ASSERT_EQ(iter1->value(), iter2->value()); + } + ASSERT_FALSE(iter1->Valid()); + ASSERT_FALSE(iter2->Valid()); + }; + + ASSERT_OK(Put("a", "value")); + ASSERT_OK(Put("c", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(Put("b", "value")); + ASSERT_OK(Put("d", "value")); + ASSERT_OK(Flush()); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(); + + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + ASSERT_EQ(2, static_cast(traced_env.get())->files_closed()); + + Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}}); + ASSERT_TRUE(s.IsNotSupported()); + CloseSecondary(); +} + TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) { Options options; options.env = env_;