Clean up CompressBlock/CompressBlockInternal a bit (#7249)

Summary:
The patch cleans up and refactors `CompressBlock` and `CompressBlockInternal` a bit.
In particular, it does the following:
* It renames `CompressBlockInternal` to `CompressData` and moves it to `util/compression.h`,
where other general compression-related utilities are located. This will facilitate reuse in the
BlobDB write path.
* The signature of the method is changed so it now takes `compression_format_version`
(similarly to the compression library specific methods) instead of `format_version` (which is
specific to the block based table).
* `GetCompressionFormatForVersion` no longer takes `compression_type` as a parameter.
This parameter was only used in a (not entirely up-to-date) assertion; also, removing it
eliminates the need to ensure this precondition holds at all call sites.
* Does some minor cleanup in `CompressBlock`, for instance, it is now possible to pass
only one of `sampled_output_fast` and `sampled_output_slow`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7249

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D23087278

Pulled By: ltamasi

fbshipit-source-id: e6316e45baed8b4e7de7c1780c90501c2a3439b3
This commit is contained in:
Levi Tamasi 2020-08-12 18:24:27 -07:00 committed by Facebook GitHub Bot
parent 1f9f630b27
commit 9d6f48ec1d
5 changed files with 97 additions and 107 deletions

View File

@ -1481,8 +1481,7 @@ TEST_P(CompressionFailuresTest, CompressionFailures) {
if (compression_failure_type_ == kTestCompressionFail) { if (compression_failure_type_ == kTestCompressionFail) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue", "CompressData:TamperWithReturnValue", [](void* arg) {
[](void* arg) {
bool* ret = static_cast<bool*>(arg); bool* ret = static_cast<bool*>(arg);
*ret = false; *ret = false;
}); });

View File

@ -105,63 +105,6 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
return compressed_size < raw_size - (raw_size / 8u); return compressed_size < raw_size - (raw_size / 8u);
} }
bool CompressBlockInternal(const Slice& raw,
const CompressionInfo& compression_info,
uint32_t format_version,
std::string* compressed_output) {
bool ret;
// Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough".
switch (compression_info.type()) {
case kSnappyCompression:
ret = Snappy_Compress(compression_info, raw.data(), raw.size(),
compressed_output);
break;
case kZlibCompression:
ret = Zlib_Compress(
compression_info,
GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output);
break;
case kBZip2Compression:
ret = BZip2_Compress(
compression_info,
GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output);
break;
case kLZ4Compression:
ret = LZ4_Compress(
compression_info,
GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output);
break;
case kLZ4HCCompression:
ret = LZ4HC_Compress(
compression_info,
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output);
break;
case kXpressCompression:
ret = XPRESS_Compress(raw.data(), raw.size(), compressed_output);
break;
case kZSTD:
case kZSTDNotFinalCompression:
ret = ZSTD_Compress(compression_info, raw.data(), raw.size(),
compressed_output);
break;
default:
// Do not recognize this compression type
ret = false;
}
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue",
static_cast<void*>(&ret));
return ret;
}
} // namespace } // namespace
// format_version is the block format as defined in include/rocksdb/table.h // format_version is the block format as defined in include/rocksdb/table.h
@ -170,11 +113,9 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
bool do_sample, std::string* compressed_output, bool do_sample, std::string* compressed_output,
std::string* sampled_output_fast, std::string* sampled_output_fast,
std::string* sampled_output_slow) { std::string* sampled_output_slow) {
*type = info.type(); assert(type);
assert(compressed_output);
if (info.type() == kNoCompression && !info.SampleForCompression()) { assert(compressed_output->empty());
return raw;
}
// If requested, we sample one in every N block with a // If requested, we sample one in every N block with a
// fast and slow compression algorithm and report the stats. // fast and slow compression algorithm and report the stats.
@ -182,10 +123,10 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
// enabling compression and they also get a hint about which // enabling compression and they also get a hint about which
// compression algorithm wil be beneficial. // compression algorithm wil be beneficial.
if (do_sample && info.SampleForCompression() && if (do_sample && info.SampleForCompression() &&
Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) && Random::GetTLSInstance()->OneIn(
sampled_output_fast && sampled_output_slow) { static_cast<int>(info.SampleForCompression()))) {
// Sampling with a fast compression algorithm // Sampling with a fast compression algorithm
if (LZ4_Supported() || Snappy_Supported()) { if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
CompressionType c = CompressionType c =
LZ4_Supported() ? kLZ4Compression : kSnappyCompression; LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
CompressionContext context(c); CompressionContext context(c);
@ -194,33 +135,46 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
CompressionDict::GetEmptyDict(), c, CompressionDict::GetEmptyDict(), c,
info.SampleForCompression()); info.SampleForCompression());
CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast); CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
sampled_output_fast);
} }
// Sampling with a slow but high-compression algorithm // Sampling with a slow but high-compression algorithm
if (ZSTD_Supported() || Zlib_Supported()) { if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression; CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
CompressionContext context(c); CompressionContext context(c);
CompressionOptions options; CompressionOptions options;
CompressionInfo info_tmp(options, context, CompressionInfo info_tmp(options, context,
CompressionDict::GetEmptyDict(), c, CompressionDict::GetEmptyDict(), c,
info.SampleForCompression()); info.SampleForCompression());
CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
sampled_output_slow);
} }
} }
// Actually compress the data if (info.type() == kNoCompression) {
if (*type != kNoCompression) { *type = kNoCompression;
if (CompressBlockInternal(raw, info, format_version, compressed_output) && return raw;
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
} }
// Compression method is not supported, or not good // Actually compress the data; if the compression method is not supported,
// compression ratio, so just fall back to uncompressed form. // or the compression fails etc., just fall back to uncompressed
*type = kNoCompression; if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
return raw; compressed_output)) {
*type = kNoCompression;
return raw;
}
// Check the compression ratio; if it's not good enough, just fall back to
// uncompressed
if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
*type = kNoCompression;
return raw;
}
*type = info.type();
return *compressed_output;
} }
// kBlockBasedTableMagicNumber was picked by running // kBlockBasedTableMagicNumber was picked by running

View File

