From 00ee89b5849d1e07ab6dc7542cc8d2d220f174e6 Mon Sep 17 00:00:00 2001 From: Cheng Chang Date: Tue, 22 Sep 2020 14:33:29 -0700 Subject: [PATCH] Track WAL in MANIFEST: update WalMetadata for WAL syncing (#7414) Summary: There are some tricky behaviors related to WAL sync: - When creating a WAL, the WAL might not be synced, if the WAL directory is not synced, the WAL file's metadata may not even be synced to disk, so during recovery, when listing the WAL directory, the WAL may not even show up. - During each DB::Write, the WriteOption can control whether the WAL should be synced, so a WAL previously not synced on creation can be synced during Write. For each `SyncWAL`, we'll track the synced status and the current WAL size. Previously, we only track the WAL size on closing. During recovery, we check that the on-disk WAL size is >= the last synced size. So this PR introduces `synced_size` and `closed` to `WalMetadata` for the above design update. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7414 Test Plan: - updated wal_edit_test - updated version_edit_test Reviewed By: riversand963 Differential Revision: D23796127 Pulled By: cheng-chang fbshipit-source-id: 5498ab80f537c48a10157e71a4745716aef5cf30 --- db/version_edit_test.cc | 21 ++++++++++------ db/wal_edit.cc | 55 +++++++++++++++++++++++++++-------------- db/wal_edit.h | 42 ++++++++++++++++++++----------- db/wal_edit_test.cc | 43 +++++++++++++++++++++----------- 4 files changed, 105 insertions(+), 56 deletions(-) diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index e073f5439..ea62d9a78 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -312,10 +312,14 @@ TEST_F(VersionEditTest, BlobFileAdditionAndGarbage) { TEST_F(VersionEditTest, AddWalEncodeDecode) { VersionEdit edit; for (uint64_t log_number = 1; log_number <= 20; log_number++) { - WalMetadata meta(rand() % 100); + WalMetadata meta; bool has_size = rand() % 2 == 0; if (has_size) { - meta.SetSizeInBytes(rand() % 1000); + meta.SetSyncedSizeInBytes(rand() % 1000); + } + bool is_closed = rand() % 2 == 0; + if (is_closed) { + meta.SetClosed(); } edit.AddWal(log_number, meta); } @@ -373,7 +377,7 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) { // Only has size tag, no terminate tag. std::string encoded_with_size = encoded_without_tag; PutVarint32(&encoded_with_size, - static_cast(WalAdditionTag::kSize)); + static_cast(WalAdditionTag::kSyncedSize)); PutVarint64(&encoded_with_size, kSizeInBytes); VersionEdit edit; Status s = edit.DecodeFrom(encoded_with_size); @@ -391,7 +395,7 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) { ASSERT_OK(edit.DecodeFrom(encoded_with_terminate)); auto& wal_addition = edit.GetWalAdditions()[0]; ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber); - ASSERT_FALSE(wal_addition.GetMetadata().HasSize()); + ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize()); } } @@ -401,7 +405,7 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) { std::string encoded; PutVarint32(&encoded, Tag::kWalAddition); PutVarint64(&encoded, kLogNumber); - PutVarint32(&encoded, static_cast(WalAdditionTag::kSize)); + PutVarint32(&encoded, static_cast(WalAdditionTag::kSyncedSize)); // No real size after the size tag. { @@ -443,14 +447,14 @@ TEST_F(VersionEditTest, AddWalDebug) { for (int i = 0; i < n; i++) { const WalAddition& wal = wals[i]; ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]); - ASSERT_EQ(wal.GetMetadata().GetSizeInBytes(), kSizeInBytes[i]); + ASSERT_EQ(wal.GetMetadata().GetSyncedSizeInBytes(), kSizeInBytes[i]); } std::string expected_str = "VersionEdit {\n"; for (int i = 0; i < n; i++) { std::stringstream ss; ss << " WalAddition: log_number: " << kLogNumbers[i] - << " size_in_bytes: " << kSizeInBytes[i] << "\n"; + << " synced_size_in_bytes: " << kSizeInBytes[i] << " closed: 0\n"; expected_str += ss.str(); } expected_str += " ColumnFamily: 0\n}\n"; @@ -460,7 +464,8 @@ TEST_F(VersionEditTest, AddWalDebug) { for (int i = 0; i < n; i++) { std::stringstream ss; ss << "{\"LogNumber\": " << kLogNumbers[i] << ", " - << "\"SizeInBytes\": " << kSizeInBytes[i] << "}"; + << "\"SyncedSizeInBytes\": " << kSizeInBytes[i] << ", " + << "\"Closed\": 0}"; if (i < n - 1) ss << ", "; expected_json += ss.str(); } diff --git a/db/wal_edit.cc b/db/wal_edit.cc index 8f15540e1..023b471d3 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -14,9 +14,13 @@ namespace ROCKSDB_NAMESPACE { void WalAddition::EncodeTo(std::string* dst) const { PutVarint64(dst, number_); - if (metadata_.HasSize()) { - PutVarint32(dst, static_cast(WalAdditionTag::kSize)); - PutVarint64(dst, metadata_.GetSizeInBytes()); + if (metadata_.HasSyncedSize()) { + PutVarint32(dst, static_cast(WalAdditionTag::kSyncedSize)); + PutVarint64(dst, metadata_.GetSyncedSizeInBytes()); + } + + if (metadata_.IsClosed()) { + PutVarint32(dst, static_cast(WalAdditionTag::kClosed)); } PutVarint32(dst, static_cast(WalAdditionTag::kTerminate)); @@ -36,12 +40,16 @@ Status WalAddition::DecodeFrom(Slice* src) { } WalAdditionTag tag = static_cast(tag_value); switch (tag) { - case WalAdditionTag::kSize: { + case WalAdditionTag::kSyncedSize: { uint64_t size = 0; if (!GetVarint64(src, &size)) { return Status::Corruption(class_name, "Error decoding WAL file size"); } - metadata_.SetSizeInBytes(size); + metadata_.SetSyncedSizeInBytes(size); + break; + } + case WalAdditionTag::kClosed: { + metadata_.SetClosed(); break; } // TODO: process future tags such as checksum. @@ -57,14 +65,16 @@ Status WalAddition::DecodeFrom(Slice* src) { } JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) { - jw << "LogNumber" << wal.GetLogNumber() << "SizeInBytes" - << wal.GetMetadata().GetSizeInBytes(); + jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes" + << wal.GetMetadata().GetSyncedSizeInBytes() << "Closed" + << wal.GetMetadata().IsClosed(); return jw; } std::ostream& operator<<(std::ostream& os, const WalAddition& wal) { os << "log_number: " << wal.GetLogNumber() - << " size_in_bytes: " << wal.GetMetadata().GetSizeInBytes(); + << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes() + << " closed: " << wal.GetMetadata().IsClosed(); return os; } @@ -106,26 +116,33 @@ std::string WalDeletion::DebugString() const { Status WalSet::AddWal(const WalAddition& wal) { auto it = wals_.lower_bound(wal.GetLogNumber()); - if (wal.GetMetadata().HasSize()) { - // The WAL must exist without size. - if (it == wals_.end() || it->first != wal.GetLogNumber()) { + bool existing = it != wals_.end() && it->first == wal.GetLogNumber(); + if (wal.GetMetadata().IsClosed()) { + // The WAL must exist and not closed. + if (!existing) { std::stringstream ss; ss << "WAL " << wal.GetLogNumber() << " is not created before closing"; return Status::Corruption("WalSet", ss.str()); } - if (it->second.HasSize()) { + if (it->second.IsClosed()) { std::stringstream ss; ss << "WAL " << wal.GetLogNumber() << " is closed more than once"; return Status::Corruption("WalSet", ss.str()); } + } + // If the WAL has synced size, it must >= the previous size. + if (existing && it->second.HasSyncedSize() && + (!wal.GetMetadata().HasSyncedSize() || + wal.GetMetadata().GetSyncedSizeInBytes() < + it->second.GetSyncedSizeInBytes())) { + std::stringstream ss; + ss << "WAL " << wal.GetLogNumber() + << " must not have smaller synced size than previous one"; + return Status::Corruption("WalSet", ss.str()); + } + if (existing) { it->second = wal.GetMetadata(); } else { - // The WAL must not exist beforehand. - if (it != wals_.end() && it->first == wal.GetLogNumber()) { - std::stringstream ss; - ss << "WAL " << wal.GetLogNumber() << " is created more than once"; - return Status::Corruption("WalSet", ss.str()); - } wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()}); } return Status::OK(); @@ -150,7 +167,7 @@ Status WalSet::DeleteWal(const WalDeletion& wal) { ss << "WAL " << wal.GetLogNumber() << " must exist before deletion"; return Status::Corruption("WalSet", ss.str()); } - if (!it->second.HasSize()) { + if (!it->second.IsClosed()) { std::stringstream ss; ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion"; return Status::Corruption("WalSet", ss.str()); diff --git a/db/wal_edit.h b/db/wal_edit.h index e5b577e1b..183b4ae5e 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -4,6 +4,8 @@ // (found in the LICENSE.Apache file in the root directory). // WAL related classes used in VersionEdit and VersionSet. +// Modifications to WalAddition and WalDeletion may need to update +// VersionEdit and its related tests. #pragma once @@ -13,6 +15,7 @@ #include #include "logging/event_logger.h" +#include "port/port.h" #include "rocksdb/rocksdb_namespace.h" namespace ROCKSDB_NAMESPACE { @@ -28,28 +31,39 @@ class WalMetadata { public: WalMetadata() = default; - explicit WalMetadata(uint64_t size_bytes) : size_bytes_(size_bytes) {} + explicit WalMetadata(uint64_t synced_size_bytes) + : synced_size_bytes_(synced_size_bytes) {} - bool HasSize() const { return size_bytes_ != kUnknownWalSize; } + bool IsClosed() const { return closed_; } - void SetSizeInBytes(uint64_t bytes) { size_bytes_ = bytes; } + void SetClosed() { closed_ = true; } - uint64_t GetSizeInBytes() const { return size_bytes_; } + bool HasSyncedSize() const { return synced_size_bytes_ != kUnknownWalSize; } + + void SetSyncedSizeInBytes(uint64_t bytes) { synced_size_bytes_ = bytes; } + + uint64_t GetSyncedSizeInBytes() const { return synced_size_bytes_; } private: - // The size of WAL is unknown, used when the WAL is not closed yet. - constexpr static uint64_t kUnknownWalSize = 0; + // The size of WAL is unknown, used when the WAL is not synced yet or is + // empty. + constexpr static uint64_t kUnknownWalSize = port::kMaxUint64; - // Size of a closed WAL in bytes. - uint64_t size_bytes_ = kUnknownWalSize; + // Size of the most recently synced WAL in bytes. + uint64_t synced_size_bytes_ = kUnknownWalSize; + + // Whether the WAL is closed. + bool closed_ = false; }; // These tags are persisted to MANIFEST, so it's part of the user API. enum class WalAdditionTag : uint32_t { // Indicates that there are no more tags. kTerminate = 1, - // Size in bytes. - kSize = 2, + // Synced Size in bytes. + kSyncedSize = 2, + // Whether the WAL is closed. + kClosed = 3, // Add tags in the future, such as checksum? }; @@ -117,15 +131,15 @@ using WalDeletions = std::vector; class WalSet { public: // Add WAL(s). - // If the WAL has size, it means the WAL is closed, - // then there must be an existing WAL without size that is added - // when creating the WAL, otherwise, return Status::Corruption. + // If the WAL is closed, + // then there must be an existing unclosed WAL, + // otherwise, return Status::Corruption. // Can happen when applying a VersionEdit or recovering from MANIFEST. Status AddWal(const WalAddition& wal); Status AddWals(const WalAdditions& wals); // Delete WAL(s). - // The WAL to be deleted must exist, otherwise, + // The WAL to be deleted must exist and be closed, otherwise, // return Status::Corruption. // Can happen when applying a VersionEdit or recovering from MANIFEST. Status DeleteWal(const WalDeletion& wal); diff --git a/db/wal_edit_test.cc b/db/wal_edit_test.cc index fb3bad070..571d6fe3f 100644 --- a/db/wal_edit_test.cc +++ b/db/wal_edit_test.cc @@ -24,7 +24,9 @@ TEST(WalSet, AddDeleteReset) { // Close WAL 1 - 5. for (WalNumber log_number = 1; log_number <= 5; log_number++) { - wals.AddWal(WalAddition(log_number, WalMetadata(100))); + WalMetadata wal(100); + wal.SetClosed(); + wals.AddWal(WalAddition(log_number, wal)); } ASSERT_EQ(wals.GetWals().size(), 10); @@ -49,20 +51,23 @@ TEST(WalSet, Overwrite) { constexpr uint64_t kBytes = 200; WalSet wals; wals.AddWal(WalAddition(kNumber)); - ASSERT_FALSE(wals.GetWals().at(kNumber).HasSize()); + ASSERT_FALSE(wals.GetWals().at(kNumber).HasSyncedSize()); wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); - ASSERT_TRUE(wals.GetWals().at(kNumber).HasSize()); - ASSERT_EQ(wals.GetWals().at(kNumber).GetSizeInBytes(), kBytes); + ASSERT_TRUE(wals.GetWals().at(kNumber).HasSyncedSize()); + ASSERT_EQ(wals.GetWals().at(kNumber).GetSyncedSizeInBytes(), kBytes); } -TEST(WalSet, CreateTwice) { +TEST(WalSet, SmallerSyncedSize) { constexpr WalNumber kNumber = 100; + constexpr uint64_t kBytes = 100; WalSet wals; - ASSERT_OK(wals.AddWal(WalAddition(kNumber))); - Status s = wals.AddWal(WalAddition(kNumber)); + ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); + Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(0))); ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") != - std::string::npos); + ASSERT_TRUE( + s.ToString().find( + "WAL 100 must not have smaller synced size than previous one") != + std::string::npos); } TEST(WalSet, CloseTwice) { @@ -70,8 +75,10 @@ TEST(WalSet, CloseTwice) { constexpr uint64_t kBytes = 200; WalSet wals; ASSERT_OK(wals.AddWal(WalAddition(kNumber))); - ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); - Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); + WalMetadata wal(kBytes); + wal.SetClosed(); + ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal))); + Status s = wals.AddWal(WalAddition(kNumber, wal)); ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.ToString().find("WAL 100 is closed more than once") != std::string::npos); @@ -81,7 +88,9 @@ TEST(WalSet, CloseBeforeCreate) { constexpr WalNumber kNumber = 100; constexpr uint64_t kBytes = 200; WalSet wals; - Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); + WalMetadata wal(kBytes); + wal.SetClosed(); + Status s = wals.AddWal(WalAddition(kNumber, wal)); ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.ToString().find("WAL 100 is not created before closing") != std::string::npos); @@ -92,11 +101,15 @@ TEST(WalSet, CreateAfterClose) { constexpr uint64_t kBytes = 200; WalSet wals; ASSERT_OK(wals.AddWal(WalAddition(kNumber))); - ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); + WalMetadata wal(kBytes); + wal.SetClosed(); + ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal))); Status s = wals.AddWal(WalAddition(kNumber)); ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") != - std::string::npos); + ASSERT_TRUE( + s.ToString().find( + "WAL 100 must not have smaller synced size than previous one") != + std::string::npos); } TEST(WalSet, DeleteNonExistingWal) {