Integrate WAL compression into log reader/writer. (#9642)

Summary:
Integrate the streaming compress/uncompress API into WAL compression.
The streaming compression object is stored in the log_writer along with a reusable output buffer that holds the compressed output.
The streaming uncompress object is stored in the log_reader along with a reusable output buffer that holds the uncompressed output.
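
For context, WAL compression is switched on from the application side; a minimal usage sketch is below. It assumes the public option is DBOptions::wal_compression set to kZSTD (the type implemented by the streaming API in util/compression.cc) and that ZSTD support is compiled in; none of this is part of the diff itself.

  #include "rocksdb/db.h"
  #include "rocksdb/options.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    // Every WAL record written by this DB now goes through the streaming
    // compressor integrated by this change.
    options.wal_compression = rocksdb::kZSTD;

    rocksdb::DB* db = nullptr;
    rocksdb::Status s =
        rocksdb::DB::Open(options, "/tmp/wal_compression_demo", &db);
    if (s.ok()) {
      s = db->Put(rocksdb::WriteOptions(), "key", "value");
      delete db;
    }
    return s.ok() ? 0 : 1;
  }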

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9642

Test Plan:
Added unit tests to verify different scenarios: large buffers, split compressed buffers, etc.

Future optimizations:
The overhead for small records is quite high, so it makes sense to compress only buffers above a certain threshold and use a separate record type to indicate that those records are compressed.
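
A rough sketch of that idea follows; the threshold constant and the record-kind enum are invented for illustration and are not part of this change (the real version would presumably extend RecordType in db/log_format.h):

  #include <cstddef>

  // Hypothetical record kinds standing in for new RecordType values.
  enum class SketchRecordKind { kUncompressed, kCompressed };

  // Hypothetical tuning knob: records smaller than this skip compression to
  // avoid the per-record overhead described above.
  constexpr size_t kCompressionThreshold = 256;

  inline SketchRecordKind ChooseRecordKind(size_t record_size,
                                           bool compression_enabled) {
    if (!compression_enabled || record_size < kCompressionThreshold) {
      return SketchRecordKind::kUncompressed;
    }
    return SketchRecordKind::kCompressed;
  }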

Reviewed By: anand1976

Differential Revision: D34709167

Pulled By: sidroyc

fbshipit-source-id: a37a3cd1301adff6152fb3fcd23726106af07dd4
Siddhartha Roychowdhury authored on 2022-03-09 15:49:53 -08:00; committed by Facebook GitHub Bot
parent 565fcead22
commit fec4403ff1
7 changed files with 226 additions and 38 deletions

db/log_reader.cc

@@ -10,6 +10,7 @@
#include "db/log_reader.h"
#include <stdio.h>
#include "file/sequence_file_reader.h"
#include "port/lang.h"
#include "rocksdb/env.h"
@@ -41,10 +42,14 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
recycled_(false),
first_record_read_(false),
compression_type_(kNoCompression),
compression_type_record_read_(false) {}
compression_type_record_read_(false),
uncompress_(nullptr) {}
Reader::~Reader() {
delete[] backing_store_;
if (uncompress_) {
delete uncompress_;
}
}
// For kAbsoluteConsistency, on clean shutdown we don't expect any error
@@ -58,6 +63,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode) {
scratch->clear();
record->clear();
if (uncompress_) {
uncompress_->Reset();
}
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
@@ -235,8 +243,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
ReportCorruption(fragment.size(),
"could not decode SetCompressionType record");
} else {
compression_type_ = compression_record.GetCompressionType();
compression_type_record_read_ = true;
InitCompression(compression_record);
}
break;
}
@@ -450,9 +457,43 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
buffer_.remove_prefix(header_size + length);
if (!uncompress_ || type == kSetCompressionType) {
*result = Slice(header + header_size, length);
return type;
} else {
// Uncompress compressed records
uncompressed_record_.clear();
size_t uncompressed_size = 0;
int remaining = 0;
do {
remaining = uncompress_->Uncompress(header + header_size, length,
uncompressed_buffer_.get(),
&uncompressed_size);
if (remaining < 0) {
buffer_.clear();
return kBadRecord;
}
if (uncompressed_size > 0) {
uncompressed_record_.append(uncompressed_buffer_.get(),
uncompressed_size);
}
} while (remaining > 0 || uncompressed_size == kBlockSize);
*result = Slice(uncompressed_record_);
return type;
}
}
}
// Initialize uncompress related fields
void Reader::InitCompression(const CompressionTypeRecord& compression_record) {
compression_type_ = compression_record.GetCompressionType();
compression_type_record_read_ = true;
constexpr uint32_t compression_format_version = 2;
uncompress_ = StreamingUncompress::Create(
compression_type_, compression_format_version, kBlockSize);
assert(uncompress_ != nullptr);
uncompressed_buffer_ = std::unique_ptr<char[]>(new char[kBlockSize]);
assert(uncompressed_buffer_);
}
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
@@ -461,6 +502,9 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
assert(scratch != nullptr);
record->clear();
scratch->clear();
if (uncompress_) {
uncompress_->Reset();
}
uint64_t prospective_record_offset = 0;
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
@@ -562,7 +606,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
ReportCorruption(fragment.size(),
"could not decode SetCompressionType record");
} else {
compression_type_ = compression_record.GetCompressionType();
InitCompression(compression_record);
}
break;
}
@@ -700,9 +744,33 @@ bool FragmentBufferedReader::TryReadFragment(
buffer_.remove_prefix(header_size + length);
if (!uncompress_ || type == kSetCompressionType) {
*fragment = Slice(header + header_size, length);
*fragment_type_or_err = type;
return true;
} else {
// Uncompress compressed records
uncompressed_record_.clear();
size_t uncompressed_size = 0;
int remaining = 0;
do {
remaining = uncompress_->Uncompress(header + header_size, length,
uncompressed_buffer_.get(),
&uncompressed_size);
if (remaining < 0) {
buffer_.clear();
*fragment_type_or_err = kBadRecord;
return true;
}
if (uncompressed_size > 0) {
uncompressed_record_.append(uncompressed_buffer_.get(),
uncompressed_size);
}
} while (remaining > 0 || uncompressed_size == kBlockSize);
*fragment = Slice(std::move(uncompressed_record_));
*fragment_type_or_err = type;
return true;
}
}
} // namespace log

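For reference, the uncompress loop used twice above (in ReadPhysicalRecord and TryReadFragment) reduces to the following standalone sketch of the Uncompress() calling contract; the helper name and signature are illustrative only:

  #include <cstddef>
  #include <string>
  #include "util/compression.h"

  // Drains one compressed fragment through a StreamingUncompress object and
  // appends the uncompressed bytes to *result. Returns false on error.
  bool UncompressFragmentSketch(ROCKSDB_NAMESPACE::StreamingUncompress* uncompress,
                                const char* input, size_t input_size,
                                char* output_buffer, size_t output_buffer_size,
                                std::string* result) {
    size_t output_pos = 0;
    int remaining = 0;
    do {
      // Returns < 0 on error, otherwise the number of input bytes not yet
      // consumed; output_pos receives the bytes written to output_buffer.
      remaining = uncompress->Uncompress(input, input_size, output_buffer,
                                         &output_pos);
      if (remaining < 0) {
        return false;  // the reader reports this as kBadRecord
      }
      if (output_pos > 0) {
        result->append(output_buffer, output_pos);
      }
      // Keep looping while input remains or the output buffer was filled
      // exactly, since a full buffer may mean more output is pending.
    } while (remaining > 0 || output_pos == output_buffer_size);
    return true;
  }
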
db/log_reader.h

@@ -136,6 +136,11 @@ class Reader {
CompressionType compression_type_;
// Track whether the compression type record has been read or not.
bool compression_type_record_read_;
StreamingUncompress* uncompress_;
// Reusable uncompressed output buffer
std::unique_ptr<char[]> uncompressed_buffer_;
// Reusable uncompressed record
std::string uncompressed_record_;
// Extend record types with the following special values
enum {
@@ -167,6 +172,8 @@
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
void InitCompression(const CompressionTypeRecord& compression_record);
};
class FragmentBufferedReader : public Reader {

db/log_test.cc

@@ -904,6 +904,11 @@ class CompressionLogTest : public LogTest {
};
TEST_P(CompressionLogTest, Empty) {
CompressionType compression_type = std::get<2>(GetParam());
if (!StreamingCompressionTypeSupported(compression_type)) {
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
return;
}
ASSERT_OK(SetupTestEnv());
const bool compression_enabled =
std::get<2>(GetParam()) == kNoCompression ? false : true;
@@ -913,6 +918,57 @@ TEST_P(CompressionLogTest, Empty) {
ASSERT_EQ("EOF", Read());
}
TEST_P(CompressionLogTest, ReadWrite) {
CompressionType compression_type = std::get<2>(GetParam());
if (!StreamingCompressionTypeSupported(compression_type)) {
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
return;
}
ASSERT_OK(SetupTestEnv());
Write("foo");
Write("bar");
Write("");
Write("xxxx");
ASSERT_EQ("foo", Read());
ASSERT_EQ("bar", Read());
ASSERT_EQ("", Read());
ASSERT_EQ("xxxx", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
}
TEST_P(CompressionLogTest, ManyBlocks) {
CompressionType compression_type = std::get<2>(GetParam());
if (!StreamingCompressionTypeSupported(compression_type)) {
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
return;
}
ASSERT_OK(SetupTestEnv());
for (int i = 0; i < 100000; i++) {
Write(NumberString(i));
}
for (int i = 0; i < 100000; i++) {
ASSERT_EQ(NumberString(i), Read());
}
ASSERT_EQ("EOF", Read());
}
TEST_P(CompressionLogTest, Fragmentation) {
CompressionType compression_type = std::get<2>(GetParam());
if (!StreamingCompressionTypeSupported(compression_type)) {
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
return;
}
ASSERT_OK(SetupTestEnv());
Write("small");
Write(BigString("medium", 50000));
Write(BigString("large", 100000));
ASSERT_EQ("small", Read());
ASSERT_EQ(BigString("medium", 50000), Read());
ASSERT_EQ(BigString("large", 100000), Read());
ASSERT_EQ("EOF", Read());
}
INSTANTIATE_TEST_CASE_P(
Compression, CompressionLogTest,
::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
@@ -942,34 +998,34 @@ TEST_P(StreamingCompressionTest, Basic) {
// Call compress till the entire input is consumed
do {
char* output_buffer = (char*)allocator->Allocate(kBlockSize);
size_t output_size;
size_t output_pos;
remaining = compress->Compress(input_buffer.c_str(), input_size,
output_buffer, &output_size);
if (output_size > 0) {
output_buffer, &output_pos);
if (output_pos > 0) {
std::string compressed_buffer;
compressed_buffer.assign(output_buffer, output_size);
compressed_buffer.assign(output_buffer, output_pos);
compressed_buffers.emplace_back(std::move(compressed_buffer));
}
allocator->Deallocate((void*)output_buffer);
} while (remaining > 0);
std::string uncompressed_buffer = "";
int ret_val = 0;
size_t output_size;
size_t output_pos;
char* uncompressed_output_buffer = (char*)allocator->Allocate(kBlockSize);
// Uncompress the fragments and concatenate them.
for (int i = 0; i < (int)compressed_buffers.size(); i++) {
// Call uncompress till either the entire input is consumed or the output
// buffer size is equal to the allocated output buffer size.
do {
ret_val = uncompress->Uncompress(
compressed_buffers[i].c_str(), compressed_buffers[i].size(),
uncompressed_output_buffer, &output_size);
if (output_size > 0) {
ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(),
compressed_buffers[i].size(),
uncompressed_output_buffer, &output_pos);
if (output_pos > 0) {
std::string uncompressed_fragment;
uncompressed_fragment.assign(uncompressed_output_buffer, output_size);
uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);
uncompressed_buffer += uncompressed_fragment;
}
} while (ret_val > 0 || output_size == kBlockSize);
} while (ret_val > 0 || output_pos == kBlockSize);
}
allocator->Deallocate((void*)uncompressed_output_buffer);
delete allocator;

db/log_writer.cc

@@ -10,6 +10,7 @@
#include "db/log_writer.h"
#include <stdint.h>
#include "file/writable_file_writer.h"
#include "rocksdb/env.h"
#include "util/coding.h"
@@ -26,7 +27,8 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
log_number_(log_number),
recycle_log_files_(recycle_log_files),
manual_flush_(manual_flush),
compression_type_(compression_type) {
compression_type_(compression_type),
compress_(nullptr) {
for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
@@ -37,6 +39,9 @@ Writer::~Writer() {
if (dest_) {
WriteBuffer().PermitUncheckedError();
}
if (compress_) {
delete compress_;
}
}
IOStatus Writer::WriteBuffer() { return dest_->Flush(); }
@@ -64,6 +69,12 @@ IOStatus Writer::AddRecord(const Slice& slice,
// zero-length record
IOStatus s;
bool begin = true;
int compress_remaining = 0;
bool compress_start = false;
if (compress_) {
compress_->Reset();
compress_start = true;
}
do {
const int64_t leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
@@ -87,10 +98,34 @@ IOStatus Writer::AddRecord(const Slice& slice,
assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size);
const size_t avail = kBlockSize - block_offset_ - header_size;
// Compress the record if compression is enabled.
// Compress() is called at least once (compress_start=true) and after the
// previous generated compressed chunk is written out as one or more
// physical records (left=0).
if (compress_ && (compress_start || left == 0)) {
compress_remaining = compress_->Compress(slice.data(), slice.size(),
compressed_buffer_.get(), &left);
if (compress_remaining < 0) {
// Set failure status
s = IOStatus::IOError("Unexpected WAL compression error");
s.SetDataLoss(true);
break;
} else if (left == 0) {
// Nothing left to compress
if (!compress_start) {
break;
}
}
compress_start = false;
ptr = compressed_buffer_.get();
}
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length);
const bool end = (left == fragment_length && compress_remaining == 0);
if (begin && end) {
type = recycle_log_files_ ? kRecyclableFullType : kFullType;
} else if (begin) {
@@ -105,7 +140,7 @@ IOStatus Writer::AddRecord(const Slice& slice,
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
} while (s.ok() && (left > 0 || compress_remaining > 0));
if (s.ok()) {
if (!manual_flush_) {
@@ -134,6 +169,18 @@ IOStatus Writer::AddCompressionTypeRecord() {
if (!manual_flush_) {
s = dest_->Flush();
}
// Initialize fields required for compression
const size_t max_output_buffer_len =
kBlockSize - (recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize);
CompressionOptions opts;
constexpr uint32_t compression_format_version = 2;
compress_ = StreamingCompress::Create(compression_type_, opts,
compression_format_version,
max_output_buffer_len);
assert(compress_ != nullptr);
compressed_buffer_ =
std::unique_ptr<char[]>(new char[max_output_buffer_len]);
assert(compressed_buffer_);
} else {
// Disable compression if the record could not be added.
compression_type_ = kNoCompression;

db/log_writer.h

@@ -17,6 +17,7 @@
#include "rocksdb/io_status.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
@@ -118,6 +119,9 @@ class Writer {
// Compression Type
CompressionType compression_type_;
StreamingCompress* compress_;
// Reusable compressed output buffer
std::unique_ptr<char[]> compressed_buffer_;
};
} // namespace log

util/compression.cc

@@ -41,10 +41,13 @@ StreamingUncompress* StreamingUncompress::Create(
}
int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
char* output, size_t* output_size) {
assert(input != nullptr && output != nullptr && input_size > 0 &&
output_size != nullptr);
*output_size = 0;
char* output, size_t* output_pos) {
assert(input != nullptr && output != nullptr && output_pos != nullptr);
*output_pos = 0;
// Don't need to compress an empty input
if (input_size == 0) {
return 0;
}
#ifndef ZSTD_STREAMING
(void)input;
(void)input_size;
@@ -61,15 +64,15 @@ int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
}
ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
const size_t remaining =
ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_flush);
ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_end);
if (ZSTD_isError(remaining)) {
// Failure
Reset();
return -1;
}
// Success
*output_size = output_buffer.pos;
return (int)(input_buffer_.size - input_buffer_.pos);
*output_pos = output_buffer.pos;
return (int)remaining;
#endif
}
@@ -81,10 +84,13 @@ void ZSTDStreamingCompress::Reset() {
}
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
char* output, size_t* output_size) {
assert(input != nullptr && output != nullptr && input_size > 0 &&
output_size != nullptr);
*output_size = 0;
char* output, size_t* output_pos) {
assert(input != nullptr && output != nullptr && output_pos != nullptr);
*output_pos = 0;
// Don't need to uncompress an empty input
if (input_size == 0) {
return 0;
}
#ifdef ZSTD_STREAMING
if (input_buffer_.src != input) {
// New input
@@ -96,7 +102,7 @@ int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
Reset();
return -1;
}
*output_size = output_buffer.pos;
*output_pos = output_buffer.pos;
return (int)(input_buffer_.size - input_buffer_.pos);
#else
(void)input;

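A notable detail in the hunk above is the switch from ZSTD_e_flush to ZSTD_e_end, with the return value of ZSTD_compressStream2() reported directly as the remaining count. A minimal libzstd-only sketch of that call pattern, independent of the RocksDB wrappers (assumes zstd 1.4.0 or newer):

  #include <cstddef>
  #include <cstdio>
  #include <string>
  #include <vector>
  #include <zstd.h>

  int main() {
    const std::string input(100000, 'a');
    ZSTD_CCtx* cctx = ZSTD_createCCtx();
    ZSTD_inBuffer in = {input.data(), input.size(), 0};
    std::vector<char> chunk(4096);
    std::string compressed;
    size_t remaining = 0;
    do {
      ZSTD_outBuffer out = {chunk.data(), chunk.size(), 0};
      // With ZSTD_e_end the return value is the number of bytes still to be
      // flushed for the frame: > 0 means call again, 0 means the frame is
      // complete, and errors are detected with ZSTD_isError().
      remaining = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
      if (ZSTD_isError(remaining)) {
        std::fprintf(stderr, "zstd: %s\n", ZSTD_getErrorName(remaining));
        ZSTD_freeCCtx(cctx);
        return 1;
      }
      compressed.append(chunk.data(), out.pos);
    } while (remaining != 0);
    ZSTD_freeCCtx(cctx);
    std::printf("compressed %zu -> %zu bytes\n", input.size(),
                compressed.size());
    return 0;
  }
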
util/compression.h

@@ -1622,7 +1622,7 @@ class StreamingCompress {
// Returns -1 for errors, the remaining size of the input buffer that needs to
// be compressed
virtual int Compress(const char* input, size_t input_size, char* output,
size_t* output_size) = 0;
size_t* output_pos) = 0;
// static method to create object of a class inherited from StreamingCompress
// based on the actual compression type.
static StreamingCompress* Create(CompressionType compression_type,
@@ -1662,7 +1662,7 @@ class StreamingUncompress {
// output_size - size of the output buffer
// Returns -1 for errors, remaining input to be processed otherwise.
virtual int Uncompress(const char* input, size_t input_size, char* output,
size_t* output_size) = 0;
size_t* output_pos) = 0;
static StreamingUncompress* Create(CompressionType compression_type,
uint32_t compress_format_version,
size_t max_output_len);
@@ -1693,7 +1693,7 @@ class ZSTDStreamingCompress final : public StreamingCompress {
#endif
}
int Compress(const char* input, size_t input_size, char* output,
size_t* output_size) override;
size_t* output_pos) override;
void Reset() override;
#ifdef ZSTD_STREAMING
ZSTD_CCtx* cctx_;