Improve flushing multiple column families (#4708)
Summary: If one column family is dropped, we should simply skip it and continue to flush other active ones. Currently we use Status::ShutdownInProgress to notify caller of column families being dropped. In the future, we should consider using a different Status code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4708 Differential Revision: D13378954 Pulled By: riversand963 fbshipit-source-id: 42f248cdf2d32d4c0f677cd39012694b8f1328ca
This commit is contained in:
parent
ec43385bf3
commit
663d24f467
@ -11,6 +11,7 @@
|
|||||||
* Introduced `MemoryAllocator`, which lets the user specify custom memory allocator for block based table.
|
* Introduced `MemoryAllocator`, which lets the user specify custom memory allocator for block based table.
|
||||||
* Improved `DeleteRange` to prevent read performance degradation. The feature is no longer marked as experimental.
|
* Improved `DeleteRange` to prevent read performance degradation. The feature is no longer marked as experimental.
|
||||||
* Enabled checkpoint on readonly db (DBImplReadOnly).
|
* Enabled checkpoint on readonly db (DBImplReadOnly).
|
||||||
|
* Make DB ignore dropped column families while committing results of atomic flush.
|
||||||
|
|
||||||
### Public API Change
|
### Public API Change
|
||||||
* `DBOptions::use_direct_reads` now affects reads issued by `BackupEngine` on the database's SSTs.
|
* `DBOptions::use_direct_reads` now affects reads issued by `BackupEngine` on the database's SSTs.
|
||||||
|
@ -407,6 +407,87 @@ TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
|
|||||||
Destroy(options);
|
Destroy(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
|
||||||
|
bool atomic_flush = GetParam();
|
||||||
|
if (!atomic_flush) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.atomic_flush = atomic_flush;
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
|
size_t num_cfs = handles_.size();
|
||||||
|
ASSERT_EQ(3, num_cfs);
|
||||||
|
WriteOptions wopts;
|
||||||
|
wopts.disableWAL = true;
|
||||||
|
std::vector<int> cf_ids;
|
||||||
|
for (size_t i = 0; i != num_cfs; ++i) {
|
||||||
|
int cf_id = static_cast<int>(i);
|
||||||
|
ASSERT_OK(Put(cf_id, "key", "value", wopts));
|
||||||
|
cf_ids.push_back(cf_id);
|
||||||
|
}
|
||||||
|
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
|
||||||
|
ASSERT_TRUE(Flush(cf_ids).IsShutdownInProgress());
|
||||||
|
Destroy(options);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(DBAtomicFlushTest,
|
||||||
|
FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
|
||||||
|
bool atomic_flush = GetParam();
|
||||||
|
if (!atomic_flush) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.atomic_flush = atomic_flush;
|
||||||
|
|
||||||
|
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
|
{{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
|
||||||
|
"DBAtomicFlushTest::BeforeDropCF"},
|
||||||
|
{"DBAtomicFlushTest::AfterDropCF",
|
||||||
|
"DBImpl::BackgroundCallFlush:start"}});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
size_t num_cfs = handles_.size();
|
||||||
|
ASSERT_EQ(3, num_cfs);
|
||||||
|
WriteOptions wopts;
|
||||||
|
wopts.disableWAL = true;
|
||||||
|
for (size_t i = 0; i != num_cfs; ++i) {
|
||||||
|
int cf_id = static_cast<int>(i);
|
||||||
|
ASSERT_OK(Put(cf_id, "key", "value", wopts));
|
||||||
|
}
|
||||||
|
port::Thread user_thread([&]() {
|
||||||
|
TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
|
||||||
|
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
|
||||||
|
TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
|
||||||
|
});
|
||||||
|
FlushOptions flush_opts;
|
||||||
|
flush_opts.wait = true;
|
||||||
|
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
|
||||||
|
user_thread.join();
|
||||||
|
for (size_t i = 0; i != num_cfs; ++i) {
|
||||||
|
int cf_id = static_cast<int>(i);
|
||||||
|
ASSERT_EQ("value", Get(cf_id, "key"));
|
||||||
|
}
|
||||||
|
|
||||||
|
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
|
||||||
|
num_cfs = handles_.size();
|
||||||
|
ASSERT_EQ(2, num_cfs);
|
||||||
|
for (size_t i = 0; i != num_cfs; ++i) {
|
||||||
|
int cf_id = static_cast<int>(i);
|
||||||
|
ASSERT_EQ("value", Get(cf_id, "key"));
|
||||||
|
}
|
||||||
|
Destroy(options);
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
|
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
|
||||||
testing::Bool());
|
testing::Bool());
|
||||||
|
|
||||||
|
@ -219,20 +219,25 @@ Status DBImpl::FlushMemTablesToOutputFiles(
|
|||||||
return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress,
|
return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress,
|
||||||
job_context, log_buffer);
|
job_context, log_buffer);
|
||||||
}
|
}
|
||||||
Status s;
|
Status status;
|
||||||
for (auto& arg : bg_flush_args) {
|
for (auto& arg : bg_flush_args) {
|
||||||
ColumnFamilyData* cfd = arg.cfd_;
|
ColumnFamilyData* cfd = arg.cfd_;
|
||||||
const MutableCFOptions& mutable_cf_options =
|
const MutableCFOptions& mutable_cf_options =
|
||||||
*cfd->GetLatestMutableCFOptions();
|
*cfd->GetLatestMutableCFOptions();
|
||||||
SuperVersionContext* superversion_context = arg.superversion_context_;
|
SuperVersionContext* superversion_context = arg.superversion_context_;
|
||||||
s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
|
Status s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
|
||||||
job_context, superversion_context,
|
job_context, superversion_context,
|
||||||
log_buffer);
|
log_buffer);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
break;
|
status = s;
|
||||||
|
if (!s.IsShutdownInProgress()) {
|
||||||
|
// At this point, DB is not shutting down, nor is cfd dropped.
|
||||||
|
// Something is wrong, thus we break out of the loop.
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return s;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -331,8 +336,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
autovector<std::pair<bool, Status>> exec_status;
|
autovector<std::pair<bool, Status>> exec_status;
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
// Initially all jobs are not executed, with status OK.
|
// Initially all jobs are not executed, with status OK.
|
||||||
std::pair<bool, Status> elem(false, Status::OK());
|
exec_status.emplace_back(false, Status::OK());
|
||||||
exec_status.emplace_back(elem);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
@ -341,10 +345,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
exec_status[i].second =
|
exec_status[i].second =
|
||||||
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
|
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
|
||||||
exec_status[i].first = true;
|
exec_status[i].first = true;
|
||||||
if (!exec_status[i].second.ok()) {
|
|
||||||
s = exec_status[i].second;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (num_cfs > 1) {
|
if (num_cfs > 1) {
|
||||||
TEST_SYNC_POINT(
|
TEST_SYNC_POINT(
|
||||||
@ -352,17 +352,27 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
TEST_SYNC_POINT(
|
TEST_SYNC_POINT(
|
||||||
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
|
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
exec_status[0].second =
|
||||||
exec_status[0].second =
|
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
|
||||||
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
|
exec_status[0].first = true;
|
||||||
exec_status[0].first = true;
|
|
||||||
if (!exec_status[0].second.ok()) {
|
Status error_status;
|
||||||
s = exec_status[0].second;
|
for (const auto& e : exec_status) {
|
||||||
|
if (!e.second.ok()) {
|
||||||
|
s = e.second;
|
||||||
|
if (!e.second.IsShutdownInProgress()) {
|
||||||
|
// If a flush job did not return OK, and the CF is not dropped, and
|
||||||
|
// the DB is not shutting down, then we have to return this result to
|
||||||
|
// caller later.
|
||||||
|
error_status = e.second;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s = error_status.ok() ? s : error_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok() || s.IsShutdownInProgress()) {
|
||||||
// Sync on all distinct output directories.
|
// Sync on all distinct output directories.
|
||||||
for (auto dir : distinct_output_dirs) {
|
for (auto dir : distinct_output_dirs) {
|
||||||
if (dir != nullptr) {
|
if (dir != nullptr) {
|
||||||
@ -376,6 +386,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
autovector<const autovector<MemTable*>*> mems_list;
|
autovector<const autovector<MemTable*>*> mems_list;
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
|
if (cfds[i]->IsDropped()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
const auto& mems = jobs[i].GetMemTables();
|
const auto& mems = jobs[i].GetMemTables();
|
||||||
mems_list.emplace_back(&mems);
|
mems_list.emplace_back(&mems);
|
||||||
}
|
}
|
||||||
@ -383,6 +396,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
autovector<MemTableList*> imm_lists;
|
autovector<MemTableList*> imm_lists;
|
||||||
autovector<const MutableCFOptions*> mutable_cf_options_list;
|
autovector<const MutableCFOptions*> mutable_cf_options_list;
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||||
|
if (cfd->IsDropped()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
all_cfds.emplace_back(cfd);
|
all_cfds.emplace_back(cfd);
|
||||||
imm_lists.emplace_back(cfd->imm());
|
imm_lists.emplace_back(cfd->imm());
|
||||||
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
|
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
|
||||||
@ -396,10 +412,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok() || s.IsShutdownInProgress()) {
|
||||||
assert(num_cfs ==
|
assert(num_cfs ==
|
||||||
static_cast<int>(job_context->superversion_contexts.size()));
|
static_cast<int>(job_context->superversion_contexts.size()));
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
|
if (cfds[i]->IsDropped()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
InstallSuperVersionAndScheduleWork(cfds[i],
|
InstallSuperVersionAndScheduleWork(cfds[i],
|
||||||
&job_context->superversion_contexts[i],
|
&job_context->superversion_contexts[i],
|
||||||
*cfds[i]->GetLatestMutableCFOptions());
|
*cfds[i]->GetLatestMutableCFOptions());
|
||||||
@ -415,6 +434,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
auto sfm = static_cast<SstFileManagerImpl*>(
|
auto sfm = static_cast<SstFileManagerImpl*>(
|
||||||
immutable_db_options_.sst_file_manager.get());
|
immutable_db_options_.sst_file_manager.get());
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
|
if (cfds[i]->IsDropped()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
NotifyOnFlushCompleted(cfds[i], &file_meta[i],
|
NotifyOnFlushCompleted(cfds[i], &file_meta[i],
|
||||||
*cfds[i]->GetLatestMutableCFOptions(),
|
*cfds[i]->GetLatestMutableCFOptions(),
|
||||||
job_context->job_id, jobs[i].GetTableProperties());
|
job_context->job_id, jobs[i].GetTableProperties());
|
||||||
@ -434,7 +456,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!s.ok()) {
|
// Need to undo atomic flush if something went wrong, i.e. s is not OK and
|
||||||
|
// it is not because of CF drop.
|
||||||
|
if (!s.ok() && !s.IsShutdownInProgress()) {
|
||||||
// Have to cancel the flush jobs that have NOT executed because we need to
|
// Have to cancel the flush jobs that have NOT executed because we need to
|
||||||
// unref the versions.
|
// unref the versions.
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
@ -442,17 +466,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
jobs[i].Cancel();
|
jobs[i].Cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!s.IsShutdownInProgress()) {
|
for (int i = 0; i != num_cfs; ++i) {
|
||||||
for (int i = 0; i != num_cfs; ++i) {
|
if (exec_status[i].first && exec_status[i].second.ok()) {
|
||||||
if (exec_status[i].first && exec_status[i].second.ok()) {
|
auto& mems = jobs[i].GetMemTables();
|
||||||
auto& mems = jobs[i].GetMemTables();
|
cfds[i]->imm()->RollbackMemtableFlush(mems,
|
||||||
cfds[i]->imm()->RollbackMemtableFlush(mems,
|
file_meta[i].fd.GetNumber());
|
||||||
file_meta[i].fd.GetNumber());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Status new_bg_error = s;
|
|
||||||
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
|
||||||
}
|
}
|
||||||
|
Status new_bg_error = s;
|
||||||
|
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
||||||
}
|
}
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
@ -1539,6 +1561,7 @@ Status DBImpl::AtomicFlushMemTables(
|
|||||||
write_thread_.ExitUnbatched(&w);
|
write_thread_.ExitUnbatched(&w);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
|
||||||
|
|
||||||
if (s.ok() && flush_options.wait) {
|
if (s.ok() && flush_options.wait) {
|
||||||
autovector<const uint64_t*> flush_memtable_ids;
|
autovector<const uint64_t*> flush_memtable_ids;
|
||||||
|
@ -426,7 +426,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
|||||||
imm_lists[pos]->InstallNewVersion();
|
imm_lists[pos]->InstallNewVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok() || s.IsShutdownInProgress()) {
|
||||||
for (size_t i = 0; i != batch_sz; ++i) {
|
for (size_t i = 0; i != batch_sz; ++i) {
|
||||||
if (tmp_cfds[i]->IsDropped()) {
|
if (tmp_cfds[i]->IsDropped()) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -579,7 +579,7 @@ std::string VersionEdit::DebugString(bool hex_key) const {
|
|||||||
AppendNumberTo(&r, max_column_family_);
|
AppendNumberTo(&r, max_column_family_);
|
||||||
}
|
}
|
||||||
if (is_in_atomic_group_) {
|
if (is_in_atomic_group_) {
|
||||||
r.append("\n AtomicGroup: ");
|
r.append("\n AtomicGroup: ");
|
||||||
AppendNumberTo(&r, remaining_entries_);
|
AppendNumberTo(&r, remaining_entries_);
|
||||||
r.append(" entries remains");
|
r.append(" entries remains");
|
||||||
}
|
}
|
||||||
|
@ -2849,6 +2849,7 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
batch_edits.push_back(first_writer.edit_list.front());
|
batch_edits.push_back(first_writer.edit_list.front());
|
||||||
} else {
|
} else {
|
||||||
auto it = manifest_writers_.cbegin();
|
auto it = manifest_writers_.cbegin();
|
||||||
|
size_t group_start = std::numeric_limits<size_t>::max();
|
||||||
while (it != manifest_writers_.cend()) {
|
while (it != manifest_writers_.cend()) {
|
||||||
if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
|
if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
|
||||||
// no group commits for column family add or drop
|
// no group commits for column family add or drop
|
||||||
@ -2857,7 +2858,36 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
last_writer = *(it++);
|
last_writer = *(it++);
|
||||||
assert(last_writer != nullptr);
|
assert(last_writer != nullptr);
|
||||||
assert(last_writer->cfd != nullptr);
|
assert(last_writer->cfd != nullptr);
|
||||||
if (last_writer->cfd != nullptr && last_writer->cfd->IsDropped()) {
|
if (last_writer->cfd->IsDropped()) {
|
||||||
|
// If we detect a dropped CF at this point, and the corresponding
|
||||||
|
// version edits belong to an atomic group, then we need to find out
|
||||||
|
// the preceding version edits in the same atomic group, and update
|
||||||
|
// their `remaining_entries_` member variable because we are NOT going
|
||||||
|
// to write the version edits' of dropped CF to the MANIFEST. If we
|
||||||
|
// don't update, then Recover can report corrupted atomic group because
|
||||||
|
// the `remaining_entries_` do not match.
|
||||||
|
if (!batch_edits.empty()) {
|
||||||
|
if (batch_edits.back()->is_in_atomic_group_ &&
|
||||||
|
batch_edits.back()->remaining_entries_ > 0) {
|
||||||
|
assert(group_start < batch_edits.size());
|
||||||
|
const auto& edit_list = last_writer->edit_list;
|
||||||
|
size_t k = 0;
|
||||||
|
while (k < edit_list.size()) {
|
||||||
|
if (!edit_list[k]->is_in_atomic_group_) {
|
||||||
|
break;
|
||||||
|
} else if (edit_list[k]->remaining_entries_ == 0) {
|
||||||
|
++k;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
++k;
|
||||||
|
}
|
||||||
|
for (auto i = group_start; i < batch_edits.size(); ++i) {
|
||||||
|
assert(static_cast<uint32_t>(k) <=
|
||||||
|
batch_edits.back()->remaining_entries_);
|
||||||
|
batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// We do a linear search on versions because versions is small.
|
// We do a linear search on versions because versions is small.
|
||||||
@ -2888,6 +2918,15 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
}
|
}
|
||||||
assert(builder != nullptr); // make checker happy
|
assert(builder != nullptr); // make checker happy
|
||||||
for (const auto& e : last_writer->edit_list) {
|
for (const auto& e : last_writer->edit_list) {
|
||||||
|
if (e->is_in_atomic_group_) {
|
||||||
|
if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
|
||||||
|
(batch_edits.back()->is_in_atomic_group_ &&
|
||||||
|
batch_edits.back()->remaining_entries_ == 0)) {
|
||||||
|
group_start = batch_edits.size();
|
||||||
|
}
|
||||||
|
} else if (group_start != std::numeric_limits<size_t>::max()) {
|
||||||
|
group_start = std::numeric_limits<size_t>::max();
|
||||||
|
}
|
||||||
LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
|
LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
|
||||||
batch_edits.push_back(e);
|
batch_edits.push_back(e);
|
||||||
}
|
}
|
||||||
@ -2900,6 +2939,42 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef NDEBUG
|
||||||
|
// Verify that version edits of atomic groups have correct
|
||||||
|
// remaining_entries_.
|
||||||
|
size_t k = 0;
|
||||||
|
while (k < batch_edits.size()) {
|
||||||
|
while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
|
||||||
|
++k;
|
||||||
|
}
|
||||||
|
if (k == batch_edits.size()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
size_t i = k;
|
||||||
|
while (i < batch_edits.size()) {
|
||||||
|
if (!batch_edits[i]->is_in_atomic_group_) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
assert(i - k + batch_edits[i]->remaining_entries_ ==
|
||||||
|
batch_edits[k]->remaining_entries_);
|
||||||
|
if (batch_edits[i]->remaining_entries_ == 0) {
|
||||||
|
++i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
assert(batch_edits[i - 1]->is_in_atomic_group_);
|
||||||
|
assert(0 == batch_edits[i - 1]->remaining_entries_);
|
||||||
|
std::vector<VersionEdit*> tmp;
|
||||||
|
for (size_t j = k; j != i; ++j) {
|
||||||
|
tmp.emplace_back(batch_edits[j]);
|
||||||
|
}
|
||||||
|
TEST_SYNC_POINT_CALLBACK(
|
||||||
|
"VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
|
||||||
|
k = i;
|
||||||
|
}
|
||||||
|
#endif // NDEBUG
|
||||||
|
|
||||||
uint64_t new_manifest_file_size = 0;
|
uint64_t new_manifest_file_size = 0;
|
||||||
Status s;
|
Status s;
|
||||||
|
|
||||||
@ -3205,7 +3280,7 @@ Status VersionSet::LogAndApply(
|
|||||||
if (!manifest_writers_.empty()) {
|
if (!manifest_writers_.empty()) {
|
||||||
manifest_writers_.front()->cv.Signal();
|
manifest_writers_.front()->cv.Signal();
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::ShutdownInProgress();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
|
return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
|
||||||
|
@ -605,9 +605,13 @@ TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
|
|||||||
ASSERT_TRUE(Overlaps("600", "700"));
|
ASSERT_TRUE(Overlaps("600", "700"));
|
||||||
}
|
}
|
||||||
|
|
||||||
class VersionSetTest : public testing::Test {
|
class VersionSetTestBase {
|
||||||
public:
|
public:
|
||||||
VersionSetTest()
|
const static std::string kColumnFamilyName1;
|
||||||
|
const static std::string kColumnFamilyName2;
|
||||||
|
const static std::string kColumnFamilyName3;
|
||||||
|
|
||||||
|
VersionSetTestBase()
|
||||||
: env_(Env::Default()),
|
: env_(Env::Default()),
|
||||||
dbname_(test::PerThreadDBPath("version_set_test")),
|
dbname_(test::PerThreadDBPath("version_set_test")),
|
||||||
db_options_(),
|
db_options_(),
|
||||||
@ -635,8 +639,9 @@ class VersionSetTest : public testing::Test {
|
|||||||
new_db.SetNextFile(2);
|
new_db.SetNextFile(2);
|
||||||
new_db.SetLastSequence(0);
|
new_db.SetLastSequence(0);
|
||||||
|
|
||||||
const std::vector<std::string> cf_names = {kDefaultColumnFamilyName,
|
const std::vector<std::string> cf_names = {
|
||||||
"alice", "bob"};
|
kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
|
||||||
|
kColumnFamilyName3};
|
||||||
const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
|
const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
|
||||||
autovector<VersionEdit> new_cfs;
|
autovector<VersionEdit> new_cfs;
|
||||||
uint64_t last_seq = 1;
|
uint64_t last_seq = 1;
|
||||||
@ -711,6 +716,15 @@ class VersionSetTest : public testing::Test {
|
|||||||
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
|
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
|
||||||
|
const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
|
||||||
|
const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
|
||||||
|
|
||||||
|
class VersionSetTest : public VersionSetTestBase, public testing::Test {
|
||||||
|
public:
|
||||||
|
VersionSetTest() : VersionSetTestBase() {}
|
||||||
|
};
|
||||||
|
|
||||||
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
|
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
|
||||||
NewDB();
|
NewDB();
|
||||||
const int kGroupSize = 5;
|
const int kGroupSize = 5;
|
||||||
@ -958,6 +972,124 @@ TEST_F(VersionSetTest, HandleIncorrectAtomicGroupSize) {
|
|||||||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
|
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
|
||||||
EXPECT_TRUE(incorrect_group_size);
|
EXPECT_TRUE(incorrect_group_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class VersionSetTestDropOneCF : public VersionSetTestBase,
|
||||||
|
public testing::TestWithParam<std::string> {
|
||||||
|
public:
|
||||||
|
VersionSetTestDropOneCF() : VersionSetTestBase() {}
|
||||||
|
};
|
||||||
|
|
||||||
|
// This test simulates the following execution sequence
|
||||||
|
// Time thread1 bg_flush_thr
|
||||||
|
// | Prepare version edits (e1,e2,e3) for atomic
|
||||||
|
// | flush cf1, cf2, cf3
|
||||||
|
// | Enqueue e to drop cfi
|
||||||
|
// | to manifest_writers_
|
||||||
|
// | Enqueue (e1,e2,e3) to manifest_writers_
|
||||||
|
// |
|
||||||
|
// | Apply e,
|
||||||
|
// | cfi.IsDropped() is true
|
||||||
|
// | Apply (e1,e2,e3),
|
||||||
|
// | since cfi.IsDropped() == true, we need to
|
||||||
|
// | drop ei and write the rest to MANIFEST.
|
||||||
|
// V
|
||||||
|
//
|
||||||
|
// Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
|
||||||
|
// last column family in an atomic group.
|
||||||
|
TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
|
||||||
|
std::vector<ColumnFamilyDescriptor> column_families;
|
||||||
|
SequenceNumber last_seqno;
|
||||||
|
std::unique_ptr<log::Writer> log_writer;
|
||||||
|
PrepareManifest(&column_families, &last_seqno, &log_writer);
|
||||||
|
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
|
||||||
|
EXPECT_EQ(column_families.size(),
|
||||||
|
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
|
||||||
|
|
||||||
|
const int kAtomicGroupSize = 3;
|
||||||
|
const std::vector<std::string> non_default_cf_names = {
|
||||||
|
kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
|
||||||
|
|
||||||
|
// Drop one column family
|
||||||
|
VersionEdit drop_cf_edit;
|
||||||
|
drop_cf_edit.DropColumnFamily();
|
||||||
|
const std::string cf_to_drop_name(GetParam());
|
||||||
|
auto cfd_to_drop =
|
||||||
|
versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
|
||||||
|
ASSERT_NE(nullptr, cfd_to_drop);
|
||||||
|
cfd_to_drop->Ref(); // Increase its refcount because cfd_to_drop is used later
|
||||||
|
drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
|
||||||
|
mutex_.Lock();
|
||||||
|
s = versions_->LogAndApply(cfd_to_drop,
|
||||||
|
*cfd_to_drop->GetLatestMutableCFOptions(),
|
||||||
|
&drop_cf_edit, &mutex_);
|
||||||
|
mutex_.Unlock();
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
std::vector<VersionEdit> edits(kAtomicGroupSize);
|
||||||
|
uint32_t remaining = kAtomicGroupSize;
|
||||||
|
size_t i = 0;
|
||||||
|
autovector<ColumnFamilyData*> cfds;
|
||||||
|
autovector<const MutableCFOptions*> mutable_cf_options_list;
|
||||||
|
autovector<autovector<VersionEdit*>> edit_lists;
|
||||||
|
for (const auto& cf_name : non_default_cf_names) {
|
||||||
|
auto cfd = (cf_name != cf_to_drop_name)
|
||||||
|
? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
|
||||||
|
: cfd_to_drop;
|
||||||
|
ASSERT_NE(nullptr, cfd);
|
||||||
|
cfds.push_back(cfd);
|
||||||
|
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
|
||||||
|
edits[i].SetColumnFamily(cfd->GetID());
|
||||||
|
edits[i].SetLogNumber(0);
|
||||||
|
edits[i].SetNextFile(2);
|
||||||
|
edits[i].MarkAtomicGroup(--remaining);
|
||||||
|
edits[i].SetLastSequence(last_seqno++);
|
||||||
|
autovector<VersionEdit*> tmp_edits;
|
||||||
|
tmp_edits.push_back(&edits[i]);
|
||||||
|
edit_lists.emplace_back(tmp_edits);
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
int called = 0;
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
|
||||||
|
std::vector<VersionEdit*>* tmp_edits =
|
||||||
|
reinterpret_cast<std::vector<VersionEdit*>*>(arg);
|
||||||
|
EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
|
||||||
|
for (const auto e : *tmp_edits) {
|
||||||
|
bool found = false;
|
||||||
|
for (const auto& e2 : edits) {
|
||||||
|
if (&e2 == e) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT_TRUE(found);
|
||||||
|
}
|
||||||
|
++called;
|
||||||
|
});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
mutex_.Lock();
|
||||||
|
s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
|
||||||
|
&mutex_);
|
||||||
|
mutex_.Unlock();
|
||||||
|
ASSERT_OK(s);
|
||||||
|
ASSERT_EQ(1, called);
|
||||||
|
if (cfd_to_drop->Unref()) {
|
||||||
|
delete cfd_to_drop;
|
||||||
|
cfd_to_drop = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
AtomicGroup, VersionSetTestDropOneCF,
|
||||||
|
testing::Values(VersionSetTestBase::kColumnFamilyName1,
|
||||||
|
VersionSetTestBase::kColumnFamilyName2,
|
||||||
|
VersionSetTestBase::kColumnFamilyName3));
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
Loading…
Reference in New Issue
Block a user