efe827baf0
Summary: Currently, when a WAL becomes obsolete after flushing, if VersionSet::WalSet does not contain the WAL, we do not track the WAL obsoletion event in MANIFEST. But consider this case: * WAL 10 is synced, a VersionEdit is LogAndApplied to MANIFEST to log this WAL addition event, but the VersionEdit is not applied to WalSet yet since its corresponding ManifestWriter is still pending in the write queue; * Since the above ManifestWriter is blocking, the LogAndApply will block on a condition variable and release the db mutex, so another LogAndApply can proceed to enqueue other VersionEdits concurrently; * Now flush happens, and WAL 10 becomes obsolete, although WalSet does not contain WAL 10 yet, we should call LogAndApply to enqueue a VersionEdit to indicate the obsoletion of WAL 10; * otherwise, when the queued edit indicating WAL 10 addition is logged to MANIFEST, and DB crashes and reopens, the WAL 10 might have been removed from disk, but it still exists in MANIFEST. This PR changes the behavior to: always `LogAndApply` any WAL addition or obsoletion event, without considering the order issues caused by concurrency, but when applying the edits to `WalSet`, do not add the WALs if they are already obsolete. In this approach, the logical events of WAL addition and obsoletion are always tracked in MANIFEST, so we can inspect the MANIFEST and know all the previous WAL events, but we choose to ignore certain events due to the concurrency issues such as the case above, or the case in https://github.com/facebook/rocksdb/pull/7725. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7759 Test Plan: make check Reviewed By: pdillinger Differential Revision: D25423089 Pulled By: cheng-chang fbshipit-source-id: 9cb9a7fbc1875bf954f2a42f9b6cfd6d49a7b21c
205 lines
5.7 KiB
C++
205 lines
5.7 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/wal_edit.h"
|
|
|
|
#include "rocksdb/slice.h"
|
|
#include "rocksdb/status.h"
|
|
#include "util/coding.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
void WalAddition::EncodeTo(std::string* dst) const {
|
|
PutVarint64(dst, number_);
|
|
|
|
if (metadata_.HasSyncedSize()) {
|
|
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
|
|
PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
|
|
}
|
|
|
|
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
|
|
}
|
|
|
|
// Decodes a WAL addition from `src`, mirroring the layout written by
// EncodeTo(): a varint64 log number followed by a sequence of tagged fields
// that ends with the kTerminate tag.
//
// Returns Status::OK() once kTerminate is read. Returns Status::Corruption()
// if the log number, a tag, or the synced size cannot be decoded, or if an
// unrecognized tag is encountered.
Status WalAddition::DecodeFrom(Slice* src) {
  constexpr char class_name[] = "WalAddition";

  if (!GetVarint64(src, &number_)) {
    return Status::Corruption(class_name, "Error decoding WAL log number");
  }

  // Keep consuming tagged fields until the terminator (or an error) is hit.
  for (;;) {
    uint32_t raw_tag = 0;
    if (!GetVarint32(src, &raw_tag)) {
      return Status::Corruption(class_name, "Error decoding tag");
    }

    const auto tag = static_cast<WalAdditionTag>(raw_tag);
    if (tag == WalAdditionTag::kTerminate) {
      return Status::OK();
    }

    // TODO: process future tags such as checksum.
    if (tag != WalAdditionTag::kSyncedSize) {
      std::stringstream ss;
      ss << "Unknown tag " << raw_tag;
      return Status::Corruption(class_name, ss.str());
    }

    // kSyncedSize: the payload is the synced size as a varint64.
    uint64_t synced_size = 0;
    if (!GetVarint64(src, &synced_size)) {
      return Status::Corruption(class_name, "Error decoding WAL file size");
    }
    metadata_.SetSyncedSizeInBytes(synced_size);
  }
}
|
|
|
|
// Streams a WAL addition into `jw`: the label "LogNumber" followed by the
// log number, then the label "SyncedSizeInBytes" followed by the synced size
// reported by the WAL's metadata. Returns `jw` to allow chaining.
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
  jw << "LogNumber" << wal.GetLogNumber();
  jw << "SyncedSizeInBytes" << wal.GetMetadata().GetSyncedSizeInBytes();
  return jw;
}
|
|
|
|
// Writes a human-readable, single-line description of a WAL addition to
// `os` in the form "log_number: <n> synced_size_in_bytes: <size>".
// Returns `os` to allow chaining.
std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
  const auto& meta = wal.GetMetadata();
  os << "log_number: " << wal.GetLogNumber();
  os << " synced_size_in_bytes: " << meta.GetSyncedSizeInBytes();
  return os;
}
|
|
|
|
// Returns the text produced by streaming this object into a std::ostream.
std::string WalAddition::DebugString() const {
  std::ostringstream text;
  text << *this;
  return text.str();
}
|
|
|
|
// Encodes this WAL deletion into `dst`. The record consists solely of the
// WAL number stored in number_, written as a varint64.
void WalDeletion::EncodeTo(std::string* dst) const {
  const auto wal_number = number_;
  PutVarint64(dst, wal_number);
}
|
|
|
|
// Decodes a WAL deletion from `src` by reading a single varint64 into
// number_.
//
// Returns Status::OK() on success, or Status::Corruption() if the number
// cannot be decoded.
Status WalDeletion::DecodeFrom(Slice* src) {
  constexpr char class_name[] = "WalDeletion";

  const bool decoded = GetVarint64(src, &number_);
  if (decoded) {
    return Status::OK();
  }

  return Status::Corruption(class_name, "Error decoding WAL log number");
}
|
|
|
|
// Streams a WAL deletion into `jw`: the label "LogNumber" followed by the
// value of wal.GetLogNumber(). Returns `jw` to allow chaining.
JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
  const auto log_number = wal.GetLogNumber();
  jw << "LogNumber" << log_number;
  return jw;
}
|
|
|
|
// Writes a human-readable description of a WAL deletion to `os` in the form
// "log_number: <n>". Returns `os` to allow chaining.
std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
  return os << "log_number: " << wal.GetLogNumber();
}
|
|
|
|
// Returns the text produced by streaming this object into a std::ostream.
std::string WalDeletion::DebugString() const {
  std::ostringstream text;
  text << *this;
  return text.str();
}
|
|
|
|
// Applies a WAL addition to the set.
//
// - A WAL whose number is below min_wal_number_to_keep_ is silently ignored.
// - A WAL that is not tracked yet is inserted together with its metadata.
// - A WAL that is already tracked may only be re-added to update its synced
//   size: re-adding it without a synced size, or with a synced size smaller
//   than the one already recorded, yields Status::Corruption().
//
// Returns Status::OK() in every non-corruption case.
Status WalSet::AddWal(const WalAddition& wal) {
  const WalNumber log_number = wal.GetLogNumber();
  if (log_number < min_wal_number_to_keep_) {
    // The WAL is already obsolete, ignore it.
    return Status::OK();
  }

  const auto& new_meta = wal.GetMetadata();
  auto pos = wals_.lower_bound(log_number);
  const bool already_tracked = pos != wals_.end() && pos->first == log_number;

  if (!already_tracked) {
    // `pos` doubles as the insertion hint for the new entry.
    wals_.insert(pos, {log_number, new_meta});
    return Status::OK();
  }

  // From here on the WAL is known to the set, so this addition must be a
  // synced-size update.
  if (!new_meta.HasSyncedSize()) {
    std::stringstream ss;
    ss << "WAL " << log_number << " is created more than once";
    return Status::Corruption("WalSet::AddWal", ss.str());
  }

  // The new synced size must be >= the previously recorded one, if any.
  auto& tracked_meta = pos->second;
  if (tracked_meta.HasSyncedSize() &&
      new_meta.GetSyncedSizeInBytes() < tracked_meta.GetSyncedSizeInBytes()) {
    std::stringstream ss;
    ss << "WAL " << log_number
       << " must not have smaller synced size than previous one";
    return Status::Corruption("WalSet::AddWal", ss.str());
  }

  tracked_meta.SetSyncedSizeInBytes(new_meta.GetSyncedSizeInBytes());
  return Status::OK();
}
|
|
|
|
// Applies each addition in `wals`, in order, via AddWal().
//
// Stops at the first addition that fails and returns its status; additions
// applied before the failure are not rolled back. Returns Status::OK() if
// every addition succeeds (including when `wals` is empty).
Status WalSet::AddWals(const WalAdditions& wals) {
  for (const WalAddition& addition : wals) {
    Status s = AddWal(addition);
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}
|
|
|
|
// Drops every tracked WAL whose number is smaller than `wal` and raises
// min_wal_number_to_keep_ to `wal`.
//
// If `wal` is not greater than the current min_wal_number_to_keep_, nothing
// changes. Always returns Status::OK().
Status WalSet::DeleteWalsBefore(WalNumber wal) {
  if (wal <= min_wal_number_to_keep_) {
    return Status::OK();
  }

  min_wal_number_to_keep_ = wal;
  // lower_bound() yields the first entry with number >= wal, i.e. the first
  // entry that must survive.
  const auto first_kept = wals_.lower_bound(wal);
  wals_.erase(wals_.begin(), first_kept);
  return Status::OK();
}
|
|
|
|
// Restores the set to its empty state: resets min_wal_number_to_keep_ to 0
// and forgets all tracked WALs.
void WalSet::Reset() {
  min_wal_number_to_keep_ = 0;
  wals_.clear();
}
|
|
|
|
// Verifies that every tracked WAL with a recorded synced size is present in
// `logs_on_disk` and is at least as large as that synced size.
//
// `env` is used to query file sizes and must not be null. `logs_on_disk`
// maps a WAL number to the string passed to Env::GetFileSize() for that WAL.
//
// Returns Status::OK() if all checks pass; Status::Corruption() for the
// first WAL that is missing or smaller than its synced size; or the error
// returned by Env::GetFileSize() if the size query fails.
Status WalSet::CheckWals(
    Env* env,
    const std::unordered_map<WalNumber, std::string>& logs_on_disk) const {
  assert(env != nullptr);

  for (const auto& entry : wals_) {
    const uint64_t log_number = entry.first;
    const WalMetadata& meta = entry.second;

    if (!meta.HasSyncedSize()) {
      // The WAL and WAL directory is not even synced,
      // so the WAL's inode may not be persisted,
      // then the WAL might not show up when listing WAL directory.
      continue;
    }

    const auto on_disk = logs_on_disk.find(log_number);
    if (on_disk == logs_on_disk.end()) {
      std::stringstream ss;
      ss << "Missing WAL with log number: " << log_number << ".";
      return Status::Corruption(ss.str());
    }

    uint64_t log_file_size = 0;
    Status s = env->GetFileSize(on_disk->second, &log_file_size);
    if (!s.ok()) {
      return s;
    }

    // The file on disk may be larger than the synced size, but never smaller.
    if (log_file_size < meta.GetSyncedSizeInBytes()) {
      std::stringstream ss;
      ss << "Size mismatch: WAL (log number: " << log_number
         << ") in MANIFEST is " << meta.GetSyncedSizeInBytes()
         << " bytes , but actually is " << log_file_size << " bytes on disk.";
      return Status::Corruption(ss.str());
    }
  }

  return Status::OK();
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|