diff --git a/CMakeLists.txt b/CMakeLists.txt index c159300a2..5de221b0f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -830,6 +830,7 @@ set(SOURCES util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc + util/compression.cc util/compression_context_cache.cc util/concurrent_task_limiter_impl.cc util/crc32c.cc diff --git a/TARGETS b/TARGETS index 494eb45c0..5cebc27f0 100644 --- a/TARGETS +++ b/TARGETS @@ -227,6 +227,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "util/coding.cc", "util/compaction_job_stats_impl.cc", "util/comparator.cc", + "util/compression.cc", "util/compression_context_cache.cc", "util/concurrent_task_limiter_impl.cc", "util/crc32c.cc", @@ -544,6 +545,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "util/coding.cc", "util/compaction_job_stats_impl.cc", "util/comparator.cc", + "util/compression.cc", "util/compression_context_cache.cc", "util/concurrent_task_limiter_impl.cc", "util/crc32c.cc", diff --git a/db/log_test.cc b/db/log_test.cc index 3723347c8..59de62d13 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -17,6 +17,7 @@ #include "util/coding.h" #include "util/crc32c.h" #include "util/random.h" +#include "utilities/memory_allocators.h" namespace ROCKSDB_NAMESPACE { namespace log { @@ -918,6 +919,73 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(CompressionType::kNoCompression, CompressionType::kZSTD))); +class StreamingCompressionTest + : public ::testing::TestWithParam> {}; + +TEST_P(StreamingCompressionTest, Basic) { + size_t input_size = std::get<0>(GetParam()); + CompressionType compression_type = std::get<1>(GetParam()); + if (!StreamingCompressionTypeSupported(compression_type)) { + ROCKSDB_GTEST_SKIP("Test requires support for compression type"); + return; + } + CompressionOptions opts; + constexpr uint32_t compression_format_version = 2; + StreamingCompress* compress = StreamingCompress::Create( + compression_type, opts, compression_format_version, kBlockSize); + StreamingUncompress* uncompress = StreamingUncompress::Create( + compression_type, compression_format_version, kBlockSize); + MemoryAllocator* allocator = new DefaultMemoryAllocator(); + std::string input_buffer = BigString("abc", input_size); + std::vector compressed_buffers; + size_t remaining; + // Call compress till the entire input is consumed + do { + char* output_buffer = (char*)allocator->Allocate(kBlockSize); + size_t output_size; + remaining = compress->Compress(input_buffer.c_str(), input_size, + output_buffer, &output_size); + if (output_size > 0) { + std::string compressed_buffer; + compressed_buffer.assign(output_buffer, output_size); + compressed_buffers.emplace_back(std::move(compressed_buffer)); + } + allocator->Deallocate((void*)output_buffer); + } while (remaining > 0); + std::string uncompressed_buffer = ""; + int ret_val = 0; + size_t output_size; + char* uncompressed_output_buffer = (char*)allocator->Allocate(kBlockSize); + // Uncompress the fragments and concatenate them. + for (int i = 0; i < (int)compressed_buffers.size(); i++) { + // Call uncompress till either the entire input is consumed or the output + // buffer size is equal to the allocated output buffer size. + do { + ret_val = uncompress->Uncompress( + compressed_buffers[i].c_str(), compressed_buffers[i].size(), + uncompressed_output_buffer, &output_size); + if (output_size > 0) { + std::string uncompressed_fragment; + uncompressed_fragment.assign(uncompressed_output_buffer, output_size); + uncompressed_buffer += uncompressed_fragment; + } + } while (ret_val > 0 || output_size == kBlockSize); + } + allocator->Deallocate((void*)uncompressed_output_buffer); + delete allocator; + delete compress; + delete uncompress; + // The final return value from uncompress() should be 0. + ASSERT_EQ(ret_val, 0); + ASSERT_EQ(input_buffer, uncompressed_buffer); +} + +INSTANTIATE_TEST_CASE_P( + StreamingCompression, StreamingCompressionTest, + ::testing::Combine(::testing::Values(10, 100, 1000, kBlockSize, + kBlockSize * 2), + ::testing::Values(CompressionType::kZSTD))); + } // namespace log } // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 8e9517aeb..f8c1f0307 100644 --- a/src.mk +++ b/src.mk @@ -214,6 +214,7 @@ LIB_SOURCES = \ util/coding.cc \ util/compaction_job_stats_impl.cc \ util/comparator.cc \ + util/compression.cc \ util/compression_context_cache.cc \ util/concurrent_task_limiter_impl.cc \ util/crc32c.cc \ diff --git a/util/compression.cc b/util/compression.cc new file mode 100644 index 000000000..aacb72d22 --- /dev/null +++ b/util/compression.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2022-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/compression.h" + +namespace ROCKSDB_NAMESPACE { + +StreamingCompress* StreamingCompress::Create(CompressionType compression_type, + const CompressionOptions& opts, + uint32_t compress_format_version, + size_t max_output_len) { + switch (compression_type) { + case kZSTD: { + if (!ZSTD_Streaming_Supported()) { + return nullptr; + } + return new ZSTDStreamingCompress(opts, compress_format_version, + max_output_len); + } + default: + return nullptr; + } +} + +StreamingUncompress* StreamingUncompress::Create( + CompressionType compression_type, uint32_t compress_format_version, + size_t max_output_len) { + switch (compression_type) { + case kZSTD: { + if (!ZSTD_Streaming_Supported()) { + return nullptr; + } + return new ZSTDStreamingUncompress(compress_format_version, + max_output_len); + } + default: + return nullptr; + } +} + +int ZSTDStreamingCompress::Compress(const char* input, size_t input_size, + char* output, size_t* output_size) { + assert(input != nullptr && output != nullptr && input_size > 0 && + output_size != nullptr); + *output_size = 0; +#ifndef ZSTD_STREAMING + (void)input; + (void)input_size; + (void)output; + return -1; +#else + if (input_buffer_.src == nullptr || input_buffer_.src != input) { + // New input + // Catch errors where the previous input was not fully decompressed. + assert(input_buffer_.pos == input_buffer_.size); + input_buffer_ = {input, input_size, /*pos=*/0}; + } else if (input_buffer_.src == input) { + // Same input, not fully compressed. + } + ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0}; + const size_t remaining = + ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_flush); + if (ZSTD_isError(remaining)) { + // Failure + Reset(); + return -1; + } + // Success + *output_size = output_buffer.pos; + return (int)(input_buffer_.size - input_buffer_.pos); +#endif +} + +void ZSTDStreamingCompress::Reset() { +#ifdef ZSTD_STREAMING + ZSTD_CCtx_reset(cctx_, ZSTD_ResetDirective::ZSTD_reset_session_only); + input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; +#endif +} + +int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size, + char* output, size_t* output_size) { + assert(input != nullptr && output != nullptr && input_size > 0 && + output_size != nullptr); + *output_size = 0; +#ifdef ZSTD_STREAMING + if (input_buffer_.src != input) { + // New input + input_buffer_ = {input, input_size, /*pos=*/0}; + } + ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0}; + size_t ret = ZSTD_decompressStream(dctx_, &output_buffer, &input_buffer_); + if (ZSTD_isError(ret)) { + Reset(); + return -1; + } + *output_size = output_buffer.pos; + return (int)(input_buffer_.size - input_buffer_.pos); +#else + (void)input; + (void)input_size; + (void)output; + return -1; +#endif +} + +void ZSTDStreamingUncompress::Reset() { +#ifdef ZSTD_STREAMING + ZSTD_DCtx_reset(dctx_, ZSTD_ResetDirective::ZSTD_reset_session_only); + input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; +#endif +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/util/compression.h b/util/compression.h index 8f21807e6..a318c0cab 100644 --- a/util/compression.h +++ b/util/compression.h @@ -49,6 +49,7 @@ #include #if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+ #include +#define ZSTD_STREAMING #endif // ZSTD_VERSION_NUMBER >= 10103 namespace ROCKSDB_NAMESPACE { // Need this for the context allocation override @@ -1593,4 +1594,137 @@ class CompressionTypeRecord { CompressionType compression_type_; }; +// Base class to implement compression for a stream of buffers. +// Instantiate an implementation of the class using Create() with the +// compression type and use Compress() repeatedly. +// The output buffer needs to be at least max_output_len. +// Call Reset() in between frame boundaries or in case of an error. +// NOTE: This class is not thread safe. +class StreamingCompress { + public: + StreamingCompress(CompressionType compression_type, + const CompressionOptions& opts, + uint32_t compress_format_version, size_t max_output_len) + : compression_type_(compression_type), + opts_(opts), + compress_format_version_(compress_format_version), + max_output_len_(max_output_len) {} + virtual ~StreamingCompress() = default; + // compress should be called repeatedly with the same input till the method + // returns 0 + // Parameters: + // input - buffer to compress + // input_size - size of input buffer + // output - compressed buffer allocated by caller, should be at least + // max_output_len + // output_size - size of the output buffer + // Returns -1 for errors, the remaining size of the input buffer that needs to + // be compressed + virtual int Compress(const char* input, size_t input_size, char* output, + size_t* output_size) = 0; + // static method to create object of a class inherited from StreamingCompress + // based on the actual compression type. + static StreamingCompress* Create(CompressionType compression_type, + const CompressionOptions& opts, + uint32_t compress_format_version, + size_t max_output_len); + virtual void Reset() = 0; + + protected: + const CompressionType compression_type_; + const CompressionOptions opts_; + const uint32_t compress_format_version_; + const size_t max_output_len_; +}; + +// Base class to uncompress a stream of compressed buffers. +// Instantiate an implementation of the class using Create() with the +// compression type and use Uncompress() repeatedly. +// The output buffer needs to be at least max_output_len. +// Call Reset() in between frame boundaries or in case of an error. +// NOTE: This class is not thread safe. +class StreamingUncompress { + public: + StreamingUncompress(CompressionType compression_type, + uint32_t compress_format_version, size_t max_output_len) + : compression_type_(compression_type), + compress_format_version_(compress_format_version), + max_output_len_(max_output_len) {} + virtual ~StreamingUncompress() = default; + // uncompress should be called again with the same input if output_size is + // equal to max_output_len or with the next input fragment. + // Parameters: + // input - buffer to uncompress + // input_size - size of input buffer + // output - uncompressed buffer allocated by caller, should be at least + // max_output_len + // output_size - size of the output buffer + // Returns -1 for errors, remaining input to be processed otherwise. + virtual int Uncompress(const char* input, size_t input_size, char* output, + size_t* output_size) = 0; + static StreamingUncompress* Create(CompressionType compression_type, + uint32_t compress_format_version, + size_t max_output_len); + virtual void Reset() = 0; + + protected: + CompressionType compression_type_; + uint32_t compress_format_version_; + size_t max_output_len_; +}; + +class ZSTDStreamingCompress final : public StreamingCompress { + public: + explicit ZSTDStreamingCompress(const CompressionOptions& opts, + uint32_t compress_format_version, + size_t max_output_len) + : StreamingCompress(kZSTD, opts, compress_format_version, + max_output_len) { +#ifdef ZSTD_STREAMING + cctx_ = ZSTD_createCCtx(); + assert(cctx_ != nullptr); + input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; +#endif + } + ~ZSTDStreamingCompress() override { +#ifdef ZSTD_STREAMING + ZSTD_freeCCtx(cctx_); +#endif + } + int Compress(const char* input, size_t input_size, char* output, + size_t* output_size) override; + void Reset() override; +#ifdef ZSTD_STREAMING + ZSTD_CCtx* cctx_; + ZSTD_inBuffer input_buffer_; +#endif +}; + +class ZSTDStreamingUncompress final : public StreamingUncompress { + public: + explicit ZSTDStreamingUncompress(uint32_t compress_format_version, + size_t max_output_len) + : StreamingUncompress(kZSTD, compress_format_version, max_output_len) { +#ifdef ZSTD_STREAMING + dctx_ = ZSTD_createDCtx(); + assert(dctx_ != nullptr); + input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; +#endif + } + ~ZSTDStreamingUncompress() override { +#ifdef ZSTD_STREAMING + ZSTD_freeDCtx(dctx_); +#endif + } + int Uncompress(const char* input, size_t input_size, char* output, + size_t* output_size) override; + void Reset() override; + + private: +#ifdef ZSTD_STREAMING + ZSTD_DCtx* dctx_; + ZSTD_inBuffer input_buffer_; +#endif +}; + } // namespace ROCKSDB_NAMESPACE