log_{reader,write}: recyclable record format
Introduce new tags for records that have a log_number. This changes the header size from 7 to 11 for these records, making this a backward-incompatible change. If we read a record that belongs to a different log_number (i.e., a previous instantiation of this log file, before it was most recently recycled), we return kOldRecord from ReadPhysicalRecord. ReadRecord will translate this into a kEof or kBadRecord depending on what the WAL recovery mode is. We make several adjustments to the log_test.cc tests to compensate for the fact that the header size varies between the two modes. Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
parent
4104e9bb67
commit
a7b2bedfb0
@ -22,14 +22,24 @@ enum RecordType {
|
||||
// For fragments
|
||||
kFirstType = 2,
|
||||
kMiddleType = 3,
|
||||
kLastType = 4
|
||||
kLastType = 4,
|
||||
|
||||
// For recycled log files
|
||||
kRecyclableFullType = 5,
|
||||
kRecyclableFirstType = 6,
|
||||
kRecyclableMiddleType = 7,
|
||||
kRecyclableLastType = 8,
|
||||
};
|
||||
static const int kMaxRecordType = kLastType;
|
||||
static const int kMaxRecordType = kRecyclableLastType;
|
||||
|
||||
static const unsigned int kBlockSize = 32768;
|
||||
|
||||
// Header is checksum (4 bytes), type (1 byte), length (2 bytes).
|
||||
static const int kHeaderSize = 4 + 1 + 2;
|
||||
|
||||
// Recyclable header is checksum (4 bytes), type (1 byte), log number
|
||||
// (4 bytes), length (2 bytes).
|
||||
static const int kRecyclableHeaderSize = 4 + 1 + 4 + 2;
|
||||
|
||||
} // namespace log
|
||||
} // namespace rocksdb
|
||||
|
113
db/log_reader.cc
113
db/log_reader.cc
@ -95,6 +95,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
|
||||
const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
|
||||
switch (record_type) {
|
||||
case kFullType:
|
||||
case kRecyclableFullType:
|
||||
if (in_fragmented_record && !scratch->empty()) {
|
||||
// Handle bug in earlier versions of log::Writer where
|
||||
// it could emit an empty kFirstType record at the tail end
|
||||
@ -109,6 +110,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
|
||||
return true;
|
||||
|
||||
case kFirstType:
|
||||
case kRecyclableFirstType:
|
||||
if (in_fragmented_record && !scratch->empty()) {
|
||||
// Handle bug in earlier versions of log::Writer where
|
||||
// it could emit an empty kFirstType record at the tail end
|
||||
@ -122,6 +124,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
|
||||
break;
|
||||
|
||||
case kMiddleType:
|
||||
case kRecyclableMiddleType:
|
||||
if (!in_fragmented_record) {
|
||||
ReportCorruption(fragment.size(),
|
||||
"missing start of fragmented record(1)");
|
||||
@ -131,6 +134,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
|
||||
break;
|
||||
|
||||
case kLastType:
|
||||
case kRecyclableLastType:
|
||||
if (!in_fragmented_record) {
|
||||
ReportCorruption(fragment.size(),
|
||||
"missing start of fragmented record(2)");
|
||||
@ -162,6 +166,23 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
|
||||
}
|
||||
return false;
|
||||
|
||||
case kOldRecord:
|
||||
if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
|
||||
// Treat a record from a previous instance of the log as EOF.
|
||||
if (in_fragmented_record) {
|
||||
if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
|
||||
// in clean shutdown we don't expect any error in the log files
|
||||
ReportCorruption(scratch->size(), "error reading trailing data");
|
||||
}
|
||||
// This can be caused by the writer dying immediately after
|
||||
// writing a physical record but before completing the next; don't
|
||||
// treat it as a corruption, just ignore the entire logical record.
|
||||
scratch->clear();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
// fall-thru
|
||||
|
||||
case kBadRecord:
|
||||
if (in_fragmented_record) {
|
||||
ReportCorruption(scratch->size(), "error in middle of record");
|
||||
@ -263,37 +284,49 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
|
||||
}
|
||||
}
|
||||
|
||||
bool Reader::ReadMore(size_t* drop_size, int *error) {
|
||||
if (!eof_ && !read_error_) {
|
||||
// Last read was a full read, so this is a trailer to skip
|
||||
buffer_.clear();
|
||||
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
|
||||
end_of_buffer_offset_ += buffer_.size();
|
||||
if (!status.ok()) {
|
||||
buffer_.clear();
|
||||
ReportDrop(kBlockSize, status);
|
||||
read_error_ = true;
|
||||
*error = kEof;
|
||||
return false;
|
||||
} else if (buffer_.size() < (size_t)kBlockSize) {
|
||||
eof_ = true;
|
||||
eof_offset_ = buffer_.size();
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
// Note that if buffer_ is non-empty, we have a truncated header at the
|
||||
// end of the file, which can be caused by the writer crashing in the
|
||||
// middle of writing the header. Unless explicitly requested we don't
|
||||
// considering this an error, just report EOF.
|
||||
if (buffer_.size()) {
|
||||
*drop_size = buffer_.size();
|
||||
buffer_.clear();
|
||||
*error = kBadHeader;
|
||||
return false;
|
||||
}
|
||||
buffer_.clear();
|
||||
*error = kEof;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
|
||||
while (true) {
|
||||
// We need at least the minimum header size
|
||||
if (buffer_.size() < (size_t)kHeaderSize) {
|
||||
if (!eof_ && !read_error_) {
|
||||
// Last read was a full read, so this is a trailer to skip
|
||||
buffer_.clear();
|
||||
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
|
||||
end_of_buffer_offset_ += buffer_.size();
|
||||
if (!status.ok()) {
|
||||
buffer_.clear();
|
||||
ReportDrop(kBlockSize, status);
|
||||
read_error_ = true;
|
||||
return kEof;
|
||||
} else if (buffer_.size() < (size_t)kBlockSize) {
|
||||
eof_ = true;
|
||||
eof_offset_ = buffer_.size();
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
// Note that if buffer_ is non-empty, we have a truncated header at the
|
||||
// end of the file, which can be caused by the writer crashing in the
|
||||
// middle of writing the header. Unless explicitly requested we don't
|
||||
// considering this an error, just report EOF.
|
||||
if (buffer_.size()) {
|
||||
*drop_size = buffer_.size();
|
||||
buffer_.clear();
|
||||
return kBadHeader;
|
||||
}
|
||||
buffer_.clear();
|
||||
return kEof;
|
||||
int r;
|
||||
if (!ReadMore(drop_size, &r)) {
|
||||
return r;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Parse the header
|
||||
@ -302,7 +335,23 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
|
||||
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
|
||||
const unsigned int type = header[6];
|
||||
const uint32_t length = a | (b << 8);
|
||||
if (kHeaderSize + length > buffer_.size()) {
|
||||
int header_size = kHeaderSize;
|
||||
if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
|
||||
header_size = kRecyclableHeaderSize;
|
||||
// We need enough for the larger header
|
||||
if (buffer_.size() < (size_t)kRecyclableHeaderSize) {
|
||||
int r;
|
||||
if (!ReadMore(drop_size, &r)) {
|
||||
return r;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const uint32_t log_num = DecodeFixed32(header + 7);
|
||||
if (log_num != log_number_) {
|
||||
return kOldRecord;
|
||||
}
|
||||
}
|
||||
if (header_size + length > buffer_.size()) {
|
||||
*drop_size = buffer_.size();
|
||||
buffer_.clear();
|
||||
if (!eof_) {
|
||||
@ -331,7 +380,7 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
|
||||
// Check crc
|
||||
if (checksum_) {
|
||||
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
|
||||
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
|
||||
uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
|
||||
if (actual_crc != expected_crc) {
|
||||
// Drop the rest of the buffer since "length" itself may have
|
||||
// been corrupted and if we trust it, we could find some
|
||||
@ -344,16 +393,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
|
||||
}
|
||||
}
|
||||
|
||||
buffer_.remove_prefix(kHeaderSize + length);
|
||||
buffer_.remove_prefix(header_size + length);
|
||||
|
||||
// Skip physical record that started before initial_offset_
|
||||
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
|
||||
if (end_of_buffer_offset_ - buffer_.size() - header_size - length <
|
||||
initial_offset_) {
|
||||
result->clear();
|
||||
return kBadRecord;
|
||||
}
|
||||
|
||||
*result = Slice(header + kHeaderSize, length);
|
||||
*result = Slice(header + header_size, length);
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
@ -124,6 +124,8 @@ class Reader {
|
||||
kBadRecord = kMaxRecordType + 2,
|
||||
// Returned when we fail to read a valid header.
|
||||
kBadHeader = kMaxRecordType + 3,
|
||||
// Returned when we read an old record from a previous user of the log.
|
||||
kOldRecord = kMaxRecordType + 4,
|
||||
};
|
||||
|
||||
// Skips all blocks that are completely before "initial_offset_".
|
||||
@ -134,6 +136,9 @@ class Reader {
|
||||
// Return type, or one of the preceding special values
|
||||
unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
|
||||
|
||||
// Read some more
|
||||
bool ReadMore(size_t* drop_size, int *error);
|
||||
|
||||
// Reports dropped bytes to the reporter.
|
||||
// buffer_ must be updated to remove the dropped bytes prior to invocation.
|
||||
void ReportCorruption(size_t bytes, const char* reason);
|
||||
|
105
db/log_test.cc
105
db/log_test.cc
@ -153,7 +153,7 @@ class LogTest : public ::testing::TestWithParam<int> {
|
||||
|
||||
// Record metadata for testing initial offset functionality
|
||||
static size_t initial_offset_record_sizes_[];
|
||||
static uint64_t initial_offset_last_record_offsets_[];
|
||||
uint64_t initial_offset_last_record_offsets_[4];
|
||||
|
||||
public:
|
||||
LogTest()
|
||||
@ -163,8 +163,16 @@ class LogTest : public ::testing::TestWithParam<int> {
|
||||
source_holder_(
|
||||
test::GetSequentialFileReader(new StringSource(reader_contents_))),
|
||||
writer_(std::move(dest_holder_), 123, GetParam()),
|
||||
reader_(NULL, std::move(source_holder_), &report_,
|
||||
true /*checksum*/, 0 /*initial_offset*/, 123) {}
|
||||
reader_(NULL, std::move(source_holder_), &report_, true /*checksum*/,
|
||||
0 /*initial_offset*/, 123) {
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
initial_offset_last_record_offsets_[0] = 0;
|
||||
initial_offset_last_record_offsets_[1] = header_size + 10000;
|
||||
initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
|
||||
initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
|
||||
(2 * log::kBlockSize - 1000) +
|
||||
3 * header_size;
|
||||
}
|
||||
|
||||
void Write(const std::string& msg) {
|
||||
writer_.AddRecord(Slice(msg));
|
||||
@ -200,9 +208,11 @@ class LogTest : public ::testing::TestWithParam<int> {
|
||||
dest->Drop(bytes);
|
||||
}
|
||||
|
||||
void FixChecksum(int header_offset, int len) {
|
||||
void FixChecksum(int header_offset, int len, bool recyclable) {
|
||||
// Compute crc of type/len/data
|
||||
uint32_t crc = crc32c::Value(&dest_contents()[header_offset+6], 1 + len);
|
||||
int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
|
||||
uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
|
||||
header_size - 6 + len);
|
||||
crc = crc32c::Mask(crc);
|
||||
EncodeFixed32(&dest_contents()[header_offset], crc);
|
||||
}
|
||||
@ -292,13 +302,6 @@ size_t LogTest::initial_offset_record_sizes_[] =
|
||||
2 * log::kBlockSize - 1000, // Span three blocks
|
||||
1};
|
||||
|
||||
uint64_t LogTest::initial_offset_last_record_offsets_[] =
|
||||
{0,
|
||||
kHeaderSize + 10000,
|
||||
2 * (kHeaderSize + 10000),
|
||||
2 * (kHeaderSize + 10000) +
|
||||
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
|
||||
|
||||
TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
|
||||
|
||||
TEST_P(LogTest, ReadWrite) {
|
||||
@ -336,9 +339,10 @@ TEST_P(LogTest, Fragmentation) {
|
||||
|
||||
TEST_P(LogTest, MarginalTrailer) {
|
||||
// Make a trailer that is exactly the same length as an empty record.
|
||||
const int n = kBlockSize - 2*kHeaderSize;
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
const int n = kBlockSize - 2 * header_size;
|
||||
Write(BigString("foo", n));
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize), WrittenBytes());
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
|
||||
Write("");
|
||||
Write("bar");
|
||||
ASSERT_EQ(BigString("foo", n), Read());
|
||||
@ -349,9 +353,10 @@ TEST_P(LogTest, MarginalTrailer) {
|
||||
|
||||
TEST_P(LogTest, MarginalTrailer2) {
|
||||
// Make a trailer that is exactly the same length as an empty record.
|
||||
const int n = kBlockSize - 2*kHeaderSize;
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
const int n = kBlockSize - 2 * header_size;
|
||||
Write(BigString("foo", n));
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize), WrittenBytes());
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
|
||||
Write("bar");
|
||||
ASSERT_EQ(BigString("foo", n), Read());
|
||||
ASSERT_EQ("bar", Read());
|
||||
@ -361,9 +366,10 @@ TEST_P(LogTest, MarginalTrailer2) {
|
||||
}
|
||||
|
||||
TEST_P(LogTest, ShortTrailer) {
|
||||
const int n = kBlockSize - 2*kHeaderSize + 4;
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
const int n = kBlockSize - 2 * header_size + 4;
|
||||
Write(BigString("foo", n));
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes());
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
|
||||
Write("");
|
||||
Write("bar");
|
||||
ASSERT_EQ(BigString("foo", n), Read());
|
||||
@ -373,9 +379,10 @@ TEST_P(LogTest, ShortTrailer) {
|
||||
}
|
||||
|
||||
TEST_P(LogTest, AlignedEof) {
|
||||
const int n = kBlockSize - 2*kHeaderSize + 4;
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
const int n = kBlockSize - 2 * header_size + 4;
|
||||
Write(BigString("foo", n));
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes());
|
||||
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
|
||||
ASSERT_EQ(BigString("foo", n), Read());
|
||||
ASSERT_EQ("EOF", Read());
|
||||
}
|
||||
@ -407,7 +414,7 @@ TEST_P(LogTest, BadRecordType) {
|
||||
Write("foo");
|
||||
// Type is stored in header[6]
|
||||
IncrementByte(6, 100);
|
||||
FixChecksum(0, 3);
|
||||
FixChecksum(0, 3, false);
|
||||
ASSERT_EQ("EOF", Read());
|
||||
ASSERT_EQ(3U, DroppedBytes());
|
||||
ASSERT_EQ("OK", MatchError("unknown record type"));
|
||||
@ -432,7 +439,8 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
|
||||
}
|
||||
|
||||
TEST_P(LogTest, BadLength) {
|
||||
const int kPayloadSize = kBlockSize - kHeaderSize;
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
const int kPayloadSize = kBlockSize - header_size;
|
||||
Write(BigString("bar", kPayloadSize));
|
||||
Write("foo");
|
||||
// Least significant size byte is stored in header[4].
|
||||
@ -459,17 +467,17 @@ TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
|
||||
}
|
||||
|
||||
TEST_P(LogTest, ChecksumMismatch) {
|
||||
Write("foo");
|
||||
IncrementByte(0, 10);
|
||||
Write("foooooo");
|
||||
IncrementByte(0, 14);
|
||||
ASSERT_EQ("EOF", Read());
|
||||
ASSERT_EQ(10U, DroppedBytes());
|
||||
ASSERT_EQ(14U + 4 * !!GetParam(), DroppedBytes());
|
||||
ASSERT_EQ("OK", MatchError("checksum mismatch"));
|
||||
}
|
||||
|
||||
TEST_P(LogTest, UnexpectedMiddleType) {
|
||||
Write("foo");
|
||||
SetByte(6, kMiddleType);
|
||||
FixChecksum(0, 3);
|
||||
SetByte(6, GetParam() ? kRecyclableMiddleType : kMiddleType);
|
||||
FixChecksum(0, 3, !!GetParam());
|
||||
ASSERT_EQ("EOF", Read());
|
||||
ASSERT_EQ(3U, DroppedBytes());
|
||||
ASSERT_EQ("OK", MatchError("missing start"));
|
||||
@ -477,8 +485,8 @@ TEST_P(LogTest, UnexpectedMiddleType) {
|
||||
|
||||
TEST_P(LogTest, UnexpectedLastType) {
|
||||
Write("foo");
|
||||
SetByte(6, kLastType);
|
||||
FixChecksum(0, 3);
|
||||
SetByte(6, GetParam() ? kRecyclableLastType : kLastType);
|
||||
FixChecksum(0, 3, !!GetParam());
|
||||
ASSERT_EQ("EOF", Read());
|
||||
ASSERT_EQ(3U, DroppedBytes());
|
||||
ASSERT_EQ("OK", MatchError("missing start"));
|
||||
@ -487,8 +495,8 @@ TEST_P(LogTest, UnexpectedLastType) {
|
||||
TEST_P(LogTest, UnexpectedFullType) {
|
||||
Write("foo");
|
||||
Write("bar");
|
||||
SetByte(6, kFirstType);
|
||||
FixChecksum(0, 3);
|
||||
SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
|
||||
FixChecksum(0, 3, !!GetParam());
|
||||
ASSERT_EQ("bar", Read());
|
||||
ASSERT_EQ("EOF", Read());
|
||||
ASSERT_EQ(3U, DroppedBytes());
|
||||
@ -498,8 +506,8 @@ TEST_P(LogTest, UnexpectedFullType) {
|
||||
TEST_P(LogTest, UnexpectedFirstType) {
|
||||
Write("foo");
|
||||
Write(BigString("bar", 100000));
|
||||
SetByte(6, kFirstType);
|
||||
FixChecksum(0, 3);
|
||||
SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
|
||||
FixChecksum(0, 3, !!GetParam());
|
||||
ASSERT_EQ(BigString("bar", 100000), Read());
|
||||
ASSERT_EQ("EOF", Read());
|
||||
ASSERT_EQ(3U, DroppedBytes());
|
||||
@ -573,13 +581,25 @@ TEST_P(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
|
||||
|
||||
TEST_P(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
|
||||
|
||||
TEST_P(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }
|
||||
TEST_P(LogTest, ReadSecondStart) {
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
CheckInitialOffsetRecord(10000 + header_size, 1);
|
||||
}
|
||||
|
||||
TEST_P(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }
|
||||
TEST_P(LogTest, ReadThirdOneOff) {
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
CheckInitialOffsetRecord(10000 + header_size + 1, 2);
|
||||
}
|
||||
|
||||
TEST_P(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
|
||||
TEST_P(LogTest, ReadThirdStart) {
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
CheckInitialOffsetRecord(20000 + 2 * header_size, 2);
|
||||
}
|
||||
|
||||
TEST_P(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }
|
||||
TEST_P(LogTest, ReadFourthOneOff) {
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
CheckInitialOffsetRecord(20000 + 2 * header_size + 1, 3);
|
||||
}
|
||||
|
||||
TEST_P(LogTest, ReadFourthFirstBlockTrailer) {
|
||||
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
|
||||
@ -594,8 +614,9 @@ TEST_P(LogTest, ReadFourthLastBlock) {
|
||||
}
|
||||
|
||||
TEST_P(LogTest, ReadFourthStart) {
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
CheckInitialOffsetRecord(
|
||||
2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
|
||||
2 * (header_size + 1000) + (2 * log::kBlockSize - 1000) + 3 * header_size,
|
||||
3);
|
||||
}
|
||||
|
||||
@ -606,7 +627,8 @@ TEST_P(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
|
||||
TEST_P(LogTest, ClearEofSingleBlock) {
|
||||
Write("foo");
|
||||
Write("bar");
|
||||
ForceEOF(3 + kHeaderSize + 2);
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
ForceEOF(3 + header_size + 2);
|
||||
ASSERT_EQ("foo", Read());
|
||||
UnmarkEOF();
|
||||
ASSERT_EQ("bar", Read());
|
||||
@ -620,10 +642,11 @@ TEST_P(LogTest, ClearEofSingleBlock) {
|
||||
|
||||
TEST_P(LogTest, ClearEofMultiBlock) {
|
||||
size_t num_full_blocks = 5;
|
||||
size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25;
|
||||
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
|
||||
size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
|
||||
Write(BigString("foo", n));
|
||||
Write(BigString("bar", n));
|
||||
ForceEOF(n + num_full_blocks * kHeaderSize + 10);
|
||||
ForceEOF(n + num_full_blocks * header_size + header_size + 3);
|
||||
ASSERT_EQ(BigString("foo", n), Read());
|
||||
ASSERT_TRUE(IsEOF());
|
||||
UnmarkEOF();
|
||||
|
@ -37,6 +37,10 @@ Status Writer::AddRecord(const Slice& slice) {
|
||||
const char* ptr = slice.data();
|
||||
size_t left = slice.size();
|
||||
|
||||
// Header size varies depending on whether we are recycling or not.
|
||||
const int header_size =
|
||||
recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize;
|
||||
|
||||
// Fragment the record if necessary and emit it. Note that if slice
|
||||
// is empty, we still want to iterate once to emit a single
|
||||
// zero-length record
|
||||
@ -45,32 +49,34 @@ Status Writer::AddRecord(const Slice& slice) {
|
||||
do {
|
||||
const int leftover = kBlockSize - block_offset_;
|
||||
assert(leftover >= 0);
|
||||
if (leftover < kHeaderSize) {
|
||||
if (leftover < header_size) {
|
||||
// Switch to a new block
|
||||
if (leftover > 0) {
|
||||
// Fill the trailer (literal below relies on kHeaderSize being 7)
|
||||
assert(kHeaderSize == 7);
|
||||
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
|
||||
// Fill the trailer (literal below relies on kHeaderSize and
|
||||
// kRecyclableHeaderSize being <= 11)
|
||||
assert(header_size <= 11);
|
||||
dest_->Append(
|
||||
Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", leftover));
|
||||
}
|
||||
block_offset_ = 0;
|
||||
}
|
||||
|
||||
// Invariant: we never leave < kHeaderSize bytes in a block.
|
||||
assert(static_cast<int>(kBlockSize) - block_offset_ >= kHeaderSize);
|
||||
// Invariant: we never leave < header_size bytes in a block.
|
||||
assert(static_cast<int>(kBlockSize) - block_offset_ >= header_size);
|
||||
|
||||
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
|
||||
const size_t avail = kBlockSize - block_offset_ - header_size;
|
||||
const size_t fragment_length = (left < avail) ? left : avail;
|
||||
|
||||
RecordType type;
|
||||
const bool end = (left == fragment_length);
|
||||
if (begin && end) {
|
||||
type = kFullType;
|
||||
type = recycle_log_files_ ? kRecyclableFullType : kFullType;
|
||||
} else if (begin) {
|
||||
type = kFirstType;
|
||||
type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
|
||||
} else if (end) {
|
||||
type = kLastType;
|
||||
type = recycle_log_files_ ? kRecyclableLastType : kLastType;
|
||||
} else {
|
||||
type = kMiddleType;
|
||||
type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
|
||||
}
|
||||
|
||||
s = EmitPhysicalRecord(type, ptr, fragment_length);
|
||||
@ -83,28 +89,48 @@ Status Writer::AddRecord(const Slice& slice) {
|
||||
|
||||
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
|
||||
assert(n <= 0xffff); // Must fit in two bytes
|
||||
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
|
||||
|
||||
size_t header_size;
|
||||
char buf[kRecyclableHeaderSize];
|
||||
|
||||
// Format the header
|
||||
char buf[kHeaderSize];
|
||||
buf[4] = static_cast<char>(n & 0xff);
|
||||
buf[5] = static_cast<char>(n >> 8);
|
||||
buf[6] = static_cast<char>(t);
|
||||
|
||||
uint32_t crc = type_crc_[t];
|
||||
if (t < kRecyclableFullType) {
|
||||
// Legacy record format
|
||||
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
|
||||
header_size = kHeaderSize;
|
||||
} else {
|
||||
// Recyclable record format
|
||||
assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize);
|
||||
header_size = kRecyclableHeaderSize;
|
||||
|
||||
// Only encode low 32-bits of the 64-bit log number. This means
|
||||
// we will fail to detect an old record if we recycled a log from
|
||||
// ~4 billion logs ago, but that is effectively impossible, and
|
||||
// even if it were we'dbe far more likely to see a false positive
|
||||
// on the 32-bit CRC.
|
||||
EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_));
|
||||
crc = crc32c::Extend(crc, buf + 7, 4);
|
||||
}
|
||||
|
||||
// Compute the crc of the record type and the payload.
|
||||
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
|
||||
crc = crc32c::Mask(crc); // Adjust for storage
|
||||
crc = crc32c::Extend(crc, ptr, n);
|
||||
crc = crc32c::Mask(crc); // Adjust for storage
|
||||
EncodeFixed32(buf, crc);
|
||||
|
||||
// Write the header and the payload
|
||||
Status s = dest_->Append(Slice(buf, kHeaderSize));
|
||||
Status s = dest_->Append(Slice(buf, header_size));
|
||||
if (s.ok()) {
|
||||
s = dest_->Append(Slice(ptr, n));
|
||||
if (s.ok()) {
|
||||
s = dest_->Flush();
|
||||
}
|
||||
}
|
||||
block_offset_ += kHeaderSize + n;
|
||||
block_offset_ += header_size + n;
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -43,7 +43,7 @@ namespace log {
|
||||
* Data is written out in kBlockSize chunks. If next record does not fit
|
||||
* into the space left, the leftover space will be padded with \0.
|
||||
*
|
||||
* Record format:
|
||||
* Legacy record format:
|
||||
*
|
||||
* +---------+-----------+-----------+--- ... ---+
|
||||
* |CRC (4B) | Size (2B) | Type (1B) | Payload |
|
||||
@ -57,6 +57,15 @@ namespace log {
|
||||
* blocks that are larger than kBlockSize
|
||||
* Payload = Byte stream as long as specified by the payload size
|
||||
*
|
||||
* Recyclable record format:
|
||||
*
|
||||
* +---------+-----------+-----------+----------------+--- ... ---+
|
||||
* |CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload |
|
||||
* +---------+-----------+-----------+----------------+--- ... ---+
|
||||
*
|
||||
* Same as above, with the addition of
|
||||
* Log number = 32bit log file number, so that we can distinguish between
|
||||
* records written by the most recent log writer vs a previous one.
|
||||
*/
|
||||
class Writer {
|
||||
public:
|
||||
|
Loading…
x
Reference in New Issue
Block a user