rocksdb/db/wal_edit.cc
Cheng Chang 00ee89b584 Track WAL in MANIFEST: update WalMetadata for WAL syncing (#7414)
Summary:
There are some tricky behaviors related to WAL sync:

- When creating a WAL, the WAL might not be synced, if the WAL directory is not synced, the WAL file's metadata may not even be synced to disk, so during recovery, when listing the WAL directory, the WAL may not even show up.
- During each DB::Write, the WriteOption can control whether the WAL should be synced, so a WAL previously not synced on creation can be synced during Write.

For each `SyncWAL`, we'll track the synced status and the current WAL size. Previously, we only track the WAL size on closing.
During recovery, we check that the on-disk WAL size is >= the last synced size.

So this PR introduces `synced_size` and `closed` to `WalMetadata` for the above design update.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7414

Test Plan:
- updated wal_edit_test
- updated version_edit_test

Reviewed By: riversand963

Differential Revision: D23796127

Pulled By: cheng-chang

fbshipit-source-id: 5498ab80f537c48a10157e71a4745716aef5cf30
2020-09-22 14:35:14 -07:00

193 lines
5.3 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/wal_edit.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
void WalAddition::EncodeTo(std::string* dst) const {
PutVarint64(dst, number_);
if (metadata_.HasSyncedSize()) {
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
}
if (metadata_.IsClosed()) {
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kClosed));
}
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
}
Status WalAddition::DecodeFrom(Slice* src) {
constexpr char class_name[] = "WalAddition";
if (!GetVarint64(src, &number_)) {
return Status::Corruption(class_name, "Error decoding WAL log number");
}
while (true) {
uint32_t tag_value = 0;
if (!GetVarint32(src, &tag_value)) {
return Status::Corruption(class_name, "Error decoding tag");
}
WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
switch (tag) {
case WalAdditionTag::kSyncedSize: {
uint64_t size = 0;
if (!GetVarint64(src, &size)) {
return Status::Corruption(class_name, "Error decoding WAL file size");
}
metadata_.SetSyncedSizeInBytes(size);
break;
}
case WalAdditionTag::kClosed: {
metadata_.SetClosed();
break;
}
// TODO: process future tags such as checksum.
case WalAdditionTag::kTerminate:
return Status::OK();
default: {
std::stringstream ss;
ss << "Unknown tag " << tag_value;
return Status::Corruption(class_name, ss.str());
}
}
}
}
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
<< wal.GetMetadata().GetSyncedSizeInBytes() << "Closed"
<< wal.GetMetadata().IsClosed();
return jw;
}
std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
os << "log_number: " << wal.GetLogNumber()
<< " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes()
<< " closed: " << wal.GetMetadata().IsClosed();
return os;
}
std::string WalAddition::DebugString() const {
std::ostringstream oss;
oss << *this;
return oss.str();
}
void WalDeletion::EncodeTo(std::string* dst) const {
PutVarint64(dst, number_);
}
Status WalDeletion::DecodeFrom(Slice* src) {
constexpr char class_name[] = "WalDeletion";
if (!GetVarint64(src, &number_)) {
return Status::Corruption(class_name, "Error decoding WAL log number");
}
return Status::OK();
}
JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
jw << "LogNumber" << wal.GetLogNumber();
return jw;
}
std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
os << "log_number: " << wal.GetLogNumber();
return os;
}
std::string WalDeletion::DebugString() const {
std::ostringstream oss;
oss << *this;
return oss.str();
}
Status WalSet::AddWal(const WalAddition& wal) {
auto it = wals_.lower_bound(wal.GetLogNumber());
bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
if (wal.GetMetadata().IsClosed()) {
// The WAL must exist and not closed.
if (!existing) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is not created before closing";
return Status::Corruption("WalSet", ss.str());
}
if (it->second.IsClosed()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " is closed more than once";
return Status::Corruption("WalSet", ss.str());
}
}
// If the WAL has synced size, it must >= the previous size.
if (existing && it->second.HasSyncedSize() &&
(!wal.GetMetadata().HasSyncedSize() ||
wal.GetMetadata().GetSyncedSizeInBytes() <
it->second.GetSyncedSizeInBytes())) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber()
<< " must not have smaller synced size than previous one";
return Status::Corruption("WalSet", ss.str());
}
if (existing) {
it->second = wal.GetMetadata();
} else {
wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
}
return Status::OK();
}
Status WalSet::AddWals(const WalAdditions& wals) {
Status s;
for (const WalAddition& wal : wals) {
s = AddWal(wal);
if (!s.ok()) {
break;
}
}
return s;
}
Status WalSet::DeleteWal(const WalDeletion& wal) {
auto it = wals_.find(wal.GetLogNumber());
// The WAL must exist and has been closed.
if (it == wals_.end()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must exist before deletion";
return Status::Corruption("WalSet", ss.str());
}
if (!it->second.IsClosed()) {
std::stringstream ss;
ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion";
return Status::Corruption("WalSet", ss.str());
}
wals_.erase(it);
return Status::OK();
}
Status WalSet::DeleteWals(const WalDeletions& wals) {
Status s;
for (const WalDeletion& wal : wals) {
s = DeleteWal(wal);
if (!s.ok()) {
break;
}
}
return s;
}
void WalSet::Reset() { wals_.clear(); }
} // namespace ROCKSDB_NAMESPACE