diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index e4312bbcf..6f5e22258 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -590,6 +590,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector*> mems_list; autovector mutable_cf_options_list; autovector tmp_file_meta; + autovector>*> + committed_flush_jobs_info; for (int i = 0; i != num_cfs; ++i) { const auto& mems = jobs[i]->GetMemTables(); if (!cfds[i]->IsDropped() && !mems.empty()) { @@ -597,13 +599,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( mems_list.emplace_back(&mems); mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]); tmp_file_meta.emplace_back(&file_meta[i]); +#ifndef ROCKSDB_LITE + committed_flush_jobs_info.emplace_back( + jobs[i]->GetCommittedFlushJobsInfo()); +#endif //! ROCKSDB_LITE } } s = InstallMemtableAtomicFlushResults( nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list, versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta, - &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer); + committed_flush_jobs_info, &job_context->memtables_to_free, + directories_.GetDbDir(), log_buffer); } if (s.ok()) { diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 2366da201..6c5baec3d 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -418,12 +418,19 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { for (auto cfd : all_cfds) { mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); } + autovector>*> + committed_flush_jobs_info; +#ifndef ROCKSDB_LITE + for (auto& job : flush_jobs) { + committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo()); + } +#endif //! ROCKSDB_LITE Status s = InstallMemtableAtomicFlushResults( nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs, - &job_context.memtables_to_free, nullptr /* db_directory */, - nullptr /* log_buffer */); + committed_flush_jobs_info, &job_context.memtables_to_free, + nullptr /* db_directory */, nullptr /* log_buffer */); ASSERT_OK(s); mutex_.Unlock(); diff --git a/db/listener_test.cc b/db/listener_test.cc index c2fb90b56..c1e68d42c 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -356,32 +356,38 @@ TEST_F(EventListenerTest, MultiCF) { #ifdef ROCKSDB_USING_THREAD_STATUS options.enable_thread_tracking = true; #endif // ROCKSDB_USING_THREAD_STATUS - TestFlushListener* listener = new TestFlushListener(options.env, this); - options.listeners.emplace_back(listener); - options.table_properties_collector_factories.push_back( - std::make_shared()); - std::vector cf_names = { - "pikachu", "ilya", "muromec", "dobrynia", - "nikitich", "alyosha", "popovich"}; - CreateAndReopenWithCF(cf_names, options); + for (auto atomic_flush : {false, true}) { + options.atomic_flush = atomic_flush; + options.create_if_missing = true; + DestroyAndReopen(options); + TestFlushListener* listener = new TestFlushListener(options.env, this); + options.listeners.emplace_back(listener); + options.table_properties_collector_factories.push_back( + std::make_shared()); + std::vector cf_names = {"pikachu", "ilya", "muromec", + "dobrynia", "nikitich", "alyosha", + "popovich"}; + CreateAndReopenWithCF(cf_names, options); - ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); - ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); - ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); - ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); - ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); - ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); - ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); - for (int i = 1; i < 8; ++i) { - ASSERT_OK(Flush(i)); - ASSERT_EQ(listener->flushed_dbs_.size(), i); - ASSERT_EQ(listener->flushed_column_family_names_.size(), i); - } + ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); + ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); + ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); + ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); + ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); + ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); + ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); + for (int i = 1; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_EQ(listener->flushed_dbs_.size(), i); + ASSERT_EQ(listener->flushed_column_family_names_.size(), i); + } - // make sure callback functions are called in the right order - for (size_t i = 0; i < cf_names.size(); i++) { - ASSERT_EQ(listener->flushed_dbs_[i], db_); - ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); + // make sure callback functions are called in the right order + for (size_t i = 0; i < cf_names.size(); i++) { + ASSERT_EQ(listener->flushed_dbs_[i], db_); + ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); + } + Close(); } } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index c35ebd5a5..3927a3f03 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -724,6 +724,8 @@ Status InstallMemtableAtomicFlushResults( const autovector*>& mems_list, VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, const autovector& file_metas, + const autovector>*>& + committed_flush_jobs_info, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer) { AutoThreadOperationStageUpdater stage_updater( @@ -753,6 +755,17 @@ Status InstallMemtableAtomicFlushResults( (*mems_list[k])[i]->SetFlushCompleted(true); (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber()); } +#ifndef ROCKSDB_LITE + if (committed_flush_jobs_info[k]) { + assert(!mems_list[k]->empty()); + assert((*mems_list[k])[0]); + std::unique_ptr flush_job_info = + (*mems_list[k])[0]->ReleaseFlushJobInfo(); + committed_flush_jobs_info[k]->push_back(std::move(flush_job_info)); + } +#else //! ROCKSDB_LITE + (void)committed_flush_jobs_info; +#endif // ROCKSDB_LITE } Status s; diff --git a/db/memtable_list.h b/db/memtable_list.h index 9ae50e47f..17a7aa87f 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -140,6 +140,8 @@ class MemTableListVersion { const autovector*>& mems_list, VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, const autovector& file_meta, + const autovector>*>& + committed_flush_jobs_info, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); @@ -415,6 +417,8 @@ class MemTableList { const autovector*>& mems_list, VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, const autovector& file_meta, + const autovector>*>& + committed_flush_jobs_info, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); @@ -469,6 +473,8 @@ extern Status InstallMemtableAtomicFlushResults( const autovector*>& mems_list, VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, const autovector& file_meta, + const autovector>*>& + committed_flush_jobs_info, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); } // namespace ROCKSDB_NAMESPACE diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 165471b6b..18b5f57e9 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -182,12 +182,21 @@ class MemTableListTest : public testing::Test { for (auto& meta : file_metas) { file_meta_ptrs.push_back(&meta); } + std::vector>> + committed_flush_jobs_info_storage(cf_ids.size()); + autovector>*> + committed_flush_jobs_info; + for (int i = 0; i < static_cast(cf_ids.size()); ++i) { + committed_flush_jobs_info.push_back( + &committed_flush_jobs_info_storage[i]); + } + InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); return InstallMemtableAtomicFlushResults( &lists, cfds, mutable_cf_options_list, mems_list, &versions, - nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr, - &log_buffer); + nullptr /* prep_tracker */, &mutex, file_meta_ptrs, + committed_flush_jobs_info, to_delete, nullptr, &log_buffer); } };