Digest ZSTD compression dictionary once per SST file (#4251)
Summary: In RocksDB, for a given SST file, all data blocks are compressed with the same dictionary. When we compress a block using the dictionary's raw bytes, the compression library first has to digest the dictionary to get it into a usable form. This digestion work is redundant and ideally should be done once per file. ZSTD offers APIs for the caller to create and reuse a digested dictionary object (`ZSTD_CDict`). In this PR, we call `ZSTD_createCDict` once per file to digest the raw bytes. Then we use `ZSTD_compress_usingCDict` to compress each data block using the pre-digested dictionary. Once the file's created `ZSTD_freeCDict` releases the resources held by the digested dictionary. There are a couple other changes included in this PR: - Changed the parameter object for (un)compression functions from `CompressionContext`/`UncompressionContext` to `CompressionInfo`/`UncompressionInfo`. This avoids the previous pattern, where `CompressionContext`/`UncompressionContext` had to be mutated before calling a (un)compression function depending on whether dictionary should be used. I felt that mutation was error-prone so eliminated it. - Added support for digested uncompression dictionaries (`ZSTD_DDict`) as well. However, this PR does not support reusing them across uncompression calls for the same file. That work is deferred to a later PR when we will store the `ZSTD_DDict` objects in block cache. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4251 Differential Revision: D9257078 Pulled By: ajkr fbshipit-source-id: 21b8cb6bbdd48e459f1c62343780ab66c0a64438
This commit is contained in:
parent
ee234e83e3
commit
6c40806e51
@ -103,19 +103,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
|
|||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
// format_version is the block format as defined in include/rocksdb/table.h
|
// format_version is the block format as defined in include/rocksdb/table.h
|
||||||
Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
|
||||||
CompressionType* type, uint32_t format_version,
|
CompressionType* type, uint32_t format_version,
|
||||||
std::string* compressed_output) {
|
std::string* compressed_output) {
|
||||||
*type = compression_ctx.type();
|
*type = compression_info.type();
|
||||||
if (compression_ctx.type() == kNoCompression) {
|
if (compression_info.type() == kNoCompression) {
|
||||||
return raw;
|
return raw;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Will return compressed block contents if (1) the compression method is
|
// Will return compressed block contents if (1) the compression method is
|
||||||
// supported in this platform and (2) the compression rate is "good enough".
|
// supported in this platform and (2) the compression rate is "good enough".
|
||||||
switch (compression_ctx.type()) {
|
switch (compression_info.type()) {
|
||||||
case kSnappyCompression:
|
case kSnappyCompression:
|
||||||
if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
|
if (Snappy_Compress(compression_info, raw.data(), raw.size(),
|
||||||
compressed_output) &&
|
compressed_output) &&
|
||||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||||
return *compressed_output;
|
return *compressed_output;
|
||||||
@ -123,7 +123,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
|||||||
break; // fall back to no compression.
|
break; // fall back to no compression.
|
||||||
case kZlibCompression:
|
case kZlibCompression:
|
||||||
if (Zlib_Compress(
|
if (Zlib_Compress(
|
||||||
compression_ctx,
|
compression_info,
|
||||||
GetCompressFormatForVersion(kZlibCompression, format_version),
|
GetCompressFormatForVersion(kZlibCompression, format_version),
|
||||||
raw.data(), raw.size(), compressed_output) &&
|
raw.data(), raw.size(), compressed_output) &&
|
||||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||||
@ -132,7 +132,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
|||||||
break; // fall back to no compression.
|
break; // fall back to no compression.
|
||||||
case kBZip2Compression:
|
case kBZip2Compression:
|
||||||
if (BZip2_Compress(
|
if (BZip2_Compress(
|
||||||
compression_ctx,
|
compression_info,
|
||||||
GetCompressFormatForVersion(kBZip2Compression, format_version),
|
GetCompressFormatForVersion(kBZip2Compression, format_version),
|
||||||
raw.data(), raw.size(), compressed_output) &&
|
raw.data(), raw.size(), compressed_output) &&
|
||||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||||
@ -141,7 +141,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
|||||||
break; // fall back to no compression.
|
break; // fall back to no compression.
|
||||||
case kLZ4Compression:
|
case kLZ4Compression:
|
||||||
if (LZ4_Compress(
|
if (LZ4_Compress(
|
||||||
compression_ctx,
|
compression_info,
|
||||||
GetCompressFormatForVersion(kLZ4Compression, format_version),
|
GetCompressFormatForVersion(kLZ4Compression, format_version),
|
||||||
raw.data(), raw.size(), compressed_output) &&
|
raw.data(), raw.size(), compressed_output) &&
|
||||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||||
@ -150,7 +150,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
|||||||
break; // fall back to no compression.
|
break; // fall back to no compression.
|
||||||
case kLZ4HCCompression:
|
case kLZ4HCCompression:
|
||||||
if (LZ4HC_Compress(
|
if (LZ4HC_Compress(
|
||||||
compression_ctx,
|
compression_info,
|
||||||
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
|
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
|
||||||
raw.data(), raw.size(), compressed_output) &&
|
raw.data(), raw.size(), compressed_output) &&
|
||||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||||
@ -166,7 +166,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
|||||||
break;
|
break;
|
||||||
case kZSTD:
|
case kZSTD:
|
||||||
case kZSTDNotFinalCompression:
|
case kZSTDNotFinalCompression:
|
||||||
if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
|
if (ZSTD_Compress(compression_info, raw.data(), raw.size(),
|
||||||
compressed_output) &&
|
compressed_output) &&
|
||||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||||
return *compressed_output;
|
return *compressed_output;
|
||||||
@ -260,8 +260,9 @@ struct BlockBasedTableBuilder::Rep {
|
|||||||
PartitionedIndexBuilder* p_index_builder_ = nullptr;
|
PartitionedIndexBuilder* p_index_builder_ = nullptr;
|
||||||
|
|
||||||
std::string last_key;
|
std::string last_key;
|
||||||
// Compression dictionary or nullptr
|
CompressionType compression_type;
|
||||||
const std::string* compression_dict;
|
CompressionOptions compression_opts;
|
||||||
|
CompressionDict compression_dict;
|
||||||
CompressionContext compression_ctx;
|
CompressionContext compression_ctx;
|
||||||
std::unique_ptr<UncompressionContext> verify_ctx;
|
std::unique_ptr<UncompressionContext> verify_ctx;
|
||||||
TableProperties props;
|
TableProperties props;
|
||||||
@ -312,8 +313,9 @@ struct BlockBasedTableBuilder::Rep {
|
|||||||
table_options.data_block_hash_table_util_ratio),
|
table_options.data_block_hash_table_util_ratio),
|
||||||
range_del_block(1 /* block_restart_interval */),
|
range_del_block(1 /* block_restart_interval */),
|
||||||
internal_prefix_transform(_moptions.prefix_extractor.get()),
|
internal_prefix_transform(_moptions.prefix_extractor.get()),
|
||||||
compression_dict(_compression_dict),
|
compression_type(_compression_type),
|
||||||
compression_ctx(_compression_type, _compression_opts),
|
compression_opts(_compression_opts),
|
||||||
|
compression_ctx(_compression_type),
|
||||||
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
|
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
|
||||||
!table_opt.block_align),
|
!table_opt.block_align),
|
||||||
compressed_cache_key_prefix_size(0),
|
compressed_cache_key_prefix_size(0),
|
||||||
@ -324,6 +326,15 @@ struct BlockBasedTableBuilder::Rep {
|
|||||||
column_family_name(_column_family_name),
|
column_family_name(_column_family_name),
|
||||||
creation_time(_creation_time),
|
creation_time(_creation_time),
|
||||||
oldest_key_time(_oldest_key_time) {
|
oldest_key_time(_oldest_key_time) {
|
||||||
|
if (_compression_dict != nullptr) {
|
||||||
|
compression_dict.Init(*_compression_dict,
|
||||||
|
CompressionDict::Mode::kCompression,
|
||||||
|
_compression_type, _compression_opts.level);
|
||||||
|
} else {
|
||||||
|
compression_dict.Init(Slice() /* dict */,
|
||||||
|
CompressionDict::Mode::kEmpty,
|
||||||
|
_compression_type, _compression_opts.level);
|
||||||
|
}
|
||||||
if (table_options.index_type ==
|
if (table_options.index_type ==
|
||||||
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
||||||
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
|
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
|
||||||
@ -354,7 +365,7 @@ struct BlockBasedTableBuilder::Rep {
|
|||||||
_moptions.prefix_extractor != nullptr));
|
_moptions.prefix_extractor != nullptr));
|
||||||
if (table_options.verify_compression) {
|
if (table_options.verify_compression) {
|
||||||
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
|
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
|
||||||
compression_ctx.type()));
|
compression_type));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -498,7 +509,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
|||||||
assert(ok());
|
assert(ok());
|
||||||
Rep* r = rep_;
|
Rep* r = rep_;
|
||||||
|
|
||||||
auto type = r->compression_ctx.type();
|
auto type = r->compression_type;
|
||||||
Slice block_contents;
|
Slice block_contents;
|
||||||
bool abort_compression = false;
|
bool abort_compression = false;
|
||||||
|
|
||||||
@ -506,24 +517,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
|||||||
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
|
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
|
||||||
|
|
||||||
if (raw_block_contents.size() < kCompressionSizeLimit) {
|
if (raw_block_contents.size() < kCompressionSizeLimit) {
|
||||||
Slice compression_dict;
|
CompressionInfo compression_info(
|
||||||
if (is_data_block && r->compression_dict && r->compression_dict->size()) {
|
r->compression_opts, r->compression_ctx,
|
||||||
r->compression_ctx.dict() = *r->compression_dict;
|
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(),
|
||||||
if (r->table_options.verify_compression) {
|
r->compression_type);
|
||||||
assert(r->verify_ctx != nullptr);
|
|
||||||
r->verify_ctx->dict() = *r->compression_dict;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Clear dictionary
|
|
||||||
r->compression_ctx.dict() = Slice();
|
|
||||||
if (r->table_options.verify_compression) {
|
|
||||||
assert(r->verify_ctx != nullptr);
|
|
||||||
r->verify_ctx->dict() = Slice();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
block_contents =
|
block_contents =
|
||||||
CompressBlock(raw_block_contents, r->compression_ctx, &type,
|
CompressBlock(raw_block_contents, compression_info, &type,
|
||||||
r->table_options.format_version, &r->compressed_output);
|
r->table_options.format_version, &r->compressed_output);
|
||||||
|
|
||||||
// Some of the compression algorithms are known to be unreliable. If
|
// Some of the compression algorithms are known to be unreliable. If
|
||||||
@ -532,8 +531,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
|||||||
if (type != kNoCompression && r->table_options.verify_compression) {
|
if (type != kNoCompression && r->table_options.verify_compression) {
|
||||||
// Retrieve the uncompressed contents into a new buffer
|
// Retrieve the uncompressed contents into a new buffer
|
||||||
BlockContents contents;
|
BlockContents contents;
|
||||||
|
UncompressionInfo uncompression_info(
|
||||||
|
*r->verify_ctx,
|
||||||
|
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(),
|
||||||
|
r->compression_type);
|
||||||
Status stat = UncompressBlockContentsForCompressionType(
|
Status stat = UncompressBlockContentsForCompressionType(
|
||||||
*r->verify_ctx, block_contents.data(), block_contents.size(),
|
uncompression_info, block_contents.data(), block_contents.size(),
|
||||||
&contents, r->table_options.format_version, r->ioptions);
|
&contents, r->table_options.format_version, r->ioptions);
|
||||||
|
|
||||||
if (stat.ok()) {
|
if (stat.ok()) {
|
||||||
@ -780,7 +783,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
|
|||||||
? rep_->ioptions.merge_operator->Name()
|
? rep_->ioptions.merge_operator->Name()
|
||||||
: "nullptr";
|
: "nullptr";
|
||||||
rep_->props.compression_name =
|
rep_->props.compression_name =
|
||||||
CompressionTypeToString(rep_->compression_ctx.type());
|
CompressionTypeToString(rep_->compression_type);
|
||||||
rep_->props.prefix_extractor_name =
|
rep_->props.prefix_extractor_name =
|
||||||
rep_->moptions.prefix_extractor != nullptr
|
rep_->moptions.prefix_extractor != nullptr
|
||||||
? rep_->moptions.prefix_extractor->Name()
|
? rep_->moptions.prefix_extractor->Name()
|
||||||
@ -829,10 +832,10 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
|
|||||||
|
|
||||||
void BlockBasedTableBuilder::WriteCompressionDictBlock(
|
void BlockBasedTableBuilder::WriteCompressionDictBlock(
|
||||||
MetaIndexBuilder* meta_index_builder) {
|
MetaIndexBuilder* meta_index_builder) {
|
||||||
if (rep_->compression_dict && rep_->compression_dict->size()) {
|
if (rep_->compression_dict.GetRawDict().size()) {
|
||||||
BlockHandle compression_dict_block_handle;
|
BlockHandle compression_dict_block_handle;
|
||||||
if (ok()) {
|
if (ok()) {
|
||||||
WriteRawBlock(*rep_->compression_dict, kNoCompression,
|
WriteRawBlock(rep_->compression_dict.GetRawDict(), kNoCompression,
|
||||||
&compression_dict_block_handle);
|
&compression_dict_block_handle);
|
||||||
}
|
}
|
||||||
if (ok()) {
|
if (ok()) {
|
||||||
|
@ -131,7 +131,7 @@ class BlockBasedTableBuilder : public TableBuilder {
|
|||||||
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
|
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
|
||||||
};
|
};
|
||||||
|
|
||||||
Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
|
||||||
CompressionType* type, uint32_t format_version,
|
CompressionType* type, uint32_t format_version,
|
||||||
std::string* compressed_output);
|
std::string* compressed_output);
|
||||||
|
|
||||||
|
@ -1226,9 +1226,12 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
|||||||
|
|
||||||
// Retrieve the uncompressed contents into a new buffer
|
// Retrieve the uncompressed contents into a new buffer
|
||||||
BlockContents contents;
|
BlockContents contents;
|
||||||
UncompressionContext uncompresssion_ctx(compressed_block->compression_type(),
|
UncompressionContext context(compressed_block->compression_type());
|
||||||
compression_dict);
|
CompressionDict dict;
|
||||||
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
|
dict.Init(compression_dict, CompressionDict::Mode::kUncompression,
|
||||||
|
compressed_block->compression_type());
|
||||||
|
UncompressionInfo info(context, dict, compressed_block->compression_type());
|
||||||
|
s = UncompressBlockContents(info, compressed_block->data(),
|
||||||
compressed_block->size(), &contents,
|
compressed_block->size(), &contents,
|
||||||
format_version, ioptions);
|
format_version, ioptions);
|
||||||
|
|
||||||
@ -1301,11 +1304,13 @@ Status BlockBasedTable::PutDataBlockToCache(
|
|||||||
BlockContents contents;
|
BlockContents contents;
|
||||||
Statistics* statistics = ioptions.statistics;
|
Statistics* statistics = ioptions.statistics;
|
||||||
if (raw_block->compression_type() != kNoCompression) {
|
if (raw_block->compression_type() != kNoCompression) {
|
||||||
UncompressionContext uncompression_ctx(raw_block->compression_type(),
|
UncompressionContext context(raw_block->compression_type());
|
||||||
compression_dict);
|
CompressionDict dict;
|
||||||
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
|
dict.Init(compression_dict, CompressionDict::Mode::kUncompression,
|
||||||
raw_block->size(), &contents, format_version,
|
raw_block->compression_type());
|
||||||
ioptions);
|
UncompressionInfo info(context, dict, raw_block->compression_type());
|
||||||
|
s = UncompressBlockContents(info, raw_block->data(), raw_block->size(),
|
||||||
|
&contents, format_version, ioptions);
|
||||||
}
|
}
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
delete raw_block;
|
delete raw_block;
|
||||||
|
@ -227,10 +227,13 @@ Status BlockFetcher::ReadBlockContents() {
|
|||||||
|
|
||||||
if (do_uncompress_ && compression_type != kNoCompression) {
|
if (do_uncompress_ && compression_type != kNoCompression) {
|
||||||
// compressed page, uncompress, update cache
|
// compressed page, uncompress, update cache
|
||||||
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
|
UncompressionContext context(compression_type);
|
||||||
status_ =
|
CompressionDict dict;
|
||||||
UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
|
dict.Init(compression_dict_, CompressionDict::Mode::kUncompression,
|
||||||
contents_, footer_.version(), ioptions_);
|
compression_type);
|
||||||
|
UncompressionInfo info(context, dict, compression_type);
|
||||||
|
status_ = UncompressBlockContents(info, slice_.data(), block_size_,
|
||||||
|
contents_, footer_.version(), ioptions_);
|
||||||
} else {
|
} else {
|
||||||
GetBlockContents();
|
GetBlockContents();
|
||||||
}
|
}
|
||||||
|
@ -284,18 +284,18 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
|
|||||||
}
|
}
|
||||||
|
|
||||||
Status UncompressBlockContentsForCompressionType(
|
Status UncompressBlockContentsForCompressionType(
|
||||||
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
|
const UncompressionInfo& uncompression_info, const char* data, size_t n,
|
||||||
BlockContents* contents, uint32_t format_version,
|
BlockContents* contents, uint32_t format_version,
|
||||||
const ImmutableCFOptions& ioptions) {
|
const ImmutableCFOptions& ioptions) {
|
||||||
std::unique_ptr<char[]> ubuf;
|
std::unique_ptr<char[]> ubuf;
|
||||||
|
|
||||||
assert(uncompression_ctx.type() != kNoCompression &&
|
assert(uncompression_info.type() != kNoCompression &&
|
||||||
"Invalid compression type");
|
"Invalid compression type");
|
||||||
|
|
||||||
StopWatchNano timer(ioptions.env,
|
StopWatchNano timer(ioptions.env,
|
||||||
ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
|
ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
|
||||||
int decompress_size = 0;
|
int decompress_size = 0;
|
||||||
switch (uncompression_ctx.type()) {
|
switch (uncompression_info.type()) {
|
||||||
case kSnappyCompression: {
|
case kSnappyCompression: {
|
||||||
size_t ulength = 0;
|
size_t ulength = 0;
|
||||||
static char snappy_corrupt_msg[] =
|
static char snappy_corrupt_msg[] =
|
||||||
@ -312,7 +312,7 @@ Status UncompressBlockContentsForCompressionType(
|
|||||||
}
|
}
|
||||||
case kZlibCompression:
|
case kZlibCompression:
|
||||||
ubuf.reset(Zlib_Uncompress(
|
ubuf.reset(Zlib_Uncompress(
|
||||||
uncompression_ctx, data, n, &decompress_size,
|
uncompression_info, data, n, &decompress_size,
|
||||||
GetCompressFormatForVersion(kZlibCompression, format_version)));
|
GetCompressFormatForVersion(kZlibCompression, format_version)));
|
||||||
if (!ubuf) {
|
if (!ubuf) {
|
||||||
static char zlib_corrupt_msg[] =
|
static char zlib_corrupt_msg[] =
|
||||||
@ -336,7 +336,7 @@ Status UncompressBlockContentsForCompressionType(
|
|||||||
break;
|
break;
|
||||||
case kLZ4Compression:
|
case kLZ4Compression:
|
||||||
ubuf.reset(LZ4_Uncompress(
|
ubuf.reset(LZ4_Uncompress(
|
||||||
uncompression_ctx, data, n, &decompress_size,
|
uncompression_info, data, n, &decompress_size,
|
||||||
GetCompressFormatForVersion(kLZ4Compression, format_version)));
|
GetCompressFormatForVersion(kLZ4Compression, format_version)));
|
||||||
if (!ubuf) {
|
if (!ubuf) {
|
||||||
static char lz4_corrupt_msg[] =
|
static char lz4_corrupt_msg[] =
|
||||||
@ -348,7 +348,7 @@ Status UncompressBlockContentsForCompressionType(
|
|||||||
break;
|
break;
|
||||||
case kLZ4HCCompression:
|
case kLZ4HCCompression:
|
||||||
ubuf.reset(LZ4_Uncompress(
|
ubuf.reset(LZ4_Uncompress(
|
||||||
uncompression_ctx, data, n, &decompress_size,
|
uncompression_info, data, n, &decompress_size,
|
||||||
GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
|
GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
|
||||||
if (!ubuf) {
|
if (!ubuf) {
|
||||||
static char lz4hc_corrupt_msg[] =
|
static char lz4hc_corrupt_msg[] =
|
||||||
@ -370,7 +370,8 @@ Status UncompressBlockContentsForCompressionType(
|
|||||||
break;
|
break;
|
||||||
case kZSTD:
|
case kZSTD:
|
||||||
case kZSTDNotFinalCompression:
|
case kZSTDNotFinalCompression:
|
||||||
ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
|
ubuf.reset(
|
||||||
|
ZSTD_Uncompress(uncompression_info, data, n, &decompress_size));
|
||||||
if (!ubuf) {
|
if (!ubuf) {
|
||||||
static char zstd_corrupt_msg[] =
|
static char zstd_corrupt_msg[] =
|
||||||
"ZSTD not supported or corrupted ZSTD compressed block contents";
|
"ZSTD not supported or corrupted ZSTD compressed block contents";
|
||||||
@ -400,14 +401,14 @@ Status UncompressBlockContentsForCompressionType(
|
|||||||
// buffer is returned via 'result' and it is upto the caller to
|
// buffer is returned via 'result' and it is upto the caller to
|
||||||
// free this buffer.
|
// free this buffer.
|
||||||
// format_version is the block format as defined in include/rocksdb/table.h
|
// format_version is the block format as defined in include/rocksdb/table.h
|
||||||
Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
|
Status UncompressBlockContents(const UncompressionInfo& uncompression_info,
|
||||||
const char* data, size_t n,
|
const char* data, size_t n,
|
||||||
BlockContents* contents, uint32_t format_version,
|
BlockContents* contents, uint32_t format_version,
|
||||||
const ImmutableCFOptions& ioptions) {
|
const ImmutableCFOptions& ioptions) {
|
||||||
assert(data[n] != kNoCompression);
|
assert(data[n] != kNoCompression);
|
||||||
assert(data[n] == uncompression_ctx.type());
|
assert(data[n] == uncompression_info.type());
|
||||||
return UncompressBlockContentsForCompressionType(
|
return UncompressBlockContentsForCompressionType(
|
||||||
uncompression_ctx, data, n, contents, format_version, ioptions);
|
uncompression_info, data, n, contents, format_version, ioptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -250,16 +250,17 @@ extern Status ReadBlockContents(
|
|||||||
// free this buffer.
|
// free this buffer.
|
||||||
// For description of compress_format_version and possible values, see
|
// For description of compress_format_version and possible values, see
|
||||||
// util/compression.h
|
// util/compression.h
|
||||||
extern Status UncompressBlockContents(
|
extern Status UncompressBlockContents(const UncompressionInfo& info,
|
||||||
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
|
const char* data, size_t n,
|
||||||
BlockContents* contents, uint32_t compress_format_version,
|
BlockContents* contents,
|
||||||
const ImmutableCFOptions& ioptions);
|
uint32_t compress_format_version,
|
||||||
|
const ImmutableCFOptions& ioptions);
|
||||||
|
|
||||||
// This is an extension to UncompressBlockContents that accepts
|
// This is an extension to UncompressBlockContents that accepts
|
||||||
// a specific compression type. This is used by un-wrapped blocks
|
// a specific compression type. This is used by un-wrapped blocks
|
||||||
// with no compression header.
|
// with no compression header.
|
||||||
extern Status UncompressBlockContentsForCompressionType(
|
extern Status UncompressBlockContentsForCompressionType(
|
||||||
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
|
const UncompressionInfo& info, const char* data, size_t n,
|
||||||
BlockContents* contents, uint32_t compress_format_version,
|
BlockContents* contents, uint32_t compress_format_version,
|
||||||
const ImmutableCFOptions& ioptions);
|
const ImmutableCFOptions& ioptions);
|
||||||
|
|
||||||
|
@ -1991,28 +1991,28 @@ class Benchmark {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool CompressSlice(const CompressionContext& compression_ctx,
|
inline bool CompressSlice(const CompressionInfo& compression_info,
|
||||||
const Slice& input, std::string* compressed) {
|
const Slice& input, std::string* compressed) {
|
||||||
bool ok = true;
|
bool ok = true;
|
||||||
switch (FLAGS_compression_type_e) {
|
switch (FLAGS_compression_type_e) {
|
||||||
case rocksdb::kSnappyCompression:
|
case rocksdb::kSnappyCompression:
|
||||||
ok = Snappy_Compress(compression_ctx, input.data(), input.size(),
|
ok = Snappy_Compress(compression_info, input.data(), input.size(),
|
||||||
compressed);
|
compressed);
|
||||||
break;
|
break;
|
||||||
case rocksdb::kZlibCompression:
|
case rocksdb::kZlibCompression:
|
||||||
ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(),
|
ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
|
||||||
compressed);
|
compressed);
|
||||||
break;
|
break;
|
||||||
case rocksdb::kBZip2Compression:
|
case rocksdb::kBZip2Compression:
|
||||||
ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(),
|
ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
|
||||||
compressed);
|
compressed);
|
||||||
break;
|
break;
|
||||||
case rocksdb::kLZ4Compression:
|
case rocksdb::kLZ4Compression:
|
||||||
ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(),
|
ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
|
||||||
compressed);
|
compressed);
|
||||||
break;
|
break;
|
||||||
case rocksdb::kLZ4HCCompression:
|
case rocksdb::kLZ4HCCompression:
|
||||||
ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(),
|
ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
|
||||||
compressed);
|
compressed);
|
||||||
break;
|
break;
|
||||||
case rocksdb::kXpressCompression:
|
case rocksdb::kXpressCompression:
|
||||||
@ -2020,7 +2020,7 @@ class Benchmark {
|
|||||||
input.size(), compressed);
|
input.size(), compressed);
|
||||||
break;
|
break;
|
||||||
case rocksdb::kZSTD:
|
case rocksdb::kZSTD:
|
||||||
ok = ZSTD_Compress(compression_ctx, input.data(), input.size(),
|
ok = ZSTD_Compress(compression_info, input.data(), input.size(),
|
||||||
compressed);
|
compressed);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -2103,10 +2103,11 @@ class Benchmark {
|
|||||||
const int len = FLAGS_block_size;
|
const int len = FLAGS_block_size;
|
||||||
std::string input_str(len, 'y');
|
std::string input_str(len, 'y');
|
||||||
std::string compressed;
|
std::string compressed;
|
||||||
CompressionContext compression_ctx(FLAGS_compression_type_e,
|
CompressionOptions opts;
|
||||||
Options().compression_opts);
|
CompressionContext context(FLAGS_compression_type_e);
|
||||||
bool result =
|
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
|
||||||
CompressSlice(compression_ctx, Slice(input_str), &compressed);
|
FLAGS_compression_type_e);
|
||||||
|
bool result = CompressSlice(info, Slice(input_str), &compressed);
|
||||||
|
|
||||||
if (!result) {
|
if (!result) {
|
||||||
fprintf(stdout, "WARNING: %s compression is not enabled\n",
|
fprintf(stdout, "WARNING: %s compression is not enabled\n",
|
||||||
@ -2956,13 +2957,14 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
|||||||
int64_t produced = 0;
|
int64_t produced = 0;
|
||||||
bool ok = true;
|
bool ok = true;
|
||||||
std::string compressed;
|
std::string compressed;
|
||||||
CompressionContext compression_ctx(FLAGS_compression_type_e,
|
CompressionOptions opts;
|
||||||
Options().compression_opts);
|
CompressionContext context(FLAGS_compression_type_e);
|
||||||
|
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
|
||||||
|
FLAGS_compression_type_e);
|
||||||
// Compress 1G
|
// Compress 1G
|
||||||
while (ok && bytes < int64_t(1) << 30) {
|
while (ok && bytes < int64_t(1) << 30) {
|
||||||
compressed.clear();
|
compressed.clear();
|
||||||
ok = CompressSlice(compression_ctx, input, &compressed);
|
ok = CompressSlice(info, input, &compressed);
|
||||||
produced += compressed.size();
|
produced += compressed.size();
|
||||||
bytes += input.size();
|
bytes += input.size();
|
||||||
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
|
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
|
||||||
@ -2984,11 +2986,17 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
|||||||
Slice input = gen.Generate(FLAGS_block_size);
|
Slice input = gen.Generate(FLAGS_block_size);
|
||||||
std::string compressed;
|
std::string compressed;
|
||||||
|
|
||||||
|
CompressionContext compression_ctx(FLAGS_compression_type_e);
|
||||||
|
CompressionOptions compression_opts;
|
||||||
|
CompressionInfo compression_info(compression_opts, compression_ctx,
|
||||||
|
CompressionDict::GetEmptyDict(),
|
||||||
|
FLAGS_compression_type_e);
|
||||||
UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
|
UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
|
||||||
CompressionContext compression_ctx(FLAGS_compression_type_e,
|
UncompressionInfo uncompression_info(uncompression_ctx,
|
||||||
Options().compression_opts);
|
CompressionDict::GetEmptyDict(),
|
||||||
|
FLAGS_compression_type_e);
|
||||||
|
|
||||||
bool ok = CompressSlice(compression_ctx, input, &compressed);
|
bool ok = CompressSlice(compression_info, input, &compressed);
|
||||||
int64_t bytes = 0;
|
int64_t bytes = 0;
|
||||||
int decompress_size;
|
int decompress_size;
|
||||||
while (ok && bytes < 1024 * 1048576) {
|
while (ok && bytes < 1024 * 1048576) {
|
||||||
@ -3008,7 +3016,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case rocksdb::kZlibCompression:
|
case rocksdb::kZlibCompression:
|
||||||
uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
|
uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
|
||||||
compressed.size(), &decompress_size, 2);
|
compressed.size(), &decompress_size, 2);
|
||||||
ok = uncompressed != nullptr;
|
ok = uncompressed != nullptr;
|
||||||
break;
|
break;
|
||||||
@ -3018,12 +3026,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
|||||||
ok = uncompressed != nullptr;
|
ok = uncompressed != nullptr;
|
||||||
break;
|
break;
|
||||||
case rocksdb::kLZ4Compression:
|
case rocksdb::kLZ4Compression:
|
||||||
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
|
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
|
||||||
compressed.size(), &decompress_size, 2);
|
compressed.size(), &decompress_size, 2);
|
||||||
ok = uncompressed != nullptr;
|
ok = uncompressed != nullptr;
|
||||||
break;
|
break;
|
||||||
case rocksdb::kLZ4HCCompression:
|
case rocksdb::kLZ4HCCompression:
|
||||||
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
|
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
|
||||||
compressed.size(), &decompress_size, 2);
|
compressed.size(), &decompress_size, 2);
|
||||||
ok = uncompressed != nullptr;
|
ok = uncompressed != nullptr;
|
||||||
break;
|
break;
|
||||||
@ -3033,7 +3041,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
|||||||
ok = uncompressed != nullptr;
|
ok = uncompressed != nullptr;
|
||||||
break;
|
break;
|
||||||
case rocksdb::kZSTD:
|
case rocksdb::kZSTD:
|
||||||
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
|
uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
|
||||||
compressed.size(), &decompress_size);
|
compressed.size(), &decompress_size);
|
||||||
ok = uncompressed != nullptr;
|
ok = uncompressed != nullptr;
|
||||||
break;
|
break;
|
||||||
|
@ -133,13 +133,147 @@ class ZSTDUncompressCachedData {
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
// Instantiate this class and pass it to the uncompression API below
|
// Holds dictionary and related data, like ZSTD's digested dictionary.
|
||||||
|
struct CompressionDict {
|
||||||
|
enum class Mode {
|
||||||
|
kUninit,
|
||||||
|
kEmpty, // An empty one can be used for both compression and uncompression
|
||||||
|
kCompression,
|
||||||
|
kUncompression,
|
||||||
|
};
|
||||||
|
#if ZSTD_VERSION_NUMBER >= 700
|
||||||
|
union {
|
||||||
|
ZSTD_CDict* zstd_cdict_;
|
||||||
|
ZSTD_DDict* zstd_ddict_;
|
||||||
|
};
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
Mode mode_ = Mode::kUninit;
|
||||||
|
Slice dict_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
static const CompressionDict& GetEmptyDict() {
|
||||||
|
static CompressionDict empty_dict{};
|
||||||
|
static bool init = false;
|
||||||
|
if (!init) {
|
||||||
|
empty_dict.Init(Slice() /* dict */, Mode::kEmpty,
|
||||||
|
false /* use_zstd_trainer */);
|
||||||
|
init = true;
|
||||||
|
}
|
||||||
|
return empty_dict;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Init(Slice dict, Mode mode, CompressionType type, int level = -1) {
|
||||||
|
return Init(dict, mode, type == kZSTD || type == kZSTDNotFinalCompression,
|
||||||
|
level);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
#if ZSTD_VERSION_NUMBER >= 700
|
||||||
|
void Init(Slice dict, Mode mode, bool use_zstd_trainer, int level = -1) {
|
||||||
|
#else // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
void Init(Slice dict, Mode mode, bool /* use_zstd_trainer */,
|
||||||
|
int /*level*/ = -1) {
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
assert(mode_ == Mode::kUninit);
|
||||||
|
dict_ = std::move(dict);
|
||||||
|
mode_ = mode;
|
||||||
|
switch (mode) {
|
||||||
|
case Mode::kUninit:
|
||||||
|
assert(false);
|
||||||
|
break;
|
||||||
|
case Mode::kEmpty:
|
||||||
|
break;
|
||||||
|
case Mode::kCompression:
|
||||||
|
#if ZSTD_VERSION_NUMBER >= 700
|
||||||
|
zstd_cdict_ = nullptr;
|
||||||
|
if (!dict_.empty() && use_zstd_trainer) {
|
||||||
|
if (level == CompressionOptions::kDefaultCompressionLevel) {
|
||||||
|
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
|
||||||
|
// https://github.com/facebook/zstd/issues/1148
|
||||||
|
level = 3;
|
||||||
|
}
|
||||||
|
// Should be safe (but slower) if below call fails as we'll use the
|
||||||
|
// raw dictionary to compress.
|
||||||
|
zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
|
||||||
|
assert(zstd_cdict_ != nullptr);
|
||||||
|
}
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
break;
|
||||||
|
case Mode::kUncompression:
|
||||||
|
#if ZSTD_VERSION_NUMBER >= 700
|
||||||
|
zstd_ddict_ = nullptr;
|
||||||
|
if (!dict_.empty() && use_zstd_trainer) {
|
||||||
|
zstd_ddict_ = ZSTD_createDDict(dict_.data(), dict_.size());
|
||||||
|
assert(zstd_ddict_ != nullptr);
|
||||||
|
}
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
~CompressionDict() {
|
||||||
|
#if ZSTD_VERSION_NUMBER >= 700
|
||||||
|
size_t res = 0;
|
||||||
|
switch (mode_) {
|
||||||
|
case Mode::kUninit:
|
||||||
|
break;
|
||||||
|
case Mode::kEmpty:
|
||||||
|
break;
|
||||||
|
case Mode::kCompression:
|
||||||
|
if (zstd_cdict_ != nullptr) {
|
||||||
|
res = ZSTD_freeCDict(zstd_cdict_);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Mode::kUncompression:
|
||||||
|
if (zstd_ddict_ != nullptr) {
|
||||||
|
res = ZSTD_freeDDict(zstd_ddict_);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
assert(res == 0); // Last I checked they can't fail
|
||||||
|
(void)res; // prevent unused var warning
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
}
|
||||||
|
|
||||||
|
#if ZSTD_VERSION_NUMBER >= 700
|
||||||
|
const ZSTD_CDict* GetDigestedZstdCDict() const {
|
||||||
|
assert(mode_ != Mode::kUninit);
|
||||||
|
if (mode_ == Mode::kEmpty) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
assert(mode_ == Mode::kCompression);
|
||||||
|
return zstd_cdict_;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ZSTD_DDict* GetDigestedZstdDDict() const {
|
||||||
|
assert(mode_ != Mode::kUninit);
|
||||||
|
if (mode_ == Mode::kEmpty) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
assert(mode_ == Mode::kUncompression);
|
||||||
|
return zstd_ddict_;
|
||||||
|
}
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
|
||||||
|
Slice GetRawDict() const {
|
||||||
|
assert(mode_ != Mode::kUninit);
|
||||||
|
assert(mode_ != Mode::kEmpty || dict_.empty());
|
||||||
|
return dict_;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompressionDict() = default;
|
||||||
|
// Disable copy/move
|
||||||
|
CompressionDict(const CompressionDict&) = delete;
|
||||||
|
CompressionDict& operator=(const CompressionDict&) = delete;
|
||||||
|
CompressionDict(CompressionDict&&) = delete;
|
||||||
|
CompressionDict& operator=(CompressionDict&&) = delete;
|
||||||
|
};
|
||||||
|
|
||||||
class CompressionContext {
|
class CompressionContext {
|
||||||
|
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
|
||||||
private:
|
private:
|
||||||
const CompressionType type_;
|
const CompressionType type_;
|
||||||
const CompressionOptions opts_;
|
|
||||||
Slice dict_;
|
|
||||||
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
|
|
||||||
ZSTD_CCtx* zstd_ctx_ = nullptr;
|
ZSTD_CCtx* zstd_ctx_ = nullptr;
|
||||||
void CreateNativeContext() {
|
void CreateNativeContext() {
|
||||||
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
|
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
|
||||||
@ -163,35 +297,45 @@ class CompressionContext {
|
|||||||
assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression);
|
assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression);
|
||||||
return zstd_ctx_;
|
return zstd_ctx_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
explicit CompressionContext(CompressionType comp_type) : type_(comp_type) {
|
||||||
|
CreateNativeContext();
|
||||||
|
}
|
||||||
|
|
||||||
#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
|
#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
|
||||||
|
public:
|
||||||
|
explicit CompressionContext(CompressionType /* comp_type */) {}
|
||||||
private:
|
private:
|
||||||
void CreateNativeContext() {}
|
void CreateNativeContext() {}
|
||||||
void DestroyNativeContext() {}
|
void DestroyNativeContext() {}
|
||||||
#endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
|
#endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
|
||||||
public:
|
public:
|
||||||
explicit CompressionContext(CompressionType comp_type) : type_(comp_type) {
|
|
||||||
CreateNativeContext();
|
|
||||||
}
|
|
||||||
CompressionContext(CompressionType comp_type, const CompressionOptions& opts,
|
|
||||||
const Slice& comp_dict = Slice())
|
|
||||||
: type_(comp_type), opts_(opts), dict_(comp_dict) {
|
|
||||||
CreateNativeContext();
|
|
||||||
}
|
|
||||||
~CompressionContext() { DestroyNativeContext(); }
|
~CompressionContext() { DestroyNativeContext(); }
|
||||||
CompressionContext(const CompressionContext&) = delete;
|
CompressionContext(const CompressionContext&) = delete;
|
||||||
CompressionContext& operator=(const CompressionContext&) = delete;
|
CompressionContext& operator=(const CompressionContext&) = delete;
|
||||||
|
|
||||||
const CompressionOptions& options() const { return opts_; }
|
|
||||||
CompressionType type() const { return type_; }
|
|
||||||
const Slice& dict() const { return dict_; }
|
|
||||||
Slice& dict() { return dict_; }
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Instantiate this class and pass it to the uncompression API below
|
class CompressionInfo {
|
||||||
|
const CompressionOptions& opts_;
|
||||||
|
const CompressionContext& context_;
|
||||||
|
const CompressionDict& dict_;
|
||||||
|
const CompressionType type_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
CompressionInfo(const CompressionOptions& _opts,
|
||||||
|
const CompressionContext& _context,
|
||||||
|
const CompressionDict& _dict, CompressionType _type)
|
||||||
|
: opts_(_opts), context_(_context), dict_(_dict), type_(_type) {}
|
||||||
|
|
||||||
|
const CompressionOptions& options() const { return opts_; }
|
||||||
|
const CompressionContext& context() const { return context_; }
|
||||||
|
const CompressionDict& dict() const { return dict_; }
|
||||||
|
CompressionType type() const { return type_; }
|
||||||
|
};
|
||||||
|
|
||||||
class UncompressionContext {
|
class UncompressionContext {
|
||||||
private:
|
private:
|
||||||
CompressionType type_;
|
const CompressionType type_;
|
||||||
Slice dict_;
|
|
||||||
CompressionContextCache* ctx_cache_ = nullptr;
|
CompressionContextCache* ctx_cache_ = nullptr;
|
||||||
ZSTDUncompressCachedData uncomp_cached_data_;
|
ZSTDUncompressCachedData uncomp_cached_data_;
|
||||||
|
|
||||||
@ -199,10 +343,8 @@ class UncompressionContext {
|
|||||||
struct NoCache {};
|
struct NoCache {};
|
||||||
// Do not use context cache, used by TableBuilder
|
// Do not use context cache, used by TableBuilder
|
||||||
UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {}
|
UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {}
|
||||||
explicit UncompressionContext(CompressionType comp_type)
|
|
||||||
: UncompressionContext(comp_type, Slice()) {}
|
explicit UncompressionContext(CompressionType comp_type) : type_(comp_type) {
|
||||||
UncompressionContext(CompressionType comp_type, const Slice& comp_dict)
|
|
||||||
: type_(comp_type), dict_(comp_dict) {
|
|
||||||
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
|
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
|
||||||
ctx_cache_ = CompressionContextCache::Instance();
|
ctx_cache_ = CompressionContextCache::Instance();
|
||||||
uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
|
uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
|
||||||
@ -222,9 +364,21 @@ class UncompressionContext {
|
|||||||
ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
|
ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
|
||||||
return uncomp_cached_data_.Get();
|
return uncomp_cached_data_.Get();
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class UncompressionInfo {
|
||||||
|
const UncompressionContext& context_;
|
||||||
|
const CompressionDict& dict_;
|
||||||
|
const CompressionType type_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
UncompressionInfo(const UncompressionContext& _context,
|
||||||
|
const CompressionDict& _dict, CompressionType _type)
|
||||||
|
: context_(_context), dict_(_dict), type_(_type) {}
|
||||||
|
|
||||||
|
const UncompressionContext& context() const { return context_; }
|
||||||
|
const CompressionDict& dict() const { return dict_; }
|
||||||
CompressionType type() const { return type_; }
|
CompressionType type() const { return type_; }
|
||||||
const Slice& dict() const { return dict_; }
|
|
||||||
Slice& dict() { return dict_; }
|
|
||||||
};
|
};
|
||||||
|
|
||||||
inline bool Snappy_Supported() {
|
inline bool Snappy_Supported() {
|
||||||
@ -343,9 +497,8 @@ inline std::string CompressionTypeToString(CompressionType compression_type) {
|
|||||||
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
|
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
|
||||||
// start of compressed block. Snappy format is the same as version 1.
|
// start of compressed block. Snappy format is the same as version 1.
|
||||||
|
|
||||||
inline bool Snappy_Compress(const CompressionContext& /*ctx*/,
|
inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input,
|
||||||
const char* input, size_t length,
|
size_t length, ::std::string* output) {
|
||||||
::std::string* output) {
|
|
||||||
#ifdef SNAPPY
|
#ifdef SNAPPY
|
||||||
output->resize(snappy::MaxCompressedLength(length));
|
output->resize(snappy::MaxCompressedLength(length));
|
||||||
size_t outlen;
|
size_t outlen;
|
||||||
@ -410,7 +563,7 @@ inline bool GetDecompressedSizeInfo(const char** input_data,
|
|||||||
// header in varint32 format
|
// header in varint32 format
|
||||||
// @param compression_dict Data for presetting the compression library's
|
// @param compression_dict Data for presetting the compression library's
|
||||||
// dictionary.
|
// dictionary.
|
||||||
inline bool Zlib_Compress(const CompressionContext& ctx,
|
inline bool Zlib_Compress(const CompressionInfo& info,
|
||||||
uint32_t compress_format_version, const char* input,
|
uint32_t compress_format_version, const char* input,
|
||||||
size_t length, ::std::string* output) {
|
size_t length, ::std::string* output) {
|
||||||
#ifdef ZLIB
|
#ifdef ZLIB
|
||||||
@ -435,24 +588,25 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
|
|||||||
// The default value is 8. See zconf.h for more details.
|
// The default value is 8. See zconf.h for more details.
|
||||||
static const int memLevel = 8;
|
static const int memLevel = 8;
|
||||||
int level;
|
int level;
|
||||||
if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||||
level = Z_DEFAULT_COMPRESSION;
|
level = Z_DEFAULT_COMPRESSION;
|
||||||
} else {
|
} else {
|
||||||
level = ctx.options().level;
|
level = info.options().level;
|
||||||
}
|
}
|
||||||
z_stream _stream;
|
z_stream _stream;
|
||||||
memset(&_stream, 0, sizeof(z_stream));
|
memset(&_stream, 0, sizeof(z_stream));
|
||||||
int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits,
|
int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits,
|
||||||
memLevel, ctx.options().strategy);
|
memLevel, info.options().strategy);
|
||||||
if (st != Z_OK) {
|
if (st != Z_OK) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx.dict().size()) {
|
Slice compression_dict = info.dict().GetRawDict();
|
||||||
|
if (compression_dict.size()) {
|
||||||
// Initialize the compression library's dictionary
|
// Initialize the compression library's dictionary
|
||||||
st = deflateSetDictionary(&_stream,
|
st = deflateSetDictionary(
|
||||||
reinterpret_cast<const Bytef*>(ctx.dict().data()),
|
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
|
||||||
static_cast<unsigned int>(ctx.dict().size()));
|
static_cast<unsigned int>(compression_dict.size()));
|
||||||
if (st != Z_OK) {
|
if (st != Z_OK) {
|
||||||
deflateEnd(&_stream);
|
deflateEnd(&_stream);
|
||||||
return false;
|
return false;
|
||||||
@ -480,7 +634,7 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
|
|||||||
deflateEnd(&_stream);
|
deflateEnd(&_stream);
|
||||||
return compressed;
|
return compressed;
|
||||||
#else
|
#else
|
||||||
(void)ctx;
|
(void)info;
|
||||||
(void)compress_format_version;
|
(void)compress_format_version;
|
||||||
(void)input;
|
(void)input;
|
||||||
(void)length;
|
(void)length;
|
||||||
@ -495,7 +649,7 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
|
|||||||
// header in varint32 format
|
// header in varint32 format
|
||||||
// @param compression_dict Data for presetting the compression library's
|
// @param compression_dict Data for presetting the compression library's
|
||||||
// dictionary.
|
// dictionary.
|
||||||
inline char* Zlib_Uncompress(const UncompressionContext& ctx,
|
inline char* Zlib_Uncompress(const UncompressionInfo& info,
|
||||||
const char* input_data, size_t input_length,
|
const char* input_data, size_t input_length,
|
||||||
int* decompress_size,
|
int* decompress_size,
|
||||||
uint32_t compress_format_version,
|
uint32_t compress_format_version,
|
||||||
@ -528,11 +682,12 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx.dict().size()) {
|
Slice compression_dict = info.dict().GetRawDict();
|
||||||
|
if (compression_dict.size()) {
|
||||||
// Initialize the compression library's dictionary
|
// Initialize the compression library's dictionary
|
||||||
st = inflateSetDictionary(&_stream,
|
st = inflateSetDictionary(
|
||||||
reinterpret_cast<const Bytef*>(ctx.dict().data()),
|
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
|
||||||
static_cast<unsigned int>(ctx.dict().size()));
|
static_cast<unsigned int>(compression_dict.size()));
|
||||||
if (st != Z_OK) {
|
if (st != Z_OK) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
@ -585,7 +740,7 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
|
|||||||
inflateEnd(&_stream);
|
inflateEnd(&_stream);
|
||||||
return output;
|
return output;
|
||||||
#else
|
#else
|
||||||
(void)ctx;
|
(void)info;
|
||||||
(void)input_data;
|
(void)input_data;
|
||||||
(void)input_length;
|
(void)input_length;
|
||||||
(void)decompress_size;
|
(void)decompress_size;
|
||||||
@ -599,7 +754,7 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
|
|||||||
// block header
|
// block header
|
||||||
// compress_format_version == 2 -- decompressed size is included in the block
|
// compress_format_version == 2 -- decompressed size is included in the block
|
||||||
// header in varint32 format
|
// header in varint32 format
|
||||||
inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
|
inline bool BZip2_Compress(const CompressionInfo& /*info*/,
|
||||||
uint32_t compress_format_version, const char* input,
|
uint32_t compress_format_version, const char* input,
|
||||||
size_t length, ::std::string* output) {
|
size_t length, ::std::string* output) {
|
||||||
#ifdef BZIP2
|
#ifdef BZIP2
|
||||||
@ -746,7 +901,7 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
|
|||||||
// header in varint32 format
|
// header in varint32 format
|
||||||
// @param compression_dict Data for presetting the compression library's
|
// @param compression_dict Data for presetting the compression library's
|
||||||
// dictionary.
|
// dictionary.
|
||||||
inline bool LZ4_Compress(const CompressionContext& ctx,
|
inline bool LZ4_Compress(const CompressionInfo& info,
|
||||||
uint32_t compress_format_version, const char* input,
|
uint32_t compress_format_version, const char* input,
|
||||||
size_t length, ::std::string* output) {
|
size_t length, ::std::string* output) {
|
||||||
#ifdef LZ4
|
#ifdef LZ4
|
||||||
@ -774,9 +929,10 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
|
|||||||
int outlen;
|
int outlen;
|
||||||
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
||||||
LZ4_stream_t* stream = LZ4_createStream();
|
LZ4_stream_t* stream = LZ4_createStream();
|
||||||
if (ctx.dict().size()) {
|
Slice compression_dict = info.dict().GetRawDict();
|
||||||
LZ4_loadDict(stream, ctx.dict().data(),
|
if (compression_dict.size()) {
|
||||||
static_cast<int>(ctx.dict().size()));
|
LZ4_loadDict(stream, compression_dict.data(),
|
||||||
|
static_cast<int>(compression_dict.size()));
|
||||||
}
|
}
|
||||||
#if LZ4_VERSION_NUMBER >= 10700 // r129+
|
#if LZ4_VERSION_NUMBER >= 10700 // r129+
|
||||||
outlen =
|
outlen =
|
||||||
@ -799,7 +955,7 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
|
|||||||
output->resize(static_cast<size_t>(output_header_len + outlen));
|
output->resize(static_cast<size_t>(output_header_len + outlen));
|
||||||
return true;
|
return true;
|
||||||
#else // LZ4
|
#else // LZ4
|
||||||
(void)ctx;
|
(void)info;
|
||||||
(void)compress_format_version;
|
(void)compress_format_version;
|
||||||
(void)input;
|
(void)input;
|
||||||
(void)length;
|
(void)length;
|
||||||
@ -814,7 +970,7 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
|
|||||||
// header in varint32 format
|
// header in varint32 format
|
||||||
// @param compression_dict Data for presetting the compression library's
|
// @param compression_dict Data for presetting the compression library's
|
||||||
// dictionary.
|
// dictionary.
|
||||||
inline char* LZ4_Uncompress(const UncompressionContext& ctx,
|
inline char* LZ4_Uncompress(const UncompressionInfo& info,
|
||||||
const char* input_data, size_t input_length,
|
const char* input_data, size_t input_length,
|
||||||
int* decompress_size,
|
int* decompress_size,
|
||||||
uint32_t compress_format_version) {
|
uint32_t compress_format_version) {
|
||||||
@ -840,9 +996,10 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
|
|||||||
char* output = new char[output_len];
|
char* output = new char[output_len];
|
||||||
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
||||||
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
|
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
|
||||||
if (ctx.dict().size()) {
|
Slice compression_dict = info.dict().GetRawDict();
|
||||||
LZ4_setStreamDecode(stream, ctx.dict().data(),
|
if (compression_dict.size()) {
|
||||||
static_cast<int>(ctx.dict().size()));
|
LZ4_setStreamDecode(stream, compression_dict.data(),
|
||||||
|
static_cast<int>(compression_dict.size()));
|
||||||
}
|
}
|
||||||
*decompress_size = LZ4_decompress_safe_continue(
|
*decompress_size = LZ4_decompress_safe_continue(
|
||||||
stream, input_data, output, static_cast<int>(input_length),
|
stream, input_data, output, static_cast<int>(input_length),
|
||||||
@ -861,7 +1018,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
|
|||||||
assert(*decompress_size == static_cast<int>(output_len));
|
assert(*decompress_size == static_cast<int>(output_len));
|
||||||
return output;
|
return output;
|
||||||
#else // LZ4
|
#else // LZ4
|
||||||
(void)ctx;
|
(void)info;
|
||||||
(void)input_data;
|
(void)input_data;
|
||||||
(void)input_length;
|
(void)input_length;
|
||||||
(void)decompress_size;
|
(void)decompress_size;
|
||||||
@ -876,7 +1033,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
|
|||||||
// header in varint32 format
|
// header in varint32 format
|
||||||
// @param compression_dict Data for presetting the compression library's
|
// @param compression_dict Data for presetting the compression library's
|
||||||
// dictionary.
|
// dictionary.
|
||||||
inline bool LZ4HC_Compress(const CompressionContext& ctx,
|
inline bool LZ4HC_Compress(const CompressionInfo& info,
|
||||||
uint32_t compress_format_version, const char* input,
|
uint32_t compress_format_version, const char* input,
|
||||||
size_t length, ::std::string* output) {
|
size_t length, ::std::string* output) {
|
||||||
#ifdef LZ4
|
#ifdef LZ4
|
||||||
@ -903,17 +1060,18 @@ inline bool LZ4HC_Compress(const CompressionContext& ctx,
|
|||||||
|
|
||||||
int outlen;
|
int outlen;
|
||||||
int level;
|
int level;
|
||||||
if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||||
level = 0; // lz4hc.h says any value < 1 will be sanitized to default
|
level = 0; // lz4hc.h says any value < 1 will be sanitized to default
|
||||||
} else {
|
} else {
|
||||||
level = ctx.options().level;
|
level = info.options().level;
|
||||||
}
|
}
|
||||||
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
||||||
LZ4_streamHC_t* stream = LZ4_createStreamHC();
|
LZ4_streamHC_t* stream = LZ4_createStreamHC();
|
||||||
LZ4_resetStreamHC(stream, level);
|
LZ4_resetStreamHC(stream, level);
|
||||||
|
Slice compression_dict = info.dict().GetRawDict();
|
||||||
const char* compression_dict_data =
|
const char* compression_dict_data =
|
||||||
ctx.dict().size() > 0 ? ctx.dict().data() : nullptr;
|
compression_dict.size() > 0 ? compression_dict.data() : nullptr;
|
||||||
size_t compression_dict_size = ctx.dict().size();
|
size_t compression_dict_size = compression_dict.size();
|
||||||
LZ4_loadDictHC(stream, compression_dict_data,
|
LZ4_loadDictHC(stream, compression_dict_data,
|
||||||
static_cast<int>(compression_dict_size));
|
static_cast<int>(compression_dict_size));
|
||||||
|
|
||||||
@ -944,7 +1102,7 @@ inline bool LZ4HC_Compress(const CompressionContext& ctx,
|
|||||||
output->resize(static_cast<size_t>(output_header_len + outlen));
|
output->resize(static_cast<size_t>(output_header_len + outlen));
|
||||||
return true;
|
return true;
|
||||||
#else // LZ4
|
#else // LZ4
|
||||||
(void)ctx;
|
(void)info;
|
||||||
(void)compress_format_version;
|
(void)compress_format_version;
|
||||||
(void)input;
|
(void)input;
|
||||||
(void)length;
|
(void)length;
|
||||||
@ -978,9 +1136,7 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/,
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// @param compression_dict Data for presetting the compression library's
|
inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
|
||||||
// dictionary.
|
|
||||||
inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
|
||||||
size_t length, ::std::string* output) {
|
size_t length, ::std::string* output) {
|
||||||
#ifdef ZSTD
|
#ifdef ZSTD
|
||||||
if (length > std::numeric_limits<uint32_t>::max()) {
|
if (length > std::numeric_limits<uint32_t>::max()) {
|
||||||
@ -995,19 +1151,29 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
|||||||
output->resize(static_cast<size_t>(output_header_len + compressBound));
|
output->resize(static_cast<size_t>(output_header_len + compressBound));
|
||||||
size_t outlen = 0;
|
size_t outlen = 0;
|
||||||
int level;
|
int level;
|
||||||
if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||||
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
|
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
|
||||||
// https://github.com/facebook/zstd/issues/1148
|
// https://github.com/facebook/zstd/issues/1148
|
||||||
level = 3;
|
level = 3;
|
||||||
} else {
|
} else {
|
||||||
level = ctx.options().level;
|
level = info.options().level;
|
||||||
}
|
}
|
||||||
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
|
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
|
||||||
ZSTD_CCtx* context = ctx.ZSTDPreallocCtx();
|
ZSTD_CCtx* context = info.context().ZSTDPreallocCtx();
|
||||||
assert(context != nullptr);
|
assert(context != nullptr);
|
||||||
outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
|
#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+
|
||||||
compressBound, input, length,
|
if (info.dict().GetDigestedZstdCDict() != nullptr) {
|
||||||
ctx.dict().data(), ctx.dict().size(), level);
|
outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len],
|
||||||
|
compressBound, input, length,
|
||||||
|
info.dict().GetDigestedZstdCDict());
|
||||||
|
}
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
if (outlen == 0) {
|
||||||
|
outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
|
||||||
|
compressBound, input, length,
|
||||||
|
info.dict().GetRawDict().data(),
|
||||||
|
info.dict().GetRawDict().size(), level);
|
||||||
|
}
|
||||||
#else // up to v0.4.x
|
#else // up to v0.4.x
|
||||||
outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
|
outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
|
||||||
length, level);
|
length, level);
|
||||||
@ -1018,7 +1184,7 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
|||||||
output->resize(output_header_len + outlen);
|
output->resize(output_header_len + outlen);
|
||||||
return true;
|
return true;
|
||||||
#else // ZSTD
|
#else // ZSTD
|
||||||
(void)ctx;
|
(void)info;
|
||||||
(void)input;
|
(void)input;
|
||||||
(void)length;
|
(void)length;
|
||||||
(void)output;
|
(void)output;
|
||||||
@ -1028,7 +1194,7 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
|||||||
|
|
||||||
// @param compression_dict Data for presetting the compression library's
|
// @param compression_dict Data for presetting the compression library's
|
||||||
// dictionary.
|
// dictionary.
|
||||||
inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
|
inline char* ZSTD_Uncompress(const UncompressionInfo& info,
|
||||||
const char* input_data, size_t input_length,
|
const char* input_data, size_t input_length,
|
||||||
int* decompress_size) {
|
int* decompress_size) {
|
||||||
#ifdef ZSTD
|
#ifdef ZSTD
|
||||||
@ -1039,14 +1205,24 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
|
|||||||
}
|
}
|
||||||
|
|
||||||
char* output = new char[output_len];
|
char* output = new char[output_len];
|
||||||
size_t actual_output_length;
|
size_t actual_output_length = 0;
|
||||||
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
|
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
|
||||||
ZSTD_DCtx* context = ctx.GetZSTDContext();
|
ZSTD_DCtx* context = info.context().GetZSTDContext();
|
||||||
assert(context != nullptr);
|
assert(context != nullptr);
|
||||||
actual_output_length = ZSTD_decompress_usingDict(
|
#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+
|
||||||
context, output, output_len, input_data, input_length, ctx.dict().data(),
|
if (info.dict().GetDigestedZstdDDict() != nullptr) {
|
||||||
ctx.dict().size());
|
actual_output_length = ZSTD_decompress_usingDDict(
|
||||||
|
context, output, output_len, input_data, input_length,
|
||||||
|
info.dict().GetDigestedZstdDDict());
|
||||||
|
}
|
||||||
|
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||||
|
if (actual_output_length == 0) {
|
||||||
|
actual_output_length = ZSTD_decompress_usingDict(
|
||||||
|
context, output, output_len, input_data, input_length,
|
||||||
|
info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
|
||||||
|
}
|
||||||
#else // up to v0.4.x
|
#else // up to v0.4.x
|
||||||
|
(void) info;
|
||||||
actual_output_length =
|
actual_output_length =
|
||||||
ZSTD_decompress(output, output_len, input_data, input_length);
|
ZSTD_decompress(output, output_len, input_data, input_length);
|
||||||
#endif // ZSTD_VERSION_NUMBER >= 500
|
#endif // ZSTD_VERSION_NUMBER >= 500
|
||||||
@ -1054,7 +1230,7 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
|
|||||||
*decompress_size = static_cast<int>(actual_output_length);
|
*decompress_size = static_cast<int>(actual_output_length);
|
||||||
return output;
|
return output;
|
||||||
#else // ZSTD
|
#else // ZSTD
|
||||||
(void)ctx;
|
(void)info;
|
||||||
(void)input_data;
|
(void)input_data;
|
||||||
(void)input_length;
|
(void)input_length;
|
||||||
(void)decompress_size;
|
(void)decompress_size;
|
||||||
@ -1069,6 +1245,10 @@ inline std::string ZSTD_TrainDictionary(const std::string& samples,
|
|||||||
// available for dynamic linking until v1.1.3. For now we enable the feature
|
// available for dynamic linking until v1.1.3. For now we enable the feature
|
||||||
// in v1.1.3+ only.
|
// in v1.1.3+ only.
|
||||||
#if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+
|
#if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+
|
||||||
|
assert(samples.empty() == sample_lens.empty());
|
||||||
|
if (samples.empty()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
std::string dict_data(max_dict_bytes, '\0');
|
std::string dict_data(max_dict_bytes, '\0');
|
||||||
size_t dict_len = ZDICT_trainFromBuffer(
|
size_t dict_len = ZDICT_trainFromBuffer(
|
||||||
&dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],
|
&dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],
|
||||||
|
@ -771,9 +771,11 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
|||||||
return raw;
|
return raw;
|
||||||
}
|
}
|
||||||
StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
|
StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
|
||||||
CompressionType ct = bdb_options_.compression;
|
CompressionType type = bdb_options_.compression;
|
||||||
CompressionContext compression_ctx(ct);
|
CompressionOptions opts;
|
||||||
CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat,
|
CompressionContext context(type);
|
||||||
|
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type);
|
||||||
|
CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat,
|
||||||
compression_output);
|
compression_output);
|
||||||
return *compression_output;
|
return *compression_output;
|
||||||
}
|
}
|
||||||
@ -1106,9 +1108,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
{
|
{
|
||||||
StopWatch decompression_sw(env_, statistics_,
|
StopWatch decompression_sw(env_, statistics_,
|
||||||
BLOB_DB_DECOMPRESSION_MICROS);
|
BLOB_DB_DECOMPRESSION_MICROS);
|
||||||
UncompressionContext uncompression_ctx(bfile->compression());
|
UncompressionContext context(bfile->compression());
|
||||||
|
UncompressionInfo info(context, CompressionDict::GetEmptyDict(),
|
||||||
|
bfile->compression());
|
||||||
s = UncompressBlockContentsForCompressionType(
|
s = UncompressBlockContentsForCompressionType(
|
||||||
uncompression_ctx, blob_value.data(), blob_value.size(), &contents,
|
info, blob_value.data(), blob_value.size(), &contents,
|
||||||
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
|
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
|
||||||
}
|
}
|
||||||
value->PinSelf(contents.data);
|
value->PinSelf(contents.data);
|
||||||
|
@ -208,9 +208,11 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
|
|||||||
if (compression != kNoCompression &&
|
if (compression != kNoCompression &&
|
||||||
(show_uncompressed_blob != DisplayType::kNone || show_summary)) {
|
(show_uncompressed_blob != DisplayType::kNone || show_summary)) {
|
||||||
BlockContents contents;
|
BlockContents contents;
|
||||||
UncompressionContext uncompression_ctx(compression);
|
UncompressionContext context(compression);
|
||||||
|
UncompressionInfo info(context, CompressionDict::GetEmptyDict(),
|
||||||
|
compression);
|
||||||
s = UncompressBlockContentsForCompressionType(
|
s = UncompressBlockContentsForCompressionType(
|
||||||
uncompression_ctx, slice.data() + key_size, value_size, &contents,
|
info, slice.data() + key_size, value_size, &contents,
|
||||||
2 /*compress_format_version*/, ImmutableCFOptions(Options()));
|
2 /*compress_format_version*/, ImmutableCFOptions(Options()));
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
|
@ -89,8 +89,9 @@ void ColumnAwareEncodingReader::DecodeBlocks(
|
|||||||
CompressionType type =
|
CompressionType type =
|
||||||
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
|
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
|
||||||
if (type != kNoCompression) {
|
if (type != kNoCompression) {
|
||||||
UncompressionContext uncompression_ctx(type);
|
UncompressionContext context(type);
|
||||||
UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
|
UncompressionInfo info(context, CompressionDict::GetEmptyDict(), type);
|
||||||
|
UncompressBlockContents(info, slice_final_with_bit.c_str(),
|
||||||
slice_final_with_bit.size() - 1, &contents,
|
slice_final_with_bit.size() - 1, &contents,
|
||||||
format_version, ioptions);
|
format_version, ioptions);
|
||||||
content_ptr = contents.data.data();
|
content_ptr = contents.data.data();
|
||||||
@ -171,8 +172,9 @@ void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
|
|||||||
CompressionType type =
|
CompressionType type =
|
||||||
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
|
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
|
||||||
if (type != kNoCompression) {
|
if (type != kNoCompression) {
|
||||||
UncompressionContext uncompression_ctx(type);
|
UncompressionContext context(type);
|
||||||
UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
|
UncompressionInfo info(context, CompressionDict::GetEmptyDict(), type);
|
||||||
|
UncompressBlockContents(info, slice_final_with_bit.c_str(),
|
||||||
slice_final_with_bit.size() - 1, &contents,
|
slice_final_with_bit.size() - 1, &contents,
|
||||||
format_version, ioptions);
|
format_version, ioptions);
|
||||||
decoded_content = std::string(contents.data.data(), contents.data.size());
|
decoded_content = std::string(contents.data.data(), contents.data.size());
|
||||||
@ -243,10 +245,12 @@ namespace {
|
|||||||
|
|
||||||
void CompressDataBlock(const std::string& output_content, Slice* slice_final,
|
void CompressDataBlock(const std::string& output_content, Slice* slice_final,
|
||||||
CompressionType* type, std::string* compressed_output) {
|
CompressionType* type, std::string* compressed_output) {
|
||||||
CompressionContext compression_ctx(*type);
|
CompressionContext context(*type);
|
||||||
|
CompressionOptions opts;
|
||||||
|
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), *type);
|
||||||
uint32_t format_version = 2; // hard-coded version
|
uint32_t format_version = 2; // hard-coded version
|
||||||
*slice_final = CompressBlock(output_content, compression_ctx, type,
|
*slice_final = CompressBlock(output_content, info, type, format_version,
|
||||||
format_version, compressed_output);
|
compressed_output);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
Loading…
x
Reference in New Issue
Block a user