Fail recovery when MANIFEST record checksum mismatch (#6996)
Summary: https://github.com/facebook/rocksdb/issues/5411 refactored `VersionSet::Recover` but introduced a bug, explained as follows. Before, once a checksum mismatch happens, `reporter` will set `s` to be non-ok. Therefore, Recover will stop processing the MANIFEST any further. ``` // Correct // Inside Recover LogReporter reporter; reporter.status = &s; log::Reader reader(..., reporter); while (reader.ReadRecord() && s.ok()) { ... } ``` The bug is that, the local variable `s` in `ReadAndRecover` won't be updated by `reporter` while reading the MANIFEST. It is possible that the reader sees a checksum mismatch in a record, but `ReadRecord` retries internally read and finds the next valid record. The mismatched record will be ignored and no error is reported. ``` // Incorrect // Inside Recover LogReporter reporter; reporter.status = &s; log::Reader reader(..., reporter); s = ReadAndRecover(reader, ...); // Inside ReadAndRecover Status s; // Shadows the s in Recover. while (reader.ReadRecord() && s.ok()) { ... } ``` `LogReporter` can use a separate `log_read_status` to track the errors while reading the MANIFEST. RocksDB can process more MANIFEST entries only if `log_read_status.ok()`. Test plan (devserver): make check Pull Request resolved: https://github.com/facebook/rocksdb/pull/6996 Reviewed By: ajkr Differential Revision: D22105746 Pulled By: riversand963 fbshipit-source-id: b22f717a423457a41ca152a242abbb64cf91fc38
This commit is contained in:
parent
a7b0f02c47
commit
201c5a5e61
@ -4,6 +4,7 @@
|
|||||||
* Fix potential file descriptor leakage in PosixEnv's IsDirectory() and NewRandomAccessFile().
|
* Fix potential file descriptor leakage in PosixEnv's IsDirectory() and NewRandomAccessFile().
|
||||||
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).
|
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).
|
||||||
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
|
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
|
||||||
|
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
|
||||||
|
|
||||||
## 6.10.2 (6/5/2020)
|
## 6.10.2 (6/5/2020)
|
||||||
### Bug fix
|
### Bug fix
|
||||||
|
@ -1999,6 +1999,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
|
|||||||
}
|
}
|
||||||
#endif // !ROCKSDB_LITE
|
#endif // !ROCKSDB_LITE
|
||||||
|
|
||||||
|
TEST_F(DBBasicTest, ManifestChecksumMismatch) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
ASSERT_OK(Put("bar", "value"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
|
||||||
|
auto* crc = reinterpret_cast<uint32_t*>(arg);
|
||||||
|
*crc = *crc + 1;
|
||||||
|
});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
WriteOptions write_opts;
|
||||||
|
write_opts.disableWAL = true;
|
||||||
|
Status s = db_->Put(write_opts, "foo", "value");
|
||||||
|
ASSERT_OK(s);
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
ASSERT_OK(Put("foo", "value1"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
s = TryReopen(options);
|
||||||
|
ASSERT_TRUE(s.IsCorruption());
|
||||||
|
}
|
||||||
|
|
||||||
class DBBasicTestMultiGet : public DBTestBase {
|
class DBBasicTestMultiGet : public DBTestBase {
|
||||||
public:
|
public:
|
||||||
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
|
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
|
||||||
|
@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
|
|||||||
// Compute the crc of the record type and the payload.
|
// Compute the crc of the record type and the payload.
|
||||||
crc = crc32c::Extend(crc, ptr, n);
|
crc = crc32c::Extend(crc, ptr, n);
|
||||||
crc = crc32c::Mask(crc); // Adjust for storage
|
crc = crc32c::Mask(crc); // Adjust for storage
|
||||||
|
TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
|
||||||
|
&crc);
|
||||||
EncodeFixed32(buf, crc);
|
EncodeFixed32(buf, crc);
|
||||||
|
|
||||||
// Write the header and the payload
|
// Write the header and the payload
|
||||||
|
@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler(
|
|||||||
assert(version_set_ != nullptr);
|
assert(version_set_ != nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
|
void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
|
||||||
|
std::string* db_id) {
|
||||||
Slice record;
|
Slice record;
|
||||||
std::string scratch;
|
std::string scratch;
|
||||||
|
assert(log_read_status);
|
||||||
|
assert(log_read_status->ok());
|
||||||
|
|
||||||
size_t recovered_edits = 0;
|
size_t recovered_edits = 0;
|
||||||
Status s = Initialize();
|
Status s = Initialize();
|
||||||
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
|
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
|
||||||
|
log_read_status->ok()) {
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
s = edit.DecodeFrom(record);
|
s = edit.DecodeFrom(record);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!log_read_status->ok()) {
|
||||||
|
s = *log_read_status;
|
||||||
|
}
|
||||||
|
|
||||||
CheckIterationResult(reader, &s);
|
CheckIterationResult(reader, &s);
|
||||||
|
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
status_ = s;
|
status_ = s;
|
||||||
}
|
}
|
||||||
return s;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Status VersionEditHandler::Initialize() {
|
Status VersionEditHandler::Initialize() {
|
||||||
|
@ -40,7 +40,8 @@ class VersionEditHandler {
|
|||||||
|
|
||||||
virtual ~VersionEditHandler() {}
|
virtual ~VersionEditHandler() {}
|
||||||
|
|
||||||
Status Iterate(log::Reader& reader, std::string* db_id);
|
void Iterate(log::Reader& reader, Status* log_read_status,
|
||||||
|
std::string* db_id);
|
||||||
|
|
||||||
const Status& status() const { return status_; }
|
const Status& status() const { return status_; }
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include "db/version_set.h"
|
#include "db/version_set.h"
|
||||||
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <cinttypes>
|
#include <cinttypes>
|
||||||
@ -19,6 +20,7 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "compaction/compaction.h"
|
#include "compaction/compaction.h"
|
||||||
#include "db/internal_stats.h"
|
#include "db/internal_stats.h"
|
||||||
#include "db/log_reader.h"
|
#include "db/log_reader.h"
|
||||||
@ -50,6 +52,7 @@
|
|||||||
#include "table/table_reader.h"
|
#include "table/table_reader.h"
|
||||||
#include "table/two_level_iterator.h"
|
#include "table/two_level_iterator.h"
|
||||||
#include "test_util/sync_point.h"
|
#include "test_util/sync_point.h"
|
||||||
|
#include "util/cast_util.h"
|
||||||
#include "util/coding.h"
|
#include "util/coding.h"
|
||||||
#include "util/stop_watch.h"
|
#include "util/stop_watch.h"
|
||||||
#include "util/string_util.h"
|
#include "util/string_util.h"
|
||||||
@ -4383,24 +4386,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
|
|||||||
if (dbname.back() != '/') {
|
if (dbname.back() != '/') {
|
||||||
manifest_path->push_back('/');
|
manifest_path->push_back('/');
|
||||||
}
|
}
|
||||||
*manifest_path += fname;
|
manifest_path->append(fname);
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status VersionSet::ReadAndRecover(
|
Status VersionSet::ReadAndRecover(
|
||||||
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
|
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
|
||||||
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
|
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
|
||||||
std::unordered_map<int, std::string>& column_families_not_found,
|
std::unordered_map<int, std::string>& column_families_not_found,
|
||||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
|
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
|
||||||
builders,
|
builders,
|
||||||
VersionEditParams* version_edit_params, std::string* db_id) {
|
Status* log_read_status, VersionEditParams* version_edit_params,
|
||||||
assert(reader != nullptr);
|
std::string* db_id) {
|
||||||
assert(read_buffer != nullptr);
|
assert(read_buffer != nullptr);
|
||||||
|
assert(log_read_status != nullptr);
|
||||||
Status s;
|
Status s;
|
||||||
Slice record;
|
Slice record;
|
||||||
std::string scratch;
|
std::string scratch;
|
||||||
size_t recovered_edits = 0;
|
size_t recovered_edits = 0;
|
||||||
while (reader->ReadRecord(&record, &scratch) && s.ok()) {
|
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
|
||||||
|
log_read_status->ok()) {
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
s = edit.DecodeFrom(record);
|
s = edit.DecodeFrom(record);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -4444,6 +4449,9 @@ Status VersionSet::ReadAndRecover(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!log_read_status->ok()) {
|
||||||
|
s = *log_read_status;
|
||||||
|
}
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
// Clear the buffer if we fail to decode/apply an edit.
|
// Clear the buffer if we fail to decode/apply an edit.
|
||||||
read_buffer->Clear();
|
read_buffer->Clear();
|
||||||
@ -4490,8 +4498,7 @@ Status VersionSet::Recover(
|
|||||||
db_options_->log_readahead_size));
|
db_options_->log_readahead_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
|
VersionBuilderMap builders;
|
||||||
builders;
|
|
||||||
|
|
||||||
// add default column family
|
// add default column family
|
||||||
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
|
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
|
||||||
@ -4513,12 +4520,13 @@ Status VersionSet::Recover(
|
|||||||
VersionEditParams version_edit_params;
|
VersionEditParams version_edit_params;
|
||||||
{
|
{
|
||||||
VersionSet::LogReporter reporter;
|
VersionSet::LogReporter reporter;
|
||||||
reporter.status = &s;
|
Status log_read_status;
|
||||||
|
reporter.status = &log_read_status;
|
||||||
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
|
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
|
||||||
true /* checksum */, 0 /* log_number */);
|
true /* checksum */, 0 /* log_number */);
|
||||||
AtomicGroupReadBuffer read_buffer;
|
AtomicGroupReadBuffer read_buffer;
|
||||||
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
|
s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
|
||||||
column_families_not_found, builders,
|
column_families_not_found, builders, &log_read_status,
|
||||||
&version_edit_params, db_id);
|
&version_edit_params, db_id);
|
||||||
current_manifest_file_size = reader.GetReadOffset();
|
current_manifest_file_size = reader.GetReadOffset();
|
||||||
assert(current_manifest_file_size != 0);
|
assert(current_manifest_file_size != 0);
|
||||||
@ -4783,21 +4791,20 @@ Status VersionSet::TryRecoverFromOneManifest(
|
|||||||
db_options_->log_readahead_size));
|
db_options_->log_readahead_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(s.ok());
|
||||||
VersionSet::LogReporter reporter;
|
VersionSet::LogReporter reporter;
|
||||||
reporter.status = &s;
|
reporter.status = &s;
|
||||||
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
|
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
|
||||||
/*checksum=*/true, /*log_num=*/0);
|
/*checksum=*/true, /*log_num=*/0);
|
||||||
{
|
|
||||||
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
|
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
|
||||||
const_cast<VersionSet*>(this));
|
const_cast<VersionSet*>(this));
|
||||||
|
|
||||||
s = handler_pit.Iterate(reader, db_id);
|
handler_pit.Iterate(reader, &s, db_id);
|
||||||
|
|
||||||
assert(nullptr != has_missing_table_file);
|
assert(nullptr != has_missing_table_file);
|
||||||
*has_missing_table_file = handler_pit.HasMissingFiles();
|
*has_missing_table_file = handler_pit.HasMissingFiles();
|
||||||
}
|
|
||||||
|
|
||||||
return s;
|
return handler_pit.status();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
|
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
|
||||||
@ -5878,8 +5885,7 @@ Status ReactiveVersionSet::Recover(
|
|||||||
// In recovery, nobody else can access it, so it's fine to set it to be
|
// In recovery, nobody else can access it, so it's fine to set it to be
|
||||||
// initialized earlier.
|
// initialized earlier.
|
||||||
default_cfd->set_initialized();
|
default_cfd->set_initialized();
|
||||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
|
VersionBuilderMap builders;
|
||||||
builders;
|
|
||||||
std::unordered_map<int, std::string> column_families_not_found;
|
std::unordered_map<int, std::string> column_families_not_found;
|
||||||
builders.insert(
|
builders.insert(
|
||||||
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
|
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
|
||||||
@ -5887,7 +5893,7 @@ Status ReactiveVersionSet::Recover(
|
|||||||
|
|
||||||
manifest_reader_status->reset(new Status());
|
manifest_reader_status->reset(new Status());
|
||||||
manifest_reporter->reset(new LogReporter());
|
manifest_reporter->reset(new LogReporter());
|
||||||
static_cast<LogReporter*>(manifest_reporter->get())->status =
|
static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
|
||||||
manifest_reader_status->get();
|
manifest_reader_status->get();
|
||||||
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
|
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
|
||||||
log::Reader* reader = manifest_reader->get();
|
log::Reader* reader = manifest_reader->get();
|
||||||
@ -5896,10 +5902,9 @@ Status ReactiveVersionSet::Recover(
|
|||||||
VersionEdit version_edit;
|
VersionEdit version_edit;
|
||||||
while (s.ok() && retry < 1) {
|
while (s.ok() && retry < 1) {
|
||||||
assert(reader != nullptr);
|
assert(reader != nullptr);
|
||||||
Slice record;
|
s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
|
||||||
std::string scratch;
|
column_families_not_found, builders,
|
||||||
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
|
manifest_reader_status->get(), &version_edit);
|
||||||
column_families_not_found, builders, &version_edit);
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
bool enough = version_edit.has_next_file_number_ &&
|
bool enough = version_edit.has_next_file_number_ &&
|
||||||
version_edit.has_log_number_ &&
|
version_edit.has_log_number_ &&
|
||||||
|
@ -1102,6 +1102,10 @@ class VersionSet {
|
|||||||
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
|
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
using VersionBuilderMap =
|
||||||
|
std::unordered_map<uint32_t,
|
||||||
|
std::unique_ptr<BaseReferencedVersionBuilder>>;
|
||||||
|
|
||||||
struct ManifestWriter;
|
struct ManifestWriter;
|
||||||
|
|
||||||
friend class Version;
|
friend class Version;
|
||||||
@ -1113,7 +1117,9 @@ class VersionSet {
|
|||||||
struct LogReporter : public log::Reader::Reporter {
|
struct LogReporter : public log::Reader::Reporter {
|
||||||
Status* status;
|
Status* status;
|
||||||
virtual void Corruption(size_t /*bytes*/, const Status& s) override {
|
virtual void Corruption(size_t /*bytes*/, const Status& s) override {
|
||||||
if (this->status->ok()) *this->status = s;
|
if (status->ok()) {
|
||||||
|
*status = s;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1144,13 +1150,14 @@ class VersionSet {
|
|||||||
const VersionEdit* edit);
|
const VersionEdit* edit);
|
||||||
|
|
||||||
Status ReadAndRecover(
|
Status ReadAndRecover(
|
||||||
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
|
log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
|
||||||
const std::unordered_map<std::string, ColumnFamilyOptions>&
|
const std::unordered_map<std::string, ColumnFamilyOptions>&
|
||||||
name_to_options,
|
name_to_options,
|
||||||
std::unordered_map<int, std::string>& column_families_not_found,
|
std::unordered_map<int, std::string>& column_families_not_found,
|
||||||
std::unordered_map<
|
std::unordered_map<
|
||||||
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
|
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
|
||||||
VersionEditParams* version_edit, std::string* db_id = nullptr);
|
Status* log_read_status, VersionEditParams* version_edit,
|
||||||
|
std::string* db_id = nullptr);
|
||||||
|
|
||||||
// REQUIRES db mutex
|
// REQUIRES db mutex
|
||||||
Status ApplyOneVersionEditToBuilder(
|
Status ApplyOneVersionEditToBuilder(
|
||||||
@ -1279,8 +1286,7 @@ class ReactiveVersionSet : public VersionSet {
|
|||||||
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
|
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
|
VersionBuilderMap active_version_builders_;
|
||||||
active_version_builders_;
|
|
||||||
AtomicGroupReadBuffer read_buffer_;
|
AtomicGroupReadBuffer read_buffer_;
|
||||||
// Number of version edits to skip by ReadAndApply at the beginning of a new
|
// Number of version edits to skip by ReadAndApply at the beginning of a new
|
||||||
// MANIFEST created by primary.
|
// MANIFEST created by primary.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user