Fix NotifyOnFlushCompleted() for atomic flush (#8585)
Summary: PR https://github.com/facebook/rocksdb/issues/5908 added `flush_jobs_info_` to `FlushJob` to make sure `OnFlushCompleted()` is called after committing flush results to MANIFEST. However, `flush_jobs_info_` is not updated in atomic flush, causing `NotifyOnFlushCompleted()` to skip `OnFlushCompleted()`. This PR fixes this, in a similar way to https://github.com/facebook/rocksdb/issues/5908 that handles regular flush. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8585 Test Plan: make check Reviewed By: jay-zhuang Differential Revision: D29913720 Pulled By: riversand963 fbshipit-source-id: 4ff023c98372fa2c93188d4a5c8a4e9ffa0f4dda
This commit is contained in:
parent
8b2f60b668
commit
0879c24040
@ -590,6 +590,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
autovector<const autovector<MemTable*>*> mems_list;
|
autovector<const autovector<MemTable*>*> mems_list;
|
||||||
autovector<const MutableCFOptions*> mutable_cf_options_list;
|
autovector<const MutableCFOptions*> mutable_cf_options_list;
|
||||||
autovector<FileMetaData*> tmp_file_meta;
|
autovector<FileMetaData*> tmp_file_meta;
|
||||||
|
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
|
||||||
|
committed_flush_jobs_info;
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
const auto& mems = jobs[i]->GetMemTables();
|
const auto& mems = jobs[i]->GetMemTables();
|
||||||
if (!cfds[i]->IsDropped() && !mems.empty()) {
|
if (!cfds[i]->IsDropped() && !mems.empty()) {
|
||||||
@ -597,13 +599,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
mems_list.emplace_back(&mems);
|
mems_list.emplace_back(&mems);
|
||||||
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
|
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
|
||||||
tmp_file_meta.emplace_back(&file_meta[i]);
|
tmp_file_meta.emplace_back(&file_meta[i]);
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
committed_flush_jobs_info.emplace_back(
|
||||||
|
jobs[i]->GetCommittedFlushJobsInfo());
|
||||||
|
#endif //! ROCKSDB_LITE
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s = InstallMemtableAtomicFlushResults(
|
s = InstallMemtableAtomicFlushResults(
|
||||||
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
|
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
|
||||||
versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
|
versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
|
||||||
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
|
committed_flush_jobs_info, &job_context->memtables_to_free,
|
||||||
|
directories_.GetDbDir(), log_buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
|
@ -418,12 +418,19 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
|
|||||||
for (auto cfd : all_cfds) {
|
for (auto cfd : all_cfds) {
|
||||||
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
|
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
|
||||||
}
|
}
|
||||||
|
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
|
||||||
|
committed_flush_jobs_info;
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
for (auto& job : flush_jobs) {
|
||||||
|
committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
|
||||||
|
}
|
||||||
|
#endif //! ROCKSDB_LITE
|
||||||
|
|
||||||
Status s = InstallMemtableAtomicFlushResults(
|
Status s = InstallMemtableAtomicFlushResults(
|
||||||
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
|
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
|
||||||
versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
|
versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
|
||||||
&job_context.memtables_to_free, nullptr /* db_directory */,
|
committed_flush_jobs_info, &job_context.memtables_to_free,
|
||||||
nullptr /* log_buffer */);
|
nullptr /* db_directory */, nullptr /* log_buffer */);
|
||||||
ASSERT_OK(s);
|
ASSERT_OK(s);
|
||||||
|
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
|
@ -356,13 +356,17 @@ TEST_F(EventListenerTest, MultiCF) {
|
|||||||
#ifdef ROCKSDB_USING_THREAD_STATUS
|
#ifdef ROCKSDB_USING_THREAD_STATUS
|
||||||
options.enable_thread_tracking = true;
|
options.enable_thread_tracking = true;
|
||||||
#endif // ROCKSDB_USING_THREAD_STATUS
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
||||||
|
for (auto atomic_flush : {false, true}) {
|
||||||
|
options.atomic_flush = atomic_flush;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
DestroyAndReopen(options);
|
||||||
TestFlushListener* listener = new TestFlushListener(options.env, this);
|
TestFlushListener* listener = new TestFlushListener(options.env, this);
|
||||||
options.listeners.emplace_back(listener);
|
options.listeners.emplace_back(listener);
|
||||||
options.table_properties_collector_factories.push_back(
|
options.table_properties_collector_factories.push_back(
|
||||||
std::make_shared<TestPropertiesCollectorFactory>());
|
std::make_shared<TestPropertiesCollectorFactory>());
|
||||||
std::vector<std::string> cf_names = {
|
std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
|
||||||
"pikachu", "ilya", "muromec", "dobrynia",
|
"dobrynia", "nikitich", "alyosha",
|
||||||
"nikitich", "alyosha", "popovich"};
|
"popovich"};
|
||||||
CreateAndReopenWithCF(cf_names, options);
|
CreateAndReopenWithCF(cf_names, options);
|
||||||
|
|
||||||
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
|
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
|
||||||
@ -383,6 +387,8 @@ TEST_F(EventListenerTest, MultiCF) {
|
|||||||
ASSERT_EQ(listener->flushed_dbs_[i], db_);
|
ASSERT_EQ(listener->flushed_dbs_[i], db_);
|
||||||
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
|
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
|
||||||
}
|
}
|
||||||
|
Close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(EventListenerTest, MultiDBMultiListeners) {
|
TEST_F(EventListenerTest, MultiDBMultiListeners) {
|
||||||
|
@ -724,6 +724,8 @@ Status InstallMemtableAtomicFlushResults(
|
|||||||
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
|
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
|
||||||
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
|
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
|
||||||
const autovector<FileMetaData*>& file_metas,
|
const autovector<FileMetaData*>& file_metas,
|
||||||
|
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||||
|
committed_flush_jobs_info,
|
||||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||||
LogBuffer* log_buffer) {
|
LogBuffer* log_buffer) {
|
||||||
AutoThreadOperationStageUpdater stage_updater(
|
AutoThreadOperationStageUpdater stage_updater(
|
||||||
@ -753,6 +755,17 @@ Status InstallMemtableAtomicFlushResults(
|
|||||||
(*mems_list[k])[i]->SetFlushCompleted(true);
|
(*mems_list[k])[i]->SetFlushCompleted(true);
|
||||||
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
|
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
|
||||||
}
|
}
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
if (committed_flush_jobs_info[k]) {
|
||||||
|
assert(!mems_list[k]->empty());
|
||||||
|
assert((*mems_list[k])[0]);
|
||||||
|
std::unique_ptr<FlushJobInfo> flush_job_info =
|
||||||
|
(*mems_list[k])[0]->ReleaseFlushJobInfo();
|
||||||
|
committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
|
||||||
|
}
|
||||||
|
#else //! ROCKSDB_LITE
|
||||||
|
(void)committed_flush_jobs_info;
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
}
|
}
|
||||||
|
|
||||||
Status s;
|
Status s;
|
||||||
|
@ -140,6 +140,8 @@ class MemTableListVersion {
|
|||||||
const autovector<const autovector<MemTable*>*>& mems_list,
|
const autovector<const autovector<MemTable*>*>& mems_list,
|
||||||
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
|
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
|
||||||
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
|
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
|
||||||
|
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||||
|
committed_flush_jobs_info,
|
||||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||||
LogBuffer* log_buffer);
|
LogBuffer* log_buffer);
|
||||||
|
|
||||||
@ -415,6 +417,8 @@ class MemTableList {
|
|||||||
const autovector<const autovector<MemTable*>*>& mems_list,
|
const autovector<const autovector<MemTable*>*>& mems_list,
|
||||||
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
|
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
|
||||||
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
|
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
|
||||||
|
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||||
|
committed_flush_jobs_info,
|
||||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||||
LogBuffer* log_buffer);
|
LogBuffer* log_buffer);
|
||||||
|
|
||||||
@ -469,6 +473,8 @@ extern Status InstallMemtableAtomicFlushResults(
|
|||||||
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
|
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
|
||||||
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
|
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
|
||||||
const autovector<FileMetaData*>& file_meta,
|
const autovector<FileMetaData*>& file_meta,
|
||||||
|
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
|
||||||
|
committed_flush_jobs_info,
|
||||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||||
LogBuffer* log_buffer);
|
LogBuffer* log_buffer);
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
@ -182,12 +182,21 @@ class MemTableListTest : public testing::Test {
|
|||||||
for (auto& meta : file_metas) {
|
for (auto& meta : file_metas) {
|
||||||
file_meta_ptrs.push_back(&meta);
|
file_meta_ptrs.push_back(&meta);
|
||||||
}
|
}
|
||||||
|
std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
|
||||||
|
committed_flush_jobs_info_storage(cf_ids.size());
|
||||||
|
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
|
||||||
|
committed_flush_jobs_info;
|
||||||
|
for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
|
||||||
|
committed_flush_jobs_info.push_back(
|
||||||
|
&committed_flush_jobs_info_storage[i]);
|
||||||
|
}
|
||||||
|
|
||||||
InstrumentedMutex mutex;
|
InstrumentedMutex mutex;
|
||||||
InstrumentedMutexLock l(&mutex);
|
InstrumentedMutexLock l(&mutex);
|
||||||
return InstallMemtableAtomicFlushResults(
|
return InstallMemtableAtomicFlushResults(
|
||||||
&lists, cfds, mutable_cf_options_list, mems_list, &versions,
|
&lists, cfds, mutable_cf_options_list, mems_list, &versions,
|
||||||
nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr,
|
nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
|
||||||
&log_buffer);
|
committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user