From 5efec84c606d715090521f00916f2099fa743674 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Fri, 20 Aug 2021 11:37:53 -0700 Subject: [PATCH] Fix blob callback in compaction and atomic flush (#8681) Summary: Pass BlobFileCompletionCallback in case of atomic flush and compaction job which is currently nullptr (default parameter). BlobFileCompletionCallback is used in case of IntegratedBlobDB to report new blob files to SstFileManager. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8681 Test Plan: CircleCI jobs Reviewed By: ltamasi Differential Revision: D30445998 Pulled By: akankshamahajan15 fbshipit-source-id: ba48093843864faec57f1f365cce7b5a569c4021 --- HISTORY.md | 1 + db/db_impl/db_impl_compaction_flush.cc | 7 +- db/db_sst_test.cc | 90 ++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 3 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index aa728f412..a424099e3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction. +* Fixed passing of BlobFileCompletionCallback to the compaction job and atomic flush job, which previously received the default parameter (nullptr). BlobFileCompletionCallback is an internal callback that manages the addition of blob files to SstFileManager. ### New Features * Made the EventListener extend the Customizable class. 
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index df849d519..b1679d756 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -411,7 +411,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, thread_pri, io_tracer_, db_id_, db_session_id_, - cfd->GetFullHistoryTsLow())); + cfd->GetFullHistoryTsLow(), &blob_callback_)); } std::vector file_meta(num_cfs); @@ -1280,7 +1280,7 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_, - c->column_family_data()->GetFullHistoryTsLow()); + c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -3193,7 +3193,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, is_manual ? manual_compaction->canceled : nullptr, db_id_, - db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); + db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), + &blob_callback_); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 77984758f..cba6fb0b7 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -1569,6 +1569,96 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) { ASSERT_EQ(total_sst_files_size, 0); } +// Tests that blob files are recorded by SST File Manager when a compaction job +// creates/deletes them, and in the case of AtomicFlush. 
+TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.enable_blob_files = true; + options.min_blob_size = 0; + options.disable_auto_compactions = true; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + options.atomic_flush = true; + + int files_added = 0; + int files_deleted = 0; + int files_scheduled_to_delete = 0; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_added++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_deleted++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) { + assert(arg); + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + ++files_scheduled_to_delete; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + Random rnd(301); + + ASSERT_OK(Put("key_1", "value_1")); + ASSERT_OK(Put("key_2", "value_2")); + ASSERT_OK(Put("key_3", "value_3")); + ASSERT_OK(Put("key_4", "value_4")); + ASSERT_OK(Flush()); + + // Overwrite will create the garbage data. 
+ ASSERT_OK(Put("key_3", "new_value_3")); + ASSERT_OK(Put("key_4", "new_value_4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + ASSERT_EQ(files_added, 3); + ASSERT_EQ(files_deleted, 0); + ASSERT_EQ(files_scheduled_to_delete, 0); + files_added = 0; + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + // Compaction job will create a new file and delete the older files. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(files_added, 1); + ASSERT_EQ(files_deleted, 1); + ASSERT_EQ(files_scheduled_to_delete, 1); + + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + sfm->WaitForEmptyTrash(); + ASSERT_EQ(files_deleted, 4); + ASSERT_EQ(files_scheduled_to_delete, 4); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE