07030c6f4a
Summary: Consider the case: 1. All column families are flushed, so all WALs become obsolete, but no WAL is removed from disk yet because the removal is asynchronous, a VersionEdit is written to MANIFEST indicating that WALs before a certain WAL number are obsolete, let's say this number is 3; 2. `SyncWAL` is called, so all the on-disk WALs are synced, and if track_and_verify_wal_in_manifest=true, the WALs will be tracked in MANIFEST, let's say the WAL numbers are 1 and 2; 3. DB crashes; 4. During DB recovery, when replaying MANIFEST, we first see that WAL with number < 3 are obsolete, then we see that WAL 1 and 2 are synced, so according to current implementation of `WalSet`, the `WalSet` will be recovered to include WAL 1 and 2; 5. WAL 1 and 2 are asynchronously deleted from disk, then the WAL verification algorithm fails with `Corruption: missing WAL`. The above case is reproduced in a new unit test `DBBasicTestTrackWal::DoNotTrackObsoleteWal`. The fix is to maintain the upper bound of the obsolete WAL numbers, any WAL with number less than the maintained number is considered to be obsolete, so shouldn't be tracked even if they are later synced. The number is maintained in `WalSet`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7725 Test Plan: 1. a new unit test `DBBasicTestTrackWal::DoNotTrackObsoleteWal` is added. 2. run `make crash_test` on devserver. Reviewed By: riversand963 Differential Revision: D25238914 Pulled By: cheng-chang fbshipit-source-id: f5dccd57c3d89f19565ec5731f2d42f06d272b72
204 lines
5.8 KiB
C++
204 lines
5.8 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/wal_edit.h"
|
|
|
|
#include "rocksdb/slice.h"
|
|
#include "rocksdb/status.h"
|
|
#include "util/coding.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
void WalAddition::EncodeTo(std::string* dst) const {
|
|
PutVarint64(dst, number_);
|
|
|
|
if (metadata_.HasSyncedSize()) {
|
|
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
|
PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
|
|
}
|
|
|
|
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
|
}
|
|
|
|
Status WalAddition::DecodeFrom(Slice* src) {
|
|
constexpr char class_name[] = "WalAddition";
|
|
|
|
if (!GetVarint64(src, &number_)) {
|
|
return Status::Corruption(class_name, "Error decoding WAL log number");
|
|
}
|
|
|
|
while (true) {
|
|
uint32_t tag_value = 0;
|
|
if (!GetVarint32(src, &tag_value)) {
|
|
return Status::Corruption(class_name, "Error decoding tag");
|
|
}
|
|
WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
|
|
switch (tag) {
|
|
case WalAdditionTag::kSyncedSize: {
|
|
uint64_t size = 0;
|
|
if (!GetVarint64(src, &size)) {
|
|
return Status::Corruption(class_name, "Error decoding WAL file size");
|
|
}
|
|
metadata_.SetSyncedSizeInBytes(size);
|
|
break;
|
|
}
|
|
// TODO: process future tags such as checksum.
|
|
case WalAdditionTag::kTerminate:
|
|
return Status::OK();
|
|
default: {
|
|
std::stringstream ss;
|
|
ss << "Unknown tag " << tag_value;
|
|
return Status::Corruption(class_name, ss.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
|
|
jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
|
|
<< wal.GetMetadata().GetSyncedSizeInBytes();
|
|
return jw;
|
|
}
|
|
|
|
std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
|
|
os << "log_number: " << wal.GetLogNumber()
|
|
<< " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes();
|
|
return os;
|
|
}
|
|
|
|
std::string WalAddition::DebugString() const {
|
|
std::ostringstream oss;
|
|
oss << *this;
|
|
return oss.str();
|
|
}
|
|
|
|
void WalDeletion::EncodeTo(std::string* dst) const {
|
|
PutVarint64(dst, number_);
|
|
}
|
|
|
|
Status WalDeletion::DecodeFrom(Slice* src) {
|
|
constexpr char class_name[] = "WalDeletion";
|
|
|
|
if (!GetVarint64(src, &number_)) {
|
|
return Status::Corruption(class_name, "Error decoding WAL log number");
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
|
|
jw << "LogNumber" << wal.GetLogNumber();
|
|
return jw;
|
|
}
|
|
|
|
std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
|
|
os << "log_number: " << wal.GetLogNumber();
|
|
return os;
|
|
}
|
|
|
|
std::string WalDeletion::DebugString() const {
|
|
std::ostringstream oss;
|
|
oss << *this;
|
|
return oss.str();
|
|
}
|
|
|
|
Status WalSet::AddWal(const WalAddition& wal) {
|
|
if (wal.GetLogNumber() < min_wal_number_to_keep_) {
|
|
std::stringstream ss;
|
|
ss << "min_wal_number_to_keep is " << min_wal_number_to_keep_ << ", so WAL "
|
|
<< wal.GetLogNumber() << " is obsolete";
|
|
return Status::Corruption("WalSet::AddWal", ss.str());
|
|
}
|
|
|
|
auto it = wals_.lower_bound(wal.GetLogNumber());
|
|
bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
|
|
if (existing && !wal.GetMetadata().HasSyncedSize()) {
|
|
std::stringstream ss;
|
|
ss << "WAL " << wal.GetLogNumber() << " is created more than once";
|
|
return Status::Corruption("WalSet::AddWal", ss.str());
|
|
}
|
|
// If the WAL has synced size, it must >= the previous size.
|
|
if (wal.GetMetadata().HasSyncedSize() && existing &&
|
|
it->second.HasSyncedSize() &&
|
|
wal.GetMetadata().GetSyncedSizeInBytes() <
|
|
it->second.GetSyncedSizeInBytes()) {
|
|
std::stringstream ss;
|
|
ss << "WAL " << wal.GetLogNumber()
|
|
<< " must not have smaller synced size than previous one";
|
|
return Status::Corruption("WalSet::AddWal", ss.str());
|
|
}
|
|
if (existing) {
|
|
it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
|
|
} else {
|
|
wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status WalSet::AddWals(const WalAdditions& wals) {
|
|
Status s;
|
|
for (const WalAddition& wal : wals) {
|
|
s = AddWal(wal);
|
|
if (!s.ok()) {
|
|
break;
|
|
}
|
|
}
|
|
return s;
|
|
}
|
|
|
|
Status WalSet::DeleteWalsBefore(WalNumber wal) {
|
|
wals_.erase(wals_.begin(), wals_.lower_bound(wal));
|
|
return Status::OK();
|
|
}
|
|
|
|
void WalSet::Reset() {
|
|
wals_.clear();
|
|
min_wal_number_to_keep_ = 0;
|
|
}
|
|
|
|
Status WalSet::CheckWals(
|
|
Env* env,
|
|
const std::unordered_map<WalNumber, std::string>& logs_on_disk) const {
|
|
assert(env != nullptr);
|
|
|
|
Status s;
|
|
for (const auto& wal : wals_) {
|
|
const uint64_t log_number = wal.first;
|
|
const WalMetadata& wal_meta = wal.second;
|
|
|
|
if (!wal_meta.HasSyncedSize()) {
|
|
// The WAL and WAL directory is not even synced,
|
|
// so the WAL's inode may not be persisted,
|
|
// then the WAL might not show up when listing WAL directory.
|
|
continue;
|
|
}
|
|
|
|
if (logs_on_disk.find(log_number) == logs_on_disk.end()) {
|
|
std::stringstream ss;
|
|
ss << "Missing WAL with log number: " << log_number << ".";
|
|
s = Status::Corruption(ss.str());
|
|
break;
|
|
}
|
|
|
|
uint64_t log_file_size = 0;
|
|
s = env->GetFileSize(logs_on_disk.at(log_number), &log_file_size);
|
|
if (!s.ok()) {
|
|
break;
|
|
}
|
|
if (log_file_size < wal_meta.GetSyncedSizeInBytes()) {
|
|
std::stringstream ss;
|
|
ss << "Size mismatch: WAL (log number: " << log_number
|
|
<< ") in MANIFEST is " << wal_meta.GetSyncedSizeInBytes()
|
|
<< " bytes , but actually is " << log_file_size << " bytes on disk.";
|
|
s = Status::Corruption(ss.str());
|
|
break;
|
|
}
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|