From d1118f6f19cacbeafc9e3dc8bdeaa4f728dd2168 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 30 Oct 2018 16:35:58 -0700 Subject: [PATCH] Add test to check if DB can handle atomic group (#4433) Summary: Add unit tests to demonstrate that `VersionSet::Recover` is able to detect and handle cases in which the MANIFEST has a valid atomic group, an incomplete trailing atomic group, an atomic group mixed with normal version edits, or an atomic group with an incorrect size. With this capability, RocksDB identifies invalid groups of version edits and does not apply them, thus guaranteeing that the db is restored to a state consistent with the most recent successful atomic flush before applying the WAL. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4433 Differential Revision: D10079202 Pulled By: riversand963 fbshipit-source-id: a0e0b8bf4da1cf68e044d397588c121b66c68876 --- db/version_set.cc | 15 ++- db/version_set_test.cc | 264 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 258 insertions(+), 21 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index b86fac22e..c13fadc17 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3474,14 +3474,21 @@ Status VersionSet::Recover( if (edit.is_in_atomic_group_) { if (replay_buffer.empty()) { replay_buffer.resize(edit.remaining_entries_ + 1); + TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup", + &edit); } ++num_entries_decoded; if (num_entries_decoded + edit.remaining_entries_ != static_cast(replay_buffer.size())) { - return Status::Corruption("corrupted atomic group"); + TEST_SYNC_POINT_CALLBACK( + "VersionSet::Recover:IncorrectAtomicGroupSize", &edit); + s = Status::Corruption("corrupted atomic group"); + break; } replay_buffer[num_entries_decoded - 1] = std::move(edit); if (num_entries_decoded == replay_buffer.size()) { + TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup", + &edit); for (auto& e : replay_buffer) { s = ApplyOneVersionEdit( e, cf_name_to_options, 
column_families_not_found, builders, @@ -3496,9 +3503,13 @@ Status VersionSet::Recover( replay_buffer.clear(); num_entries_decoded = 0; } + TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup"); } else { if (!replay_buffer.empty()) { - return Status::Corruption("corrupted atomic group"); + TEST_SYNC_POINT_CALLBACK( + "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit); + s = Status::Corruption("corrupted atomic group"); + break; } s = ApplyOneVersionEdit( edit, cf_name_to_options, column_families_not_found, builders, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 505da7775..37e1b7b8f 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -605,9 +605,9 @@ TEST_F(FindLevelFileTest, LevelOverlappingFiles) { ASSERT_TRUE(Overlaps("600", "700")); } -class ManifestWriterTest : public testing::Test { +class VersionSetTest : public testing::Test { public: - ManifestWriterTest() + VersionSetTest() : env_(Env::Default()), dbname_(test::PerThreadDBPath("version_set_test")), db_options_(), @@ -624,8 +624,12 @@ class ManifestWriterTest : public testing::Test { std::numeric_limits::max()); } - // Create DB with 3 column families. 
- void NewDB() { + void PrepareManifest(std::vector* column_families, + SequenceNumber* last_seqno, + std::unique_ptr* log_writer) { + assert(column_families != nullptr); + assert(last_seqno != nullptr); + assert(log_writer != nullptr); VersionEdit new_db; new_db.SetLogNumber(0); new_db.SetNextFile(2); @@ -646,6 +650,7 @@ class ManifestWriterTest : public testing::Test { new_cf.SetLastSequence(last_seq++); new_cfs.emplace_back(new_cf); } + *last_seqno = last_seq; const std::string manifest = DescriptorFileName(dbname_, 1); unique_ptr file; @@ -655,32 +660,40 @@ class ManifestWriterTest : public testing::Test { unique_ptr file_writer( new WritableFileWriter(std::move(file), manifest, env_options_)); { - log::Writer log(std::move(file_writer), 0, false); + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); std::string record; new_db.EncodeTo(&record); - s = log.AddRecord(record); + s = (*log_writer)->AddRecord(record); for (const auto& e : new_cfs) { + record.clear(); e.EncodeTo(&record); - s = log.AddRecord(record); + s = (*log_writer)->AddRecord(record); ASSERT_OK(s); } } ASSERT_OK(s); - // Make "CURRENT" file point to the new manifest file. - s = SetCurrentFile(env_, dbname_, 1, nullptr); - std::vector column_families; cf_options_.table_factory = mock_table_factory_; for (const auto& cf_name : cf_names) { - column_families.emplace_back(cf_name, cf_options_); + column_families->emplace_back(cf_name, cf_options_); } + } + + // Create DB with 3 column families. + void NewDB() { + std::vector column_families; + SequenceNumber last_seqno; + std::unique_ptr log_writer; + + PrepareManifest(&column_families, &last_seqno, &log_writer); + log_writer.reset(); + // Make "CURRENT" file point to the new manifest file. 
+ Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); EXPECT_OK(versions_->Recover(column_families, false)); - EXPECT_EQ(kInitialNumOfCfs, + EXPECT_EQ(column_families.size(), versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); - for (auto cfd : *versions_->GetColumnFamilySet()) { - cfds_.emplace_back(cfd); - } } Env* env_; @@ -692,14 +705,13 @@ class ManifestWriterTest : public testing::Test { std::shared_ptr table_cache_; WriteController write_controller_; WriteBufferManager write_buffer_manager_; - std::unique_ptr versions_; + std::shared_ptr versions_; InstrumentedMutex mutex_; std::atomic shutting_down_; std::shared_ptr mock_table_factory_; - std::vector cfds_; }; -TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) { +TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { NewDB(); const int kGroupSize = 5; autovector edits; @@ -710,13 +722,15 @@ TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) { autovector all_mutable_cf_options; autovector> edit_lists; for (int i = 0; i != kGroupSize; ++i) { - cfds.emplace_back(cfds_[0]); + cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault()); all_mutable_cf_options.emplace_back(&mutable_cf_options_); autovector edit_list; edit_list.emplace_back(&edits[i]); edit_lists.emplace_back(edit_list); } + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); int count = 0; SyncPoint::GetInstance()->SetCallBack( "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) { @@ -732,6 +746,218 @@ TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) { EXPECT_OK(s); EXPECT_EQ(kGroupSize - 1, count); } + +TEST_F(VersionSetTest, HandleValidAtomicGroup) { + std::vector column_families; + SequenceNumber last_seqno; + std::unique_ptr log_writer; + PrepareManifest(&column_families, &last_seqno, &log_writer); + + // Append multiple version edits that form an atomic group + const int kAtomicGroupSize = 3; + std::vector edits(kAtomicGroupSize); + 
int remaining = kAtomicGroupSize; + for (size_t i = 0; i != edits.size(); ++i) { + edits[i].SetLogNumber(0); + edits[i].SetNextFile(2); + edits[i].MarkAtomicGroup(--remaining); + edits[i].SetLastSequence(last_seqno++); + } + Status s; + for (const auto& edit : edits) { + std::string record; + edit.EncodeTo(&record); + s = log_writer->AddRecord(record); + ASSERT_OK(s); + } + log_writer.reset(); + + s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + bool first_in_atomic_group = false; + bool last_in_atomic_group = false; + + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast(arg); + EXPECT_EQ(edits.front().DebugString(), + e->DebugString()); // compare based on value + first_in_atomic_group = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::Recover:LastInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast(arg); + EXPECT_EQ(edits.back().DebugString(), + e->DebugString()); // compare based on value + EXPECT_TRUE(first_in_atomic_group); + last_in_atomic_group = true; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + EXPECT_OK(versions_->Recover(column_families, false)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group); + EXPECT_TRUE(last_in_atomic_group); +} + +TEST_F(VersionSetTest, HandleIncompleteTrailingAtomicGroup) { + std::vector column_families; + SequenceNumber last_seqno; + std::unique_ptr log_writer; + PrepareManifest(&column_families, &last_seqno, &log_writer); + + // Append multiple version edits that form an atomic group + const int kAtomicGroupSize = 4; + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + std::vector edits(kNumberOfPersistedVersionEdits); + int remaining = kAtomicGroupSize; + for (size_t i = 0; i != 
edits.size(); ++i) { + edits[i].SetLogNumber(0); + edits[i].SetNextFile(2); + edits[i].MarkAtomicGroup(--remaining); + edits[i].SetLastSequence(last_seqno++); + } + Status s; + for (const auto& edit : edits) { + std::string record; + edit.EncodeTo(&record); + s = log_writer->AddRecord(record); + ASSERT_OK(s); + } + log_writer.reset(); + + s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + bool first_in_atomic_group = false; + bool last_in_atomic_group = false; + size_t num = 0; + + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast(arg); + EXPECT_EQ(edits.front().DebugString(), + e->DebugString()); // compare based on value + first_in_atomic_group = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::Recover:LastInAtomicGroup", + [&](void* /* arg */) { last_in_atomic_group = true; }); + SyncPoint::GetInstance()->SetCallBack("VersionSet::Recover:AtomicGroup", + [&](void* /* arg */) { ++num; }); + SyncPoint::GetInstance()->EnableProcessing(); + + EXPECT_OK(versions_->Recover(column_families, false)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group); + EXPECT_FALSE(last_in_atomic_group); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num); +} + +TEST_F(VersionSetTest, HandleCorruptedAtomicGroup) { + std::vector column_families; + SequenceNumber last_seqno; + std::unique_ptr log_writer; + PrepareManifest(&column_families, &last_seqno, &log_writer); + + // Append multiple version edits that form an atomic group + const int kAtomicGroupSize = 4; + std::vector edits(kAtomicGroupSize); + int remaining = kAtomicGroupSize; + for (size_t i = 0; i != edits.size(); ++i) { + edits[i].SetLogNumber(0); + edits[i].SetNextFile(2); + if (i != (kAtomicGroupSize / 2)) { + 
edits[i].MarkAtomicGroup(--remaining); + } + edits[i].SetLastSequence(last_seqno++); + } + Status s; + for (const auto& edit : edits) { + std::string record; + edit.EncodeTo(&record); + s = log_writer->AddRecord(record); + ASSERT_OK(s); + } + log_writer.reset(); + + s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + bool mixed = false; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", [&](void* arg) { + VersionEdit* e = reinterpret_cast(arg); + EXPECT_EQ(edits[kAtomicGroupSize / 2].DebugString(), e->DebugString()); + mixed = true; + }); + SyncPoint::GetInstance()->EnableProcessing(); + EXPECT_NOK(versions_->Recover(column_families, false)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(mixed); +} + +TEST_F(VersionSetTest, HandleIncorrectAtomicGroupSize) { + std::vector column_families; + SequenceNumber last_seqno; + std::unique_ptr log_writer; + PrepareManifest(&column_families, &last_seqno, &log_writer); + + // Append multiple version edits that form an atomic group + const int kAtomicGroupSize = 4; + std::vector edits(kAtomicGroupSize); + int remaining = kAtomicGroupSize; + for (size_t i = 0; i != edits.size(); ++i) { + edits[i].SetLogNumber(0); + edits[i].SetNextFile(2); + if (i != 1) { + edits[i].MarkAtomicGroup(--remaining); + } else { + edits[i].MarkAtomicGroup(remaining--); + } + edits[i].SetLastSequence(last_seqno++); + } + Status s; + for (const auto& edit : edits) { + std::string record; + edit.EncodeTo(&record); + s = log_writer->AddRecord(record); + ASSERT_OK(s); + } + log_writer.reset(); + + s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + bool incorrect_group_size = false; + 
SyncPoint::GetInstance()->SetCallBack( + "VersionSet::Recover:IncorrectAtomicGroupSize", [&](void* arg) { + VersionEdit* e = reinterpret_cast(arg); + EXPECT_EQ(edits[1].DebugString(), e->DebugString()); + incorrect_group_size = true; + }); + SyncPoint::GetInstance()->EnableProcessing(); + EXPECT_NOK(versions_->Recover(column_families, false)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(incorrect_group_size); +} } // namespace rocksdb int main(int argc, char** argv) {