db/log_reader: treat bad record length or checksum as EOF

If we are in kTolerateCorruptedTailRecords mode, treat these errors
as the end of the log.  This is particularly important for recycled
logs, where we will regularly see corrupted headers (bad length or
checksum) when replaying a log.  If we are aligned with a block
boundary or otherwise get lucky, we will land on an old record header
and detect the log number mismatch, but more commonly we will land
midway through a record from some previous block and effectively read
noise.  These cases must be treated as the end of the log in order
for recycling to work.
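
For context, a minimal sketch of the setup this targets (not part of
this commit; the DB path and recycle count are arbitrary), combining
WAL recycling with the tolerant recovery mode through the standard
options fields:

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Keep old WAL files around for reuse instead of deleting them.
  options.recycle_log_file_num = 4;
  // Replay a recycled WAL up to the first bad length/checksum and stop,
  // rather than reporting corruption.
  options.wal_recovery_mode =
      rocksdb::WALRecoveryMode::kTolerateCorruptedTailRecords;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/recycle_wal_db", &db);
  if (!s.ok()) {
    return 1;
  }
  delete db;
  return 0;
}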

This makes the LogTest.Recycle/1 test pass.

We also modify a number of existing tests because recycled log files
behave fundamentally differently here: reading always stops at the
first bad record.
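
For reference, the GetParam() branches in the test changes below
select between the legacy format (param 0) and the recycled format.
Roughly the shape of that parameterization, with illustrative names
only (the real fixture in db/log_test.cc is more involved):

#include <gtest/gtest.h>

// Illustrative sketch, not the actual fixture: a nonzero parameter
// stands for "write records with the recyclable header format".
class LogTest : public ::testing::TestWithParam<int> {};

TEST_P(LogTest, Recycle) {
  bool recycled_format = GetParam() != 0;
  // A recycled-format reader stops at the first bad record instead of
  // reporting corruption and continuing.
  (void)recycled_format;
}

// Two instances, so a name like LogTest.Recycle/1 refers to the
// recycled-format run.
INSTANTIATE_TEST_CASE_P(Formats, LogTest, ::testing::Values(0, 1));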

Signed-off-by: Sage Weil <sage@redhat.com>

commit 34df1c94d5 (parent 7947aba68c)
Author: Sage Weil <sage@redhat.com>, 2015-12-11 17:17:43 -05:00
Committed by: krad
3 changed files with 45 additions and 14 deletions

db/log_reader.cc

@@ -22,9 +22,8 @@ Reader::Reporter::~Reporter() {
 }
 
 Reader::Reader(std::shared_ptr<Logger> info_log,
-               unique_ptr<SequentialFileReader>&& _file,
-               Reporter* reporter, bool checksum, uint64_t initial_offset,
-               uint64_t log_num)
+               unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
+               bool checksum, uint64_t initial_offset, uint64_t log_num)
     : info_log_(info_log),
       file_(std::move(_file)),
       reporter_(reporter),
@@ -37,7 +36,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       last_record_offset_(0),
       end_of_buffer_offset_(0),
       initial_offset_(initial_offset),
-      log_number_(log_num) {}
+      log_number_(log_num),
+      recycled_(false) {}
 
 Reader::~Reader() {
   delete[] backing_store_;
@@ -192,6 +192,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         break;
 
       case kBadRecordLen:
+        if (recycled_ &&
+            wal_recovery_mode ==
+                WALRecoveryMode::kTolerateCorruptedTailRecords) {
+          scratch->clear();
+          return false;
+        }
         ReportCorruption(drop_size, "bad record length");
         if (in_fragmented_record) {
           ReportCorruption(scratch->size(), "error in middle of record");
@@ -201,6 +207,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         break;
 
       case kBadRecordChecksum:
+        if (recycled_ &&
+            wal_recovery_mode ==
+                WALRecoveryMode::kTolerateCorruptedTailRecords) {
+          scratch->clear();
+          return false;
+        }
         ReportCorruption(drop_size, "checksum mismatch");
         if (in_fragmented_record) {
           ReportCorruption(scratch->size(), "error in middle of record");
@@ -355,6 +367,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
     const uint32_t length = a | (b << 8);
     int header_size = kHeaderSize;
     if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
+      if (end_of_buffer_offset_ - buffer_.size() == 0) {
+        recycled_ = true;
+      }
       header_size = kRecyclableHeaderSize;
       // We need enough for the larger header
       if (buffer_.size() < (size_t)kRecyclableHeaderSize) {

db/log_reader.h

@@ -113,6 +113,9 @@ class Reader {
   // which log number this is
   uint64_t const log_number_;
 
+  // Whether this is a recycled log file
+  bool recycled_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,

db/log_test.cc

@@ -447,9 +447,13 @@ TEST_P(LogTest, BadLength) {
   Write("foo");
   // Least significant size byte is stored in header[4].
   IncrementByte(4, 1);
-  ASSERT_EQ("foo", Read());
-  ASSERT_EQ(kBlockSize, DroppedBytes());
-  ASSERT_EQ("OK", MatchError("bad record length"));
+  if (!GetParam()) {
+    ASSERT_EQ("foo", Read());
+    ASSERT_EQ(kBlockSize, DroppedBytes());
+    ASSERT_EQ("OK", MatchError("bad record length"));
+  } else {
+    ASSERT_EQ("EOF", Read());
+  }
 }
 
 TEST_P(LogTest, BadLengthAtEndIsIgnored) {
@@ -472,8 +476,13 @@ TEST_P(LogTest, ChecksumMismatch) {
   Write("foooooo");
   IncrementByte(0, 14);
   ASSERT_EQ("EOF", Read());
-  ASSERT_EQ(14U + 4 * !!GetParam(), DroppedBytes());
-  ASSERT_EQ("OK", MatchError("checksum mismatch"));
+  if (!GetParam()) {
+    ASSERT_EQ(14U, DroppedBytes());
+    ASSERT_EQ("OK", MatchError("checksum mismatch"));
+  } else {
+    ASSERT_EQ(0U, DroppedBytes());
+    ASSERT_EQ("", ReportMessage());
+  }
 }
 
 TEST_P(LogTest, UnexpectedMiddleType) {
@@ -570,11 +579,15 @@ TEST_P(LogTest, ErrorJoinsRecords) {
     SetByte(offset, 'x');
   }
 
-  ASSERT_EQ("correct", Read());
-  ASSERT_EQ("EOF", Read());
-  size_t dropped = DroppedBytes();
-  ASSERT_LE(dropped, 2 * kBlockSize + 100);
-  ASSERT_GE(dropped, 2 * kBlockSize);
+  if (!GetParam()) {
+    ASSERT_EQ("correct", Read());
+    ASSERT_EQ("EOF", Read());
+    size_t dropped = DroppedBytes();
+    ASSERT_LE(dropped, 2 * kBlockSize + 100);
+    ASSERT_GE(dropped, 2 * kBlockSize);
+  } else {
+    ASSERT_EQ("EOF", Read());
+  }
 }
 
 TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }