Fix UnmarkEOF for partial blocks

Summary:
Blocks in the transaction log are a fixed size, but the last block in the transaction log file is usually a partial block. When a new record is added after the reader hit the end of the file, a new physical record will be appended to the last block. ReadPhysicalRecord can only read full blocks and assumes that the file position indicator is aligned to the start of a block. If the reader is forced to read further by simply clearing the EOF flag, ReadPhysicalRecord will read a full block starting from somewhere in the middle of a real block, causing it to lose alignment and to have a partial physical record at the end of the read buffer. This will result in length mismatches and checksum failures. When the log file is tailed for replication this will cause the log iterator to become invalid, necessitating the creation of a new iterator which will have to read the log file from scratch.

This diff fixes this issue by reading the remaining portion of the last block we read from. This is done when the reader is forced to read further (UnmarkEOF is called).

Test Plan:
- Added unit tests
- Stress test (with replication). Check dbdir/LOG file for corruptions.
- Test on test tier

Reviewers: emayanke, haobo, dhruba

Reviewed By: haobo

CC: vamsi, sheki, dhruba, kailiu, igor

Differential Revision: https://reviews.facebook.net/D15249
This commit is contained in:
Schalk-Willem Kruger 2014-01-27 14:49:10 -08:00
parent 832158e7f7
commit 3d33da75ef
3 changed files with 232 additions and 29 deletions

View File

@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
read_error_(false),
eof_offset_(0),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}
void Reader::UnmarkEOF() {
if (read_error_) {
return;
}
eof_ = false;
if (eof_offset_ == 0) {
return;
}
// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
// of a block.
//
// consumed_bytes + buffer_size() + remaining == kBlockSize
size_t consumed_bytes = eof_offset_ - buffer_.size();
size_t remaining = kBlockSize - eof_offset_;
// backing_store_ is used to concatenate what is left in buffer_ and
// the remainder of the block. If buffer_ already uses backing_store_,
// we just append the new data.
if (buffer_.data() != backing_store_ + consumed_bytes) {
// Buffer_ does not use backing_store_ for storage.
// Copy what is left in buffer_ to backing_store.
memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
}
Slice read_buffer;
Status status = file_->Read(remaining, &read_buffer,
backing_store_ + eof_offset_);
size_t added = read_buffer.size();
end_of_buffer_offset_ += added;
if (!status.ok()) {
if (added > 0) {
ReportDrop(added, status);
}
read_error_ = true;
return;
}
if (read_buffer.data() != backing_store_ + eof_offset_) {
// Read did not write to backing_store_
memmove(backing_store_ + eof_offset_, read_buffer.data(),
read_buffer.size());
}
buffer_ = Slice(backing_store_ + consumed_bytes,
eof_offset_ + added - consumed_bytes);
if (added < remaining) {
eof_ = true;
eof_offset_ += added;
} else {
eof_offset_ = 0;
}
}
void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
read_error_ = true;
return kEof;
} else if (buffer_.size() < (size_t)kBlockSize) {
eof_ = true;
eof_offset_ = buffer_.size();
}
continue;
} else if (buffer_.size() == 0) {

View File

@ -69,9 +69,10 @@ class Reader {
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
void UnmarkEOF() {
eof_ = false;
}
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
void UnmarkEOF();
SequentialFile* file() { return file_.get(); }
@ -82,6 +83,11 @@ class Reader {
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file
// Offset of the file position indicator within the last block when an
// EOF was detected.
size_t eof_offset_;
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;

View File

@ -47,36 +47,93 @@ class LogTest {
public:
std::string contents_;
explicit StringDest(Slice& reader_contents) :
WritableFile(),
contents_(""),
reader_contents_(reader_contents),
last_flush_(0) {
reader_contents_ = Slice(contents_.data(), 0);
};
virtual Status Close() { return Status::OK(); }
virtual Status Flush() { return Status::OK(); }
virtual Status Flush() {
ASSERT_TRUE(reader_contents_.size() <= last_flush_);
size_t offset = last_flush_ - reader_contents_.size();
reader_contents_ = Slice(
contents_.data() + offset,
contents_.size() - offset);
last_flush_ = contents_.size();
return Status::OK();
}
virtual Status Sync() { return Status::OK(); }
virtual Status Append(const Slice& slice) {
contents_.append(slice.data(), slice.size());
return Status::OK();
}
void Drop(size_t bytes) {
contents_.resize(contents_.size() - bytes);
reader_contents_ = Slice(
reader_contents_.data(), reader_contents_.size() - bytes);
last_flush_ = contents_.size();
}
private:
Slice& reader_contents_;
size_t last_flush_;
};
class StringSource : public SequentialFile {
public:
Slice contents_;
Slice& contents_;
bool force_error_;
size_t force_error_position_;
bool force_eof_;
size_t force_eof_position_;
bool returned_partial_;
StringSource() : force_error_(false), returned_partial_(false) { }
explicit StringSource(Slice& contents) :
contents_(contents),
force_error_(false),
force_error_position_(0),
force_eof_(false),
force_eof_position_(0),
returned_partial_(false) { }
virtual Status Read(size_t n, Slice* result, char* scratch) {
ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
if (force_error_) {
force_error_ = false;
returned_partial_ = true;
return Status::Corruption("read error");
if (force_error_position_ >= n) {
force_error_position_ -= n;
} else {
*result = Slice(contents_.data(), force_error_position_);
contents_.remove_prefix(force_error_position_);
force_error_ = false;
returned_partial_ = true;
return Status::Corruption("read error");
}
}
if (contents_.size() < n) {
n = contents_.size();
returned_partial_ = true;
}
*result = Slice(contents_.data(), n);
if (force_eof_) {
if (force_eof_position_ >= n) {
force_eof_position_ -= n;
} else {
force_eof_ = false;
n = force_eof_position_;
returned_partial_ = true;
}
}
// By using scratch we ensure that caller has control over the
// lifetime of result.data()
memcpy(scratch, contents_.data(), n);
*result = Slice(scratch, n);
contents_.remove_prefix(n);
return Status::OK();
}
@ -123,10 +180,10 @@ class LogTest {
src->contents_ = dest_contents();
}
Slice reader_contents_;
unique_ptr<StringDest> dest_holder_;
unique_ptr<StringSource> source_holder_;
ReportCollector report_;
bool reading_;
Writer writer_;
Reader reader_;
@ -135,16 +192,15 @@ class LogTest {
static uint64_t initial_offset_last_record_offsets_[];
public:
LogTest() : dest_holder_(new StringDest),
source_holder_(new StringSource),
reading_(false),
LogTest() : reader_contents_(),
dest_holder_(new StringDest(reader_contents_)),
source_holder_(new StringSource(reader_contents_)),
writer_(std::move(dest_holder_)),
reader_(std::move(source_holder_), &report_, true/*checksum*/,
0/*initial_offset*/) {
}
void Write(const std::string& msg) {
ASSERT_TRUE(!reading_) << "Write() after starting to read";
writer_.AddRecord(Slice(msg));
}
@ -153,10 +209,6 @@ class LogTest {
}
std::string Read() {
if (!reading_) {
reading_ = true;
reset_source_contents();
}
std::string scratch;
Slice record;
if (reader_.ReadRecord(&record, &scratch)) {
@ -175,7 +227,9 @@ class LogTest {
}
void ShrinkSize(int bytes) {
dest_contents().resize(dest_contents().size() - bytes);
auto dest = dynamic_cast<StringDest*>(writer_.file());
assert(dest);
dest->Drop(bytes);
}
void FixChecksum(int header_offset, int len) {
@ -185,9 +239,10 @@ class LogTest {
EncodeFixed32(&dest_contents()[header_offset], crc);
}
void ForceError() {
void ForceError(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->force_error_ = true;
src->force_error_position_ = position;
}
size_t DroppedBytes() const {
@ -198,6 +253,22 @@ class LogTest {
return report_.message_;
}
void ForceEOF(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->force_eof_ = true;
src->force_eof_position_ = position;
}
void UnmarkEOF() {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->returned_partial_ = false;
reader_.UnmarkEOF();
}
bool IsEOF() {
return reader_.IsEOF();
}
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
if (report_.message_.find(msg) == std::string::npos) {
@ -217,9 +288,7 @@ class LogTest {
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog();
reading_ = true;
unique_ptr<StringSource> source(new StringSource);
source->contents_ = dest_contents();
unique_ptr<StringSource> source(new StringSource(reader_contents_));
unique_ptr<Reader> offset_reader(
new Reader(std::move(source), &report_, true/*checksum*/,
WrittenBytes() + offset_past_end));
@ -231,9 +300,7 @@ class LogTest {
void CheckInitialOffsetRecord(uint64_t initial_offset,
int expected_record_offset) {
WriteInitialOffsetLog();
reading_ = true;
unique_ptr<StringSource> source(new StringSource);
source->contents_ = dest_contents();
unique_ptr<StringSource> source(new StringSource(reader_contents_));
unique_ptr<Reader> offset_reader(
new Reader(std::move(source), &report_, true/*checksum*/,
initial_offset));
@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) {
CheckOffsetPastEndReturnsNoRecords(5);
}
TEST(LogTest, ClearEofSingleBlock) {
Write("foo");
Write("bar");
ForceEOF(3 + kHeaderSize + 2);
ASSERT_EQ("foo", Read());
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_TRUE(IsEOF());
ASSERT_EQ("EOF", Read());
Write("xxx");
UnmarkEOF();
ASSERT_EQ("xxx", Read());
ASSERT_TRUE(IsEOF());
}
TEST(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5;
size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25;
Write(BigString("foo", n));
Write(BigString("bar", n));
ForceEOF(n + num_full_blocks * kHeaderSize + 10);
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_TRUE(IsEOF());
UnmarkEOF();
ASSERT_EQ(BigString("bar", n), Read());
ASSERT_TRUE(IsEOF());
Write(BigString("xxx", n));
UnmarkEOF();
ASSERT_EQ(BigString("xxx", n), Read());
ASSERT_TRUE(IsEOF());
}
TEST(LogTest, ClearEofError) {
// If an error occurs during Read() in UnmarkEOF(), the records contained
// in the buffer should be returned on subsequent calls of ReadRecord()
// until no more full records are left, whereafter ReadRecord() should return
// false to indicate that it cannot read any further.
Write("foo");
Write("bar");
UnmarkEOF();
ASSERT_EQ("foo", Read());
ASSERT_TRUE(IsEOF());
Write("xxx");
ForceError(0);
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, ClearEofError2) {
Write("foo");
Write("bar");
UnmarkEOF();
ASSERT_EQ("foo", Read());
Write("xxx");
ForceError(3);
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("read error"));
}
} // namespace log
} // namespace rocksdb