diff --git a/HISTORY.md b/HISTORY.md index 0505514e9..bdc08dd10 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features * Enabled checkpoint on readonly db (DBImplReadOnly). +* Make DB ignore dropped column families while committing results of atomic flush. ### Public API Change * Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index e9ae980b9..8a4d8fc63 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -407,6 +407,87 @@ TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) { Destroy(options); } +TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = atomic_flush; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->EnableProcessing(); + + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + WriteOptions wopts; + wopts.disableWAL = true; + std::vector cf_ids; + for (size_t i = 0; i != num_cfs; ++i) { + int cf_id = static_cast(i); + ASSERT_OK(Put(cf_id, "key", "value", wopts)); + cf_ids.push_back(cf_id); + } + ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); + ASSERT_TRUE(Flush(cf_ids).IsShutdownInProgress()); + Destroy(options); +} + +TEST_P(DBAtomicFlushTest, + FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = atomic_flush; + + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush", + "DBAtomicFlushTest::BeforeDropCF"}, + {"DBAtomicFlushTest::AfterDropCF", + "DBImpl::BackgroundCallFlush:start"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + WriteOptions wopts; + wopts.disableWAL = true; + for (size_t i = 0; i != num_cfs; ++i) { + int cf_id = static_cast(i); + ASSERT_OK(Put(cf_id, "key", "value", wopts)); + } + port::Thread user_thread([&]() { + TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF"); + ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); + TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF"); + }); + FlushOptions flush_opts; + flush_opts.wait = true; + ASSERT_OK(dbfull()->Flush(flush_opts, handles_)); + user_thread.join(); + for (size_t i = 0; i != num_cfs; ++i) { + int cf_id = static_cast(i); + ASSERT_EQ("value", Get(cf_id, "key")); + } + + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options); + num_cfs = handles_.size(); + ASSERT_EQ(2, num_cfs); + for (size_t i = 0; i != num_cfs; ++i) { + int cf_id = static_cast(i); + ASSERT_EQ("value", Get(cf_id, "key")); + } + Destroy(options); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 795e4164d..b22845a1f 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -241,20 +241,25 @@ Status DBImpl::FlushMemTablesToOutputFiles( return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress, job_context, log_buffer); } - Status s; + Status status; for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; const MutableCFOptions& mutable_cf_options = *cfd->GetLatestMutableCFOptions(); SuperVersionContext* superversion_context = arg.superversion_context_; - s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, - job_context, superversion_context, - log_buffer); + Status s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, + job_context, superversion_context, + log_buffer); if (!s.ok()) { - break; + status = s; + if (!s.IsShutdownInProgress()) { + // At this point, DB is not shutting down, nor is cfd dropped. + // Something is wrong, thus we break out of the loop. + break; + } } } - return s; + return status; } /* @@ -353,8 +358,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector> exec_status; for (int i = 0; i != num_cfs; ++i) { // Initially all jobs are not executed, with status OK. - std::pair elem(false, Status::OK()); - exec_status.emplace_back(elem); + exec_status.emplace_back(false, Status::OK()); } if (s.ok()) { @@ -363,10 +367,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( exec_status[i].second = jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); exec_status[i].first = true; - if (!exec_status[i].second.ok()) { - s = exec_status[i].second; - break; - } } if (num_cfs > 1) { TEST_SYNC_POINT( @@ -374,17 +374,27 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( TEST_SYNC_POINT( "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); } - if (s.ok()) { - exec_status[0].second = - jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); - exec_status[0].first = true; - if (!exec_status[0].second.ok()) { - s = exec_status[0].second; + exec_status[0].second = + jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); + exec_status[0].first = true; + + Status error_status; + for (const auto& e : exec_status) { + if (!e.second.ok()) { + s = e.second; + if (!e.second.IsShutdownInProgress()) { + // If a flush job did not return OK, and the CF is not dropped, and + // the DB is not shutting down, then we have to return this result to + // caller later. + error_status = e.second; + } } } + + s = error_status.ok() ? s : error_status; } - if (s.ok()) { + if (s.ok() || s.IsShutdownInProgress()) { // Sync on all distinct output directories. for (auto dir : distinct_output_dirs) { if (dir != nullptr) { @@ -398,6 +408,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (s.ok()) { autovector*> mems_list; for (int i = 0; i != num_cfs; ++i) { + if (cfds[i]->IsDropped()) { + continue; + } const auto& mems = jobs[i].GetMemTables(); mems_list.emplace_back(&mems); } @@ -405,6 +418,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector imm_lists; autovector mutable_cf_options_list; for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } all_cfds.emplace_back(cfd); imm_lists.emplace_back(cfd->imm()); mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions()); @@ -418,10 +434,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } } - if (s.ok()) { + if (s.ok() || s.IsShutdownInProgress()) { assert(num_cfs == static_cast(job_context->superversion_contexts.size())); for (int i = 0; i != num_cfs; ++i) { + if (cfds[i]->IsDropped()) { + continue; + } InstallSuperVersionAndScheduleWork(cfds[i], &job_context->superversion_contexts[i], *cfds[i]->GetLatestMutableCFOptions()); @@ -437,6 +456,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); for (int i = 0; i != num_cfs; ++i) { + if (cfds[i]->IsDropped()) { + continue; + } NotifyOnFlushCompleted(cfds[i], &file_meta[i], *cfds[i]->GetLatestMutableCFOptions(), job_context->job_id, jobs[i].GetTableProperties()); @@ -456,7 +478,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( #endif // ROCKSDB_LITE } - if (!s.ok()) { + // Need to undo atomic flush if something went wrong, i.e. s is not OK and + // it is not because of CF drop. + if (!s.ok() && !s.IsShutdownInProgress()) { // Have to cancel the flush jobs that have NOT executed because we need to // unref the versions. for (int i = 0; i != num_cfs; ++i) { @@ -464,17 +488,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( jobs[i].Cancel(); } } - if (!s.IsShutdownInProgress()) { - for (int i = 0; i != num_cfs; ++i) { - if (exec_status[i].first && exec_status[i].second.ok()) { - auto& mems = jobs[i].GetMemTables(); - cfds[i]->imm()->RollbackMemtableFlush(mems, - file_meta[i].fd.GetNumber()); - } + for (int i = 0; i != num_cfs; ++i) { + if (exec_status[i].first && exec_status[i].second.ok()) { + auto& mems = jobs[i].GetMemTables(); + cfds[i]->imm()->RollbackMemtableFlush(mems, + file_meta[i].fd.GetNumber()); } - Status new_bg_error = s; - error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } + Status new_bg_error = s; + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } return s; @@ -1541,6 +1563,7 @@ Status DBImpl::AtomicFlushMemTables( write_thread_.ExitUnbatched(&w); } } + TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush"); if (s.ok() && flush_options.wait) { autovector flush_memtable_ids; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 4c0af1e89..39e00285b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -426,7 +426,7 @@ Status MemTableList::TryInstallMemtableFlushResults( imm_lists[pos]->InstallNewVersion(); } - if (s.ok()) { + if (s.ok() || s.IsShutdownInProgress()) { for (size_t i = 0; i != batch_sz; ++i) { if (tmp_cfds[i]->IsDropped()) { continue; diff --git a/db/version_edit.cc b/db/version_edit.cc index adeca134d..e9f497999 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -579,7 +579,7 @@ std::string VersionEdit::DebugString(bool hex_key) const { AppendNumberTo(&r, max_column_family_); } if (is_in_atomic_group_) { - r.append("\n AtomicGroup: "); + r.append("\n AtomicGroup: "); AppendNumberTo(&r, remaining_entries_); r.append(" entries remains"); } diff --git a/db/version_set.cc b/db/version_set.cc index 8349f2857..5cea3d27e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2849,6 +2849,7 @@ Status VersionSet::ProcessManifestWrites( batch_edits.push_back(first_writer.edit_list.front()); } else { auto it = manifest_writers_.cbegin(); + size_t group_start = std::numeric_limits::max(); while (it != manifest_writers_.cend()) { if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) { // no group commits for column family add or drop @@ -2857,7 +2858,36 @@ Status VersionSet::ProcessManifestWrites( last_writer = *(it++); assert(last_writer != nullptr); assert(last_writer->cfd != nullptr); - if (last_writer->cfd != nullptr && last_writer->cfd->IsDropped()) { + if (last_writer->cfd->IsDropped()) { + // If we detect a dropped CF at this point, and the corresponding + // version edits belong to an atomic group, then we need to find out + // the preceding version edits in the same atomic group, and update + // their `remaining_entries_` member variable because we are NOT going + // to write the version edits' of dropped CF to the MANIFEST. If we + // don't update, then Recover can report corrupted atomic group because + // the `remaining_entries_` do not match. + if (!batch_edits.empty()) { + if (batch_edits.back()->is_in_atomic_group_ && + batch_edits.back()->remaining_entries_ > 0) { + assert(group_start < batch_edits.size()); + const auto& edit_list = last_writer->edit_list; + size_t k = 0; + while (k < edit_list.size()) { + if (!edit_list[k]->is_in_atomic_group_) { + break; + } else if (edit_list[k]->remaining_entries_ == 0) { + ++k; + break; + } + ++k; + } + for (auto i = group_start; i < batch_edits.size(); ++i) { + assert(static_cast(k) <= + batch_edits.back()->remaining_entries_); + batch_edits[i]->remaining_entries_ -= static_cast(k); + } + } + } continue; } // We do a linear search on versions because versions is small. @@ -2888,6 +2918,15 @@ Status VersionSet::ProcessManifestWrites( } assert(builder != nullptr); // make checker happy for (const auto& e : last_writer->edit_list) { + if (e->is_in_atomic_group_) { + if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ || + (batch_edits.back()->is_in_atomic_group_ && + batch_edits.back()->remaining_entries_ == 0)) { + group_start = batch_edits.size(); + } + } else if (group_start != std::numeric_limits::max()) { + group_start = std::numeric_limits::max(); + } LogAndApplyHelper(last_writer->cfd, builder, version, e, mu); batch_edits.push_back(e); } @@ -2900,6 +2939,42 @@ Status VersionSet::ProcessManifestWrites( } } +#ifndef NDEBUG + // Verify that version edits of atomic groups have correct + // remaining_entries_. + size_t k = 0; + while (k < batch_edits.size()) { + while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) { + ++k; + } + if (k == batch_edits.size()) { + break; + } + size_t i = k; + while (i < batch_edits.size()) { + if (!batch_edits[i]->is_in_atomic_group_) { + break; + } + assert(i - k + batch_edits[i]->remaining_entries_ == + batch_edits[k]->remaining_entries_); + if (batch_edits[i]->remaining_entries_ == 0) { + ++i; + break; + } + ++i; + } + assert(batch_edits[i - 1]->is_in_atomic_group_); + assert(0 == batch_edits[i - 1]->remaining_entries_); + std::vector tmp; + for (size_t j = k; j != i; ++j) { + tmp.emplace_back(batch_edits[j]); + } + TEST_SYNC_POINT_CALLBACK( + "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp); + k = i; + } +#endif // NDEBUG + uint64_t new_manifest_file_size = 0; Status s; @@ -3205,7 +3280,7 @@ Status VersionSet::LogAndApply( if (!manifest_writers_.empty()) { manifest_writers_.front()->cv.Signal(); } - return Status::OK(); + return Status::ShutdownInProgress(); } return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index c94ffb154..8b478ceb0 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -605,9 +605,13 @@ TEST_F(FindLevelFileTest, LevelOverlappingFiles) { ASSERT_TRUE(Overlaps("600", "700")); } -class VersionSetTest : public testing::Test { +class VersionSetTestBase { public: - VersionSetTest() + const static std::string kColumnFamilyName1; + const static std::string kColumnFamilyName2; + const static std::string kColumnFamilyName3; + + VersionSetTestBase() : env_(Env::Default()), dbname_(test::PerThreadDBPath("version_set_test")), db_options_(), @@ -635,8 +639,9 @@ class VersionSetTest : public testing::Test { new_db.SetNextFile(2); new_db.SetLastSequence(0); - const std::vector cf_names = {kDefaultColumnFamilyName, - "alice", "bob"}; + const std::vector cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; const int kInitialNumOfCfs = static_cast(cf_names.size()); autovector new_cfs; uint64_t last_seq = 1; @@ -711,6 +716,15 @@ class VersionSetTest : public testing::Test { std::shared_ptr mock_table_factory_; }; +const std::string VersionSetTestBase::kColumnFamilyName1 = "alice"; +const std::string VersionSetTestBase::kColumnFamilyName2 = "bob"; +const std::string VersionSetTestBase::kColumnFamilyName3 = "charles"; + +class VersionSetTest : public VersionSetTestBase, public testing::Test { + public: + VersionSetTest() : VersionSetTestBase() {} +}; + TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { NewDB(); const int kGroupSize = 5; @@ -958,6 +972,124 @@ TEST_F(VersionSetTest, HandleIncorrectAtomicGroupSize) { versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); EXPECT_TRUE(incorrect_group_size); } + +class VersionSetTestDropOneCF : public VersionSetTestBase, + public testing::TestWithParam { + public: + VersionSetTestDropOneCF() : VersionSetTestBase() {} +}; + +// This test simulates the following execution sequence +// Time thread1 bg_flush_thr +// | Prepare version edits (e1,e2,e3) for atomic +// | flush cf1, cf2, cf3 +// | Enqueue e to drop cfi +// | to manifest_writers_ +// | Enqueue (e1,e2,e3) to manifest_writers_ +// | +// | Apply e, +// | cfi.IsDropped() is true +// | Apply (e1,e2,e3), +// | since cfi.IsDropped() == true, we need to +// | drop ei and write the rest to MANIFEST. +// V +// +// Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and +// last column family in an atomic group. +TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) { + std::vector column_families; + SequenceNumber last_seqno; + std::unique_ptr log_writer; + PrepareManifest(&column_families, &last_seqno, &log_writer); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + + EXPECT_OK(versions_->Recover(column_families, false /* read_only */)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + + const int kAtomicGroupSize = 3; + const std::vector non_default_cf_names = { + kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3}; + + // Drop one column family + VersionEdit drop_cf_edit; + drop_cf_edit.DropColumnFamily(); + const std::string cf_to_drop_name(GetParam()); + auto cfd_to_drop = + versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name); + ASSERT_NE(nullptr, cfd_to_drop); + cfd_to_drop->Ref(); // Increase its refcount because cfd_to_drop is used later + drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID()); + mutex_.Lock(); + s = versions_->LogAndApply(cfd_to_drop, + *cfd_to_drop->GetLatestMutableCFOptions(), + &drop_cf_edit, &mutex_); + mutex_.Unlock(); + ASSERT_OK(s); + + std::vector edits(kAtomicGroupSize); + uint32_t remaining = kAtomicGroupSize; + size_t i = 0; + autovector cfds; + autovector mutable_cf_options_list; + autovector> edit_lists; + for (const auto& cf_name : non_default_cf_names) { + auto cfd = (cf_name != cf_to_drop_name) + ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name) + : cfd_to_drop; + ASSERT_NE(nullptr, cfd); + cfds.push_back(cfd); + mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions()); + edits[i].SetColumnFamily(cfd->GetID()); + edits[i].SetLogNumber(0); + edits[i].SetNextFile(2); + edits[i].MarkAtomicGroup(--remaining); + edits[i].SetLastSequence(last_seqno++); + autovector tmp_edits; + tmp_edits.push_back(&edits[i]); + edit_lists.emplace_back(tmp_edits); + ++i; + } + int called = 0; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) { + std::vector* tmp_edits = + reinterpret_cast*>(arg); + EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size()); + for (const auto e : *tmp_edits) { + bool found = false; + for (const auto& e2 : edits) { + if (&e2 == e) { + found = true; + break; + } + } + ASSERT_TRUE(found); + } + ++called; + }); + SyncPoint::GetInstance()->EnableProcessing(); + mutex_.Lock(); + s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists, + &mutex_); + mutex_.Unlock(); + ASSERT_OK(s); + ASSERT_EQ(1, called); + if (cfd_to_drop->Unref()) { + delete cfd_to_drop; + cfd_to_drop = nullptr; + } +} + +INSTANTIATE_TEST_CASE_P( + AtomicGroup, VersionSetTestDropOneCF, + testing::Values(VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3)); + } // namespace rocksdb int main(int argc, char** argv) {