From 65b229851093f1819b42b16784fd3668e32154d4 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 31 Jan 2019 14:28:53 -0800 Subject: [PATCH] Use correct FileMeta for atomic flush result install (#4932) Summary: 1. this commit fixes our handling of a combination of two separate edge cases. If a flush job does not pick any memtable to flush (because another flush job has already picked the same memtables), and the column family assigned to the flush job is dropped right before RocksDB calls rocksdb::InstallMemtableAtomicFlushResults, our original code passes a FileMetaData object whose file number is 0, failing the assertion in rocksdb::InstallMemtableAtomicFlushResults (assert(m->GetFileNumber() > 0)). 2. Also piggyback a small change: since we already create a local copy of column family's mutable CF options to eliminate potential race condition with `SetOptions` call, we might as well use the local copy in other function calls in the same scope. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4932 Differential Revision: D13901322 Pulled By: riversand963 fbshipit-source-id: b936580af7c127ea0c6c19ea10cd5fcede9fb0f9 --- db/db_impl_compaction_flush.cc | 22 ++++++++++------------ db/flush_job_test.cc | 10 ++++++++-- db/memtable_list.cc | 5 +++-- db/memtable_list.h | 6 +++--- db/memtable_list_test.cc | 9 +++++++-- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 5847f05dd..a42e60f85 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -310,21 +310,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( jobs.back().PickMemTable(); } - autovector file_meta; + std::vector file_meta(num_cfs); Status s; assert(num_cfs == static_cast(jobs.size())); - for (int i = 0; i != num_cfs; ++i) { - file_meta.emplace_back(); - #ifndef ROCKSDB_LITE - const MutableCFOptions& mutable_cf_options = - *cfds[i]->GetLatestMutableCFOptions(); + for (int i = 0; i != num_cfs; ++i) { + const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i); // may temporarily unlock and lock the mutex. NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, job_context->job_id, jobs[i].GetTableProperties()); -#endif /* !ROCKSDB_LITE */ } +#endif /* !ROCKSDB_LITE */ if (logfile_number_ > 0) { // TODO (yanqin) investigate whether we should sync the closed logs for @@ -428,19 +425,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector tmp_cfds; autovector*> mems_list; autovector mutable_cf_options_list; + autovector tmp_file_meta; for (int i = 0; i != num_cfs; ++i) { const auto& mems = jobs[i].GetMemTables(); if (!cfds[i]->IsDropped() && !mems.empty()) { tmp_cfds.emplace_back(cfds[i]); mems_list.emplace_back(&mems); mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]); + tmp_file_meta.emplace_back(&file_meta[i]); } } s = InstallMemtableAtomicFlushResults( nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list, - versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free, - directories_.GetDbDir(), log_buffer); + versions_.get(), &mutex_, tmp_file_meta, + &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer); } if (s.ok() || s.IsShutdownInProgress()) { @@ -452,7 +451,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } InstallSuperVersionAndScheduleWork(cfds[i], &job_context->superversion_contexts[i], - *cfds[i]->GetLatestMutableCFOptions()); + all_mutable_cf_options[i]); VersionStorageInfo::LevelSummaryStorage tmp; ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", cfds[i]->GetName().c_str(), @@ -468,8 +467,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (cfds[i]->IsDropped()) { continue; } - NotifyOnFlushCompleted(cfds[i], &file_meta[i], - *cfds[i]->GetLatestMutableCFOptions(), + NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i], job_context->job_id, jobs[i].GetTableProperties()); if (sfm) { std::string file_path = MakeTableFileName( diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 5ac5f2f93..1f7bc7b84 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -308,7 +308,9 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { k++; } HistogramData hist; - autovector file_metas; + std::vector file_metas; + // Call reserve to avoid auto-resizing + file_metas.reserve(flush_jobs.size()); mutex_.Lock(); for (auto& job : flush_jobs) { job.PickMemTable(); @@ -319,6 +321,10 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { ASSERT_OK(job.Run(nullptr /**/, &meta)); file_metas.emplace_back(meta); } + autovector file_meta_ptrs; + for (auto& meta : file_metas) { + file_meta_ptrs.push_back(&meta); + } autovector*> mems_list; for (size_t i = 0; i != all_cfds.size(); ++i) { const auto& mems = flush_jobs[i].GetMemTables(); @@ -331,7 +337,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { Status s = InstallMemtableAtomicFlushResults( nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, - versions_.get(), &mutex_, file_metas, &job_context.memtables_to_free, + versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free, nullptr /* db_directory */, nullptr /* log_buffer */); ASSERT_OK(s); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 459d392d5..9397dbc7e 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -533,7 +533,7 @@ Status InstallMemtableAtomicFlushResults( const autovector& cfds, const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, - InstrumentedMutex* mu, const autovector& file_metas, + InstrumentedMutex* mu, const autovector& file_metas, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer) { AutoThreadOperationStageUpdater stage_updater( @@ -553,10 +553,11 @@ Status InstallMemtableAtomicFlushResults( assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID()); } #endif + assert(nullptr != file_metas[k]); for (size_t i = 0; i != mems_list[k]->size(); ++i) { assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0); (*mems_list[k])[i]->SetFlushCompleted(true); - (*mems_list[k])[i]->SetFileNumber(file_metas[k].fd.GetNumber()); + (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber()); } } diff --git a/db/memtable_list.h b/db/memtable_list.h index be3f93562..b56ad4932 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -123,7 +123,7 @@ class MemTableListVersion { const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, InstrumentedMutex* mu, - const autovector& file_meta, + const autovector& file_meta, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer); @@ -301,7 +301,7 @@ class MemTableList { const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, InstrumentedMutex* mu, - const autovector& file_meta, + const autovector& file_meta, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer); @@ -337,7 +337,7 @@ extern Status InstallMemtableAtomicFlushResults( const autovector& cfds, const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, - InstrumentedMutex* mu, const autovector& file_meta, + InstrumentedMutex* mu, const autovector& file_meta, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer); } // namespace rocksdb diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index d67eed9fa..f0f4b0bb0 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -161,18 +161,23 @@ class MemTableListTest : public testing::Test { cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i])); EXPECT_NE(nullptr, cfds[i]); } - autovector file_metas; + std::vector file_metas; + file_metas.reserve(cf_ids.size()); for (size_t i = 0; i != cf_ids.size(); ++i) { FileMetaData meta; uint64_t file_num = file_number.fetch_add(1); meta.fd = FileDescriptor(file_num, 0, 0); file_metas.emplace_back(meta); } + autovector file_meta_ptrs; + for (auto& meta : file_metas) { + file_meta_ptrs.push_back(&meta); + } InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); return InstallMemtableAtomicFlushResults( &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex, - file_metas, to_delete, nullptr, &log_buffer); + file_meta_ptrs, to_delete, nullptr, &log_buffer); } };