From 30fb9dd50f72d901847695cb919f25d74d6547e3 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Fri, 25 Sep 2020 09:00:29 -0700 Subject: [PATCH] Introduce a helper method UncompressData (#7434) Summary: The patch introduces a helper method in `util/compression.h` called `UncompressData` that dispatches calls to the correct uncompression method based on the compression type, and changes `UncompressBlockContentsForCompressionType` and `Benchmark::Uncompress` in `db_bench` so they are implemented in terms of the new method. This eliminates some code duplication. (`Benchmark::Compress` is also updated to use the previously introduced `CompressData` helper.) In addition, the patch brings the implementation of `Snappy_Uncompress` into sync with the other uncompression methods by making the method compute the buffer size and allocate the buffer itself. Finally, the patch eliminates some potentially risky back-and-forth conversions between various unsigned and signed integer types by exposing the size of the uncompressed data as a `size_t` instead of an `int`. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7434 Test Plan: `make check` `./db_bench -benchmarks=compress,uncompress --compression_type ...` Reviewed By: riversand963 Differential Revision: D23900011 Pulled By: ltamasi fbshipit-source-id: b25df63ceec4639889be94acb22eb53e530c54e0 --- port/win/xpress_win.cc | 15 ++---- port/win/xpress_win.h | 3 +- table/format.cc | 96 ++++--------------------------------- tools/db_bench_tool.cc | 96 +++++-------------------------------- util/compression.h | 106 ++++++++++++++++++++++++++++------------- 5 files changed, 99 insertions(+), 217 deletions(-) diff --git a/port/win/xpress_win.cc b/port/win/xpress_win.cc index c016d62b2..9039ec89b 100644 --- a/port/win/xpress_win.cc +++ b/port/win/xpress_win.cc @@ -129,10 +129,9 @@ bool Compress(const char* input, size_t length, std::string* output) { } char* Decompress(const char* input_data, size_t input_length, - int* decompress_size) { - + size_t* uncompressed_size) { assert(input_data != nullptr); - assert(decompress_size != nullptr); + assert(uncompressed_size != nullptr); if (input_length == 0) { return nullptr; @@ -185,14 +184,6 @@ char* Decompress(const char* input_data, size_t input_length, assert(decompressedBufferSize > 0); - // On Windows we are limited to a 32-bit int for the - // output data size argument - // so we hopefully never get here - if (decompressedBufferSize > std::numeric_limits::max()) { - assert(false); - return nullptr; - } - // The callers are deallocating using delete[] // thus we must allocate with new[] std::unique_ptr outputBuffer(new char[decompressedBufferSize]); @@ -216,7 +207,7 @@ char* Decompress(const char* input_data, size_t input_length, return nullptr; } - *decompress_size = static_cast(decompressedDataSize); + *uncompressed_size = decompressedDataSize; // Return the raw buffer to the caller supporting the tradition return outputBuffer.release(); diff --git a/port/win/xpress_win.h b/port/win/xpress_win.h index 
1214bb7b9..d491f963d 100644 --- a/port/win/xpress_win.h +++ b/port/win/xpress_win.h @@ -20,8 +20,7 @@ namespace xpress { bool Compress(const char* input, size_t length, std::string* output); char* Decompress(const char* input_data, size_t input_length, - int* decompress_size); - + size_t* uncompressed_size); } } } // namespace ROCKSDB_NAMESPACE diff --git a/table/format.cc b/table/format.cc index af74222ba..23dc0bbc1 100644 --- a/table/format.cc +++ b/table/format.cc @@ -347,100 +347,24 @@ Status UncompressBlockContentsForCompressionType( BlockContents* contents, uint32_t format_version, const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) { Status ret = Status::OK(); - CacheAllocationPtr ubuf; assert(uncompression_info.type() != kNoCompression && "Invalid compression type"); StopWatchNano timer(ioptions.env, ShouldReportDetailedTime( ioptions.env, ioptions.statistics)); - int decompress_size = 0; - switch (uncompression_info.type()) { - case kSnappyCompression: { - size_t ulength = 0; - static char snappy_corrupt_msg[] = - "Snappy not supported or corrupted Snappy compressed block contents"; - if (!Snappy_GetUncompressedLength(data, n, &ulength)) { - return Status::Corruption(snappy_corrupt_msg); - } - ubuf = AllocateBlock(ulength, allocator); - if (!Snappy_Uncompress(data, n, ubuf.get())) { - return Status::Corruption(snappy_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), ulength); - break; - } - case kZlibCompression: - ubuf = Zlib_Uncompress(uncompression_info, data, n, &decompress_size, - GetCompressFormatForVersion(format_version), - allocator); - if (!ubuf) { - static char zlib_corrupt_msg[] = - "Zlib not supported or corrupted Zlib compressed block contents"; - return Status::Corruption(zlib_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), decompress_size); - break; - case kBZip2Compression: - ubuf = BZip2_Uncompress(data, n, &decompress_size, - GetCompressFormatForVersion(format_version), - allocator); - if (!ubuf) { 
- static char bzip2_corrupt_msg[] = - "Bzip2 not supported or corrupted Bzip2 compressed block contents"; - return Status::Corruption(bzip2_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), decompress_size); - break; - case kLZ4Compression: - ubuf = LZ4_Uncompress(uncompression_info, data, n, &decompress_size, - GetCompressFormatForVersion(format_version), - allocator); - if (!ubuf) { - static char lz4_corrupt_msg[] = - "LZ4 not supported or corrupted LZ4 compressed block contents"; - return Status::Corruption(lz4_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), decompress_size); - break; - case kLZ4HCCompression: - ubuf = LZ4_Uncompress(uncompression_info, data, n, &decompress_size, - GetCompressFormatForVersion(format_version), - allocator); - if (!ubuf) { - static char lz4hc_corrupt_msg[] = - "LZ4HC not supported or corrupted LZ4HC compressed block contents"; - return Status::Corruption(lz4hc_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), decompress_size); - break; - case kXpressCompression: - // XPRESS allocates memory internally, thus no support for custom - // allocator. 
- ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size)); - if (!ubuf) { - static char xpress_corrupt_msg[] = - "XPRESS not supported or corrupted XPRESS compressed block " - "contents"; - return Status::Corruption(xpress_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), decompress_size); - break; - case kZSTD: - case kZSTDNotFinalCompression: - ubuf = ZSTD_Uncompress(uncompression_info, data, n, &decompress_size, - allocator); - if (!ubuf) { - static char zstd_corrupt_msg[] = - "ZSTD not supported or corrupted ZSTD compressed block contents"; - return Status::Corruption(zstd_corrupt_msg); - } - *contents = BlockContents(std::move(ubuf), decompress_size); - break; - default: - return Status::Corruption("bad block type"); + size_t uncompressed_size = 0; + CacheAllocationPtr ubuf = + UncompressData(uncompression_info, data, n, &uncompressed_size, + GetCompressFormatForVersion(format_version), allocator); + if (!ubuf) { + return Status::Corruption( + "Unsupported compression method or corrupted compressed block contents", + CompressionTypeToString(uncompression_info.type())); } + *contents = BlockContents(std::move(ubuf), uncompressed_size); + if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) { RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS, timer.ElapsedNanos()); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 0e3aa223b..b81cae6ce 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2443,40 +2443,10 @@ class Benchmark { inline bool CompressSlice(const CompressionInfo& compression_info, const Slice& input, std::string* compressed) { - bool ok = true; - switch (FLAGS_compression_type_e) { - case ROCKSDB_NAMESPACE::kSnappyCompression: - ok = Snappy_Compress(compression_info, input.data(), input.size(), - compressed); - break; - case ROCKSDB_NAMESPACE::kZlibCompression: - ok = Zlib_Compress(compression_info, 2, input.data(), input.size(), - compressed); - break; - case 
ROCKSDB_NAMESPACE::kBZip2Compression: - ok = BZip2_Compress(compression_info, 2, input.data(), input.size(), - compressed); - break; - case ROCKSDB_NAMESPACE::kLZ4Compression: - ok = LZ4_Compress(compression_info, 2, input.data(), input.size(), - compressed); - break; - case ROCKSDB_NAMESPACE::kLZ4HCCompression: - ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(), - compressed); - break; - case ROCKSDB_NAMESPACE::kXpressCompression: - ok = XPRESS_Compress(input.data(), - input.size(), compressed); - break; - case ROCKSDB_NAMESPACE::kZSTD: - ok = ZSTD_Compress(compression_info, input.data(), input.size(), - compressed); - break; - default: - ok = false; - } - return ok; + constexpr uint32_t compress_format_version = 2; + + return CompressData(input, compression_info, compress_format_version, + compressed); } void PrintHeader() { @@ -3601,57 +3571,15 @@ class Benchmark { bool ok = CompressSlice(compression_info, input, &compressed); int64_t bytes = 0; - int decompress_size; + size_t uncompressed_size = 0; while (ok && bytes < 1024 * 1048576) { - CacheAllocationPtr uncompressed; - switch (FLAGS_compression_type_e) { - case ROCKSDB_NAMESPACE::kSnappyCompression: { - // get size and allocate here to make comparison fair - size_t ulength = 0; - if (!Snappy_GetUncompressedLength(compressed.data(), - compressed.size(), &ulength)) { - ok = false; - break; - } - uncompressed = AllocateBlock(ulength, nullptr); - ok = Snappy_Uncompress(compressed.data(), compressed.size(), - uncompressed.get()); - break; - } - case ROCKSDB_NAMESPACE::kZlibCompression: - uncompressed = - Zlib_Uncompress(uncompression_info, compressed.data(), - compressed.size(), &decompress_size, 2); - ok = uncompressed.get() != nullptr; - break; - case ROCKSDB_NAMESPACE::kBZip2Compression: - uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(), - &decompress_size, 2); - ok = uncompressed.get() != nullptr; - break; - case ROCKSDB_NAMESPACE::kLZ4Compression: - uncompressed = 
LZ4_Uncompress(uncompression_info, compressed.data(), - compressed.size(), &decompress_size, 2); - ok = uncompressed.get() != nullptr; - break; - case ROCKSDB_NAMESPACE::kLZ4HCCompression: - uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(), - compressed.size(), &decompress_size, 2); - ok = uncompressed.get() != nullptr; - break; - case ROCKSDB_NAMESPACE::kXpressCompression: - uncompressed.reset(XPRESS_Uncompress( - compressed.data(), compressed.size(), &decompress_size)); - ok = uncompressed.get() != nullptr; - break; - case ROCKSDB_NAMESPACE::kZSTD: - uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(), - compressed.size(), &decompress_size); - ok = uncompressed.get() != nullptr; - break; - default: - ok = false; - } + constexpr uint32_t compress_format_version = 2; + + CacheAllocationPtr uncompressed = UncompressData( + uncompression_info, compressed.data(), compressed.size(), + &uncompressed_size, compress_format_version); + + ok = uncompressed.get() != nullptr; bytes += input.size(); thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress); } diff --git a/util/compression.h b/util/compression.h index 67db5d60f..4f5254499 100644 --- a/util/compression.h +++ b/util/compression.h @@ -616,26 +616,30 @@ inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input, #endif } -inline bool Snappy_GetUncompressedLength(const char* input, size_t length, - size_t* result) { +inline CacheAllocationPtr Snappy_Uncompress( + const char* input, size_t length, size_t* uncompressed_size, + MemoryAllocator* allocator = nullptr) { #ifdef SNAPPY - return snappy::GetUncompressedLength(input, length, result); -#else - (void)input; - (void)length; - (void)result; - return false; -#endif -} + size_t uncompressed_length = 0; + if (!snappy::GetUncompressedLength(input, length, &uncompressed_length)) { + return nullptr; + } -inline bool Snappy_Uncompress(const char* input, size_t length, char* output) { -#ifdef SNAPPY - return 
snappy::RawUncompress(input, length, output); + CacheAllocationPtr output = AllocateBlock(uncompressed_length, allocator); + + if (!snappy::RawUncompress(input, length, output.get())) { + return nullptr; + } + + *uncompressed_size = uncompressed_length; + + return output; #else (void)input; (void)length; - (void)output; - return false; + (void)uncompressed_size; + (void)allocator; + return nullptr; #endif } @@ -754,7 +758,7 @@ inline bool Zlib_Compress(const CompressionInfo& info, // dictionary. inline CacheAllocationPtr Zlib_Uncompress( const UncompressionInfo& info, const char* input_data, size_t input_length, - int* decompress_size, uint32_t compress_format_version, + size_t* uncompressed_size, uint32_t compress_format_version, MemoryAllocator* allocator = nullptr, int windowBits = -14) { #ifdef ZLIB uint32_t output_len = 0; @@ -836,14 +840,15 @@ inline CacheAllocationPtr Zlib_Uncompress( // If we encoded decompressed block size, we should have no bytes left assert(compress_format_version != 2 || _stream.avail_out == 0); - *decompress_size = static_cast(output_len - _stream.avail_out); + assert(output_len >= _stream.avail_out); + *uncompressed_size = output_len - _stream.avail_out; inflateEnd(&_stream); return output; #else (void)info; (void)input_data; (void)input_length; - (void)decompress_size; + (void)uncompressed_size; (void)compress_format_version; (void)allocator; (void)windowBits; @@ -917,7 +922,7 @@ inline bool BZip2_Compress(const CompressionInfo& /*info*/, // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format inline CacheAllocationPtr BZip2_Uncompress( - const char* input_data, size_t input_length, int* decompress_size, + const char* input_data, size_t input_length, size_t* uncompressed_size, uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) { #ifdef BZIP2 uint32_t output_len = 0; @@ -982,13 +987,14 @@ inline CacheAllocationPtr BZip2_Uncompress( // If we encoded decompressed 
block size, we should have no bytes left assert(compress_format_version != 2 || _stream.avail_out == 0); - *decompress_size = static_cast(output_len - _stream.avail_out); + assert(output_len >= _stream.avail_out); + *uncompressed_size = output_len - _stream.avail_out; BZ2_bzDecompressEnd(&_stream); return output; #else (void)input_data; (void)input_length; - (void)decompress_size; + (void)uncompressed_size; (void)compress_format_version; (void)allocator; return nullptr; @@ -1073,7 +1079,7 @@ inline bool LZ4_Compress(const CompressionInfo& info, inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info, const char* input_data, size_t input_length, - int* decompress_size, + size_t* uncompressed_size, uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) { #ifdef LZ4 @@ -1096,6 +1102,9 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info, } auto output = AllocateBlock(output_len, allocator); + + int decompress_bytes = 0; + #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); const Slice& compression_dict = info.dict().GetRawDict(); @@ -1103,26 +1112,27 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info, LZ4_setStreamDecode(stream, compression_dict.data(), static_cast(compression_dict.size())); } - *decompress_size = LZ4_decompress_safe_continue( + decompress_bytes = LZ4_decompress_safe_continue( stream, input_data, output.get(), static_cast(input_length), static_cast(output_len)); LZ4_freeStreamDecode(stream); #else // up to r123 - *decompress_size = LZ4_decompress_safe(input_data, output.get(), + decompress_bytes = LZ4_decompress_safe(input_data, output.get(), static_cast(input_length), static_cast(output_len)); #endif // LZ4_VERSION_NUMBER >= 10400 - if (*decompress_size < 0) { + if (decompress_bytes < 0) { return nullptr; } - assert(*decompress_size == static_cast(output_len)); + assert(decompress_bytes == static_cast(output_len)); + 
*uncompressed_size = decompress_bytes; return output; #else // LZ4 (void)info; (void)input_data; (void)input_length; - (void)decompress_size; + (void)uncompressed_size; (void)compress_format_version; (void)allocator; return nullptr; @@ -1227,13 +1237,13 @@ inline bool XPRESS_Compress(const char* /*input*/, size_t /*length*/, #ifdef XPRESS inline char* XPRESS_Uncompress(const char* input_data, size_t input_length, - int* decompress_size) { - return port::xpress::Decompress(input_data, input_length, decompress_size); + size_t* uncompressed_size) { + return port::xpress::Decompress(input_data, input_length, uncompressed_size); } #else inline char* XPRESS_Uncompress(const char* /*input_data*/, size_t /*input_length*/, - int* /*decompress_size*/) { + size_t* /*uncompressed_size*/) { return nullptr; } #endif @@ -1298,7 +1308,7 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input, // dictionary. inline CacheAllocationPtr ZSTD_Uncompress( const UncompressionInfo& info, const char* input_data, size_t input_length, - int* decompress_size, MemoryAllocator* allocator = nullptr) { + size_t* uncompressed_size, MemoryAllocator* allocator = nullptr) { #ifdef ZSTD uint32_t output_len = 0; if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, @@ -1329,13 +1339,13 @@ inline CacheAllocationPtr ZSTD_Uncompress( ZSTD_decompress(output.get(), output_len, input_data, input_length); #endif // ZSTD_VERSION_NUMBER >= 500 assert(actual_output_length == output_len); - *decompress_size = static_cast(actual_output_length); + *uncompressed_size = actual_output_length; return output; #else // ZSTD (void)info; (void)input_data; (void)input_length; - (void)decompress_size; + (void)uncompressed_size; (void)allocator; return nullptr; #endif @@ -1449,4 +1459,34 @@ inline bool CompressData(const Slice& raw, return ret; } +inline CacheAllocationPtr UncompressData( + const UncompressionInfo& uncompression_info, const char* data, size_t n, + size_t* uncompressed_size, 
uint32_t compress_format_version, + MemoryAllocator* allocator = nullptr) { + switch (uncompression_info.type()) { + case kSnappyCompression: + return Snappy_Uncompress(data, n, uncompressed_size, allocator); + case kZlibCompression: + return Zlib_Uncompress(uncompression_info, data, n, uncompressed_size, + compress_format_version, allocator); + case kBZip2Compression: + return BZip2_Uncompress(data, n, uncompressed_size, + compress_format_version, allocator); + case kLZ4Compression: + case kLZ4HCCompression: + return LZ4_Uncompress(uncompression_info, data, n, uncompressed_size, + compress_format_version, allocator); + case kXpressCompression: + // XPRESS allocates memory internally, thus no support for custom + // allocator. + return CacheAllocationPtr(XPRESS_Uncompress(data, n, uncompressed_size)); + case kZSTD: + case kZSTDNotFinalCompression: + return ZSTD_Uncompress(uncompression_info, data, n, uncompressed_size, + allocator); + default: + return CacheAllocationPtr(); + } +} + } // namespace ROCKSDB_NAMESPACE