Use correct FileMeta for atomic flush result install (#4932)

Summary:
1. this commit fixes our handling of a combination of two separate edge
cases. If a flush job does not pick any memtable to flush (because another
flush job has already picked the same memtables), and the column family
assigned to the flush job is dropped right before RocksDB calls
rocksdb::InstallMemtableAtomicFlushResults, our original code passes
a FileMetaData object whose file number is 0, failing the assertion in
rocksdb::InstallMemtableAtomicFlushResults (assert(m->GetFileNumber() > 0)).
2. Also piggyback a small change: since we already create a local copy of column family's mutable CF options to eliminate potential race condition with `SetOptions` call, we might as well use the local copy in other function calls in the same scope.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4932

Differential Revision: D13901322

Pulled By: riversand963

fbshipit-source-id: b936580af7c127ea0c6c19ea10cd5fcede9fb0f9
This commit is contained in:
Yanqin Jin 2019-01-31 14:28:53 -08:00 committed by Facebook Github Bot
parent 0ea57115a3
commit 842cdc11dd
5 changed files with 31 additions and 21 deletions

View File

@ -332,21 +332,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
jobs.back().PickMemTable();
}
autovector<FileMetaData> file_meta;
std::vector<FileMetaData> file_meta(num_cfs);
Status s;
assert(num_cfs == static_cast<int>(jobs.size()));
for (int i = 0; i != num_cfs; ++i) {
file_meta.emplace_back();
#ifndef ROCKSDB_LITE
const MutableCFOptions& mutable_cf_options =
*cfds[i]->GetLatestMutableCFOptions();
for (int i = 0; i != num_cfs; ++i) {
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
job_context->job_id, jobs[i].GetTableProperties());
#endif /* !ROCKSDB_LITE */
}
#endif /* !ROCKSDB_LITE */
if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
@ -450,19 +447,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<ColumnFamilyData*> tmp_cfds;
autovector<const autovector<MemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems);
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
tmp_file_meta.emplace_back(&file_meta[i]);
}
}
s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free,
directories_.GetDbDir(), log_buffer);
versions_.get(), &mutex_, tmp_file_meta,
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
}
if (s.ok() || s.IsShutdownInProgress()) {
@ -474,7 +473,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
*cfds[i]->GetLatestMutableCFOptions());
all_mutable_cf_options[i]);
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfds[i]->GetName().c_str(),
@ -490,8 +489,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], &file_meta[i],
*cfds[i]->GetLatestMutableCFOptions(),
NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
job_context->job_id, jobs[i].GetTableProperties());
if (sfm) {
std::string file_path = MakeTableFileName(

View File

@ -308,7 +308,9 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
k++;
}
HistogramData hist;
autovector<FileMetaData> file_metas;
std::vector<FileMetaData> file_metas;
// Call reserve to avoid auto-resizing
file_metas.reserve(flush_jobs.size());
mutex_.Lock();
for (auto& job : flush_jobs) {
job.PickMemTable();
@ -319,6 +321,10 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
ASSERT_OK(job.Run(nullptr /**/, &meta));
file_metas.emplace_back(meta);
}
autovector<FileMetaData*> file_meta_ptrs;
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
autovector<const autovector<MemTable*>*> mems_list;
for (size_t i = 0; i != all_cfds.size(); ++i) {
const auto& mems = flush_jobs[i].GetMemTables();
@ -331,7 +337,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_metas, &job_context.memtables_to_free,
versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
nullptr /* db_directory */, nullptr /* log_buffer */);
ASSERT_OK(s);

View File

@ -533,7 +533,7 @@ Status InstallMemtableAtomicFlushResults(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData>& file_metas,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
@ -553,10 +553,11 @@ Status InstallMemtableAtomicFlushResults(
assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
}
#endif
assert(nullptr != file_metas[k]);
for (size_t i = 0; i != mems_list[k]->size(); ++i) {
assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
(*mems_list[k])[i]->SetFlushCompleted(true);
(*mems_list[k])[i]->SetFileNumber(file_metas[k].fd.GetNumber());
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
}
}

View File

@ -123,7 +123,7 @@ class MemTableListVersion {
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_meta,
const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
@ -301,7 +301,7 @@ class MemTableList {
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData>& file_meta,
const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
@ -337,7 +337,7 @@ extern Status InstallMemtableAtomicFlushResults(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData>& file_meta,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
} // namespace rocksdb

View File

@ -161,18 +161,23 @@ class MemTableListTest : public testing::Test {
cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
EXPECT_NE(nullptr, cfds[i]);
}
autovector<FileMetaData> file_metas;
std::vector<FileMetaData> file_metas;
file_metas.reserve(cf_ids.size());
for (size_t i = 0; i != cf_ids.size(); ++i) {
FileMetaData meta;
uint64_t file_num = file_number.fetch_add(1);
meta.fd = FileDescriptor(file_num, 0, 0);
file_metas.emplace_back(meta);
}
autovector<FileMetaData*> file_meta_ptrs;
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
file_metas, to_delete, nullptr, &log_buffer);
file_meta_ptrs, to_delete, nullptr, &log_buffer);
}
};