@ -371,10 +371,9 @@ Status UncompressBlockContentsForCompressionType(
break; break;
} }
case kZlibCompression: case kZlibCompression:
ubuf = Zlib_Uncompress( ubuf = Zlib_Uncompress(uncompression_info, data, n, &decompress_size,
uncompression_info, data, n, &decompress_size, GetCompressFormatForVersion(format_version),
GetCompressFormatForVersion(kZlibCompression, format_version), allocator);
allocator);
if (!ubuf) { if (!ubuf) {
static char zlib_corrupt_msg[] = static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents"; "Zlib not supported or corrupted Zlib compressed block contents";
@ -383,10 +382,9 @@ Status UncompressBlockContentsForCompressionType(
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
break; break;
case kBZip2Compression: case kBZip2Compression:
ubuf = BZip2_Uncompress( ubuf = BZip2_Uncompress(data, n, &decompress_size,
data, n, &decompress_size, GetCompressFormatForVersion(format_version),
GetCompressFormatForVersion(kBZip2Compression, format_version), allocator);
allocator);
if (!ubuf) { if (!ubuf) {
static char bzip2_corrupt_msg[] = static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents"; "Bzip2 not supported or corrupted Bzip2 compressed block contents";
@ -395,10 +393,9 @@ Status UncompressBlockContentsForCompressionType(
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
break; break;
case kLZ4Compression: case kLZ4Compression:
ubuf = LZ4_Uncompress( ubuf = LZ4_Uncompress(uncompression_info, data, n, &decompress_size,
uncompression_info, data, n, &decompress_size, GetCompressFormatForVersion(format_version),
GetCompressFormatForVersion(kLZ4Compression, format_version), allocator);
allocator);
if (!ubuf) { if (!ubuf) {
static char lz4_corrupt_msg[] = static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents"; "LZ4 not supported or corrupted LZ4 compressed block contents";
@ -407,10 +404,9 @@ Status UncompressBlockContentsForCompressionType(
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
ubuf = LZ4_Uncompress( ubuf = LZ4_Uncompress(uncompression_info, data, n, &decompress_size,
uncompression_info, data, n, &decompress_size, GetCompressFormatForVersion(format_version),
GetCompressFormatForVersion(kLZ4HCCompression, format_version), allocator);
allocator);
if (!ubuf) { if (!ubuf) {
static char lz4hc_corrupt_msg[] = static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents"; "LZ4HC not supported or corrupted LZ4HC compressed block contents";

View File

@ -110,19 +110,11 @@ struct IndexValue {
std::string ToString(bool hex, bool have_first_key) const; std::string ToString(bool hex, bool have_first_key) const;
}; };
inline uint32_t GetCompressFormatForVersion(CompressionType compression_type, inline uint32_t GetCompressFormatForVersion(uint32_t format_version) {
uint32_t version) { // As of format_version 2, we encode compressed block with
#ifdef NDEBUG
(void)compression_type;
#endif
// snappy is not versioned
assert(compression_type != kSnappyCompression &&
compression_type != kXpressCompression &&
compression_type != kNoCompression);
// As of version 2, we encode compressed block with
// compress_format_version == 2. Before that, the version is 1. // compress_format_version == 2. Before that, the version is 1.
// DO NOT CHANGE THIS FUNCTION, it affects disk format // DO NOT CHANGE THIS FUNCTION, it affects disk format
return version >= 2 ? 2 : 1; return format_version >= 2 ? 2 : 1;
} }
inline bool BlockBasedTableSupportedVersion(uint32_t version) { inline bool BlockBasedTableSupportedVersion(uint32_t version) {

View File

@ -23,6 +23,7 @@
#include "memory/memory_allocator.h" #include "memory/memory_allocator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression_context_cache.h" #include "util/compression_context_cache.h"
#include "util/string_util.h" #include "util/string_util.h"
@ -1400,4 +1401,52 @@ inline std::string ZSTD_TrainDictionary(const std::string& samples,
#endif // ZSTD_VERSION_NUMBER >= 10103 #endif // ZSTD_VERSION_NUMBER >= 10103
} }
inline bool CompressData(const Slice& raw,
const CompressionInfo& compression_info,
uint32_t compress_format_version,
std::string* compressed_output) {
bool ret = false;
// Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough".
switch (compression_info.type()) {
case kSnappyCompression:
ret = Snappy_Compress(compression_info, raw.data(), raw.size(),
compressed_output);
break;
case kZlibCompression:
ret = Zlib_Compress(compression_info, compress_format_version, raw.data(),
raw.size(), compressed_output);
break;
case kBZip2Compression:
ret = BZip2_Compress(compression_info, compress_format_version,
raw.data(), raw.size(), compressed_output);
break;
case kLZ4Compression:
ret = LZ4_Compress(compression_info, compress_format_version, raw.data(),
raw.size(), compressed_output);
break;
case kLZ4HCCompression:
ret = LZ4HC_Compress(compression_info, compress_format_version,
raw.data(), raw.size(), compressed_output);
break;
case kXpressCompression:
ret = XPRESS_Compress(raw.data(), raw.size(), compressed_output);
break;
case kZSTD:
case kZSTDNotFinalCompression:
ret = ZSTD_Compress(compression_info, raw.data(), raw.size(),
compressed_output);
break;
default:
// Do not recognize this compression type
break;
}
TEST_SYNC_POINT_CALLBACK("CompressData:TamperWithReturnValue",
static_cast<void*>(&ret));
return ret;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE