Digest ZSTD compression dictionary once when writing SST file (#4849)
Summary: This is essentially a re-submission of #4251 with a few improvements: - Split `CompressionDict` into two separate classes: `CompressionDict` and `UncompressionDict` - Eliminated `Init` functions. Instead do all initialization work in constructors. - Added test case for parallel DB open, which is the scenario where #4251 failed under TSAN. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4849 Differential Revision: D13606039 Pulled By: ajkr fbshipit-source-id: 08c236059798c710db9cbf545fce0f371232d447
This commit is contained in:
parent
b1ad6ebba8
commit
01013ae766
@ -129,14 +129,10 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
|
||||
}
|
||||
}
|
||||
if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
|
||||
if (!CompressionTypeSupported(CompressionType::kZSTD)) {
|
||||
// Dictionary trainer is available since v0.6.1, but ZSTD was marked
|
||||
// stable only since v0.8.0. For now we enable the feature in stable
|
||||
// versions only.
|
||||
if (!ZSTD_TrainDictionarySupported()) {
|
||||
return Status::InvalidArgument(
|
||||
"zstd dictionary trainer cannot be used because " +
|
||||
CompressionTypeToString(CompressionType::kZSTD) +
|
||||
" is not linked with the binary.");
|
||||
"zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
|
||||
"is not linked with the binary.");
|
||||
}
|
||||
if (cf_options.compression_opts.max_dict_bytes == 0) {
|
||||
return Status::InvalidArgument(
|
||||
|
@ -3214,6 +3214,57 @@ TEST_F(DBTest2, TestCompactFiles) {
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// TODO: figure out why this test fails in appveyor
|
||||
#ifndef OS_WIN
|
||||
// Opens several DBs concurrently, first as brand-new (empty) DBs and then
// again after data has been written, to exercise DB::Open running in parallel
// on independent databases.
TEST_F(DBTest2, MultiDBParallelOpenTest) {
  const int kNumDbs = 2;
  Options options = CurrentOptions();
  std::vector<std::string> dbnames;
  for (int i = 0; i < kNumDbs; ++i) {
    dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i));
    ASSERT_OK(DestroyDB(dbnames.back(), options));
  }

  // Verify empty DBs can be created in parallel
  std::vector<std::thread> open_threads;
  // Parentheses (not braces) so the (count, value) constructor is selected
  // unambiguously.
  std::vector<DB*> dbs(static_cast<size_t>(kNumDbs), nullptr);
  options.create_if_missing = true;
  for (int i = 0; i < kNumDbs; ++i) {
    open_threads.emplace_back(
        [&](int dbnum) {
          ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
        },
        i);
  }

  // Join every thread before running any assertion on this thread: a failed
  // ASSERT_* returns from the test body, and destroying a still-joinable
  // std::thread would terminate the process.
  for (auto& open_thread : open_threads) {
    open_thread.join();
  }

  // Now add some data and close, so next we can verify non-empty DBs can be
  // recovered in parallel
  for (int i = 0; i < kNumDbs; ++i) {
    // An ASSERT_OK failure inside the lambda only returns from the lambda, so
    // the handle may still be null here.
    ASSERT_TRUE(dbs[i] != nullptr);
    ASSERT_OK(dbs[i]->Put(WriteOptions(), "xi", "gua"));
    delete dbs[i];
  }

  // Verify non-empty DBs can be recovered in parallel
  // Reset the slots but keep the vector's size: the threads below write
  // through &dbs[dbnum], which would be out of bounds on a cleared vector.
  dbs.assign(static_cast<size_t>(kNumDbs), nullptr);
  open_threads.clear();
  for (int i = 0; i < kNumDbs; ++i) {
    open_threads.emplace_back(
        [&](int dbnum) {
          ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
        },
        i);
  }

  // Wait and cleanup
  for (auto& open_thread : open_threads) {
    open_thread.join();
  }
  for (int i = 0; i < kNumDbs; ++i) {
    delete dbs[i];  // deleting a null pointer is a no-op
    ASSERT_OK(DestroyDB(dbnames[i], options));
  }
}
|
||||
#endif // OS_WIN
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -104,19 +104,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
|
||||
} // namespace
|
||||
|
||||
// format_version is the block format as defined in include/rocksdb/table.h
|
||||
Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
||||
Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
|
||||
CompressionType* type, uint32_t format_version,
|
||||
std::string* compressed_output) {
|
||||
*type = compression_ctx.type();
|
||||
if (compression_ctx.type() == kNoCompression) {
|
||||
*type = compression_info.type();
|
||||
if (compression_info.type() == kNoCompression) {
|
||||
return raw;
|
||||
}
|
||||
|
||||
// Will return compressed block contents if (1) the compression method is
|
||||
// supported in this platform and (2) the compression rate is "good enough".
|
||||
switch (compression_ctx.type()) {
|
||||
switch (compression_info.type()) {
|
||||
case kSnappyCompression:
|
||||
if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
|
||||
if (Snappy_Compress(compression_info, raw.data(), raw.size(),
|
||||
compressed_output) &&
|
||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||
return *compressed_output;
|
||||
@ -124,7 +124,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
||||
break; // fall back to no compression.
|
||||
case kZlibCompression:
|
||||
if (Zlib_Compress(
|
||||
compression_ctx,
|
||||
compression_info,
|
||||
GetCompressFormatForVersion(kZlibCompression, format_version),
|
||||
raw.data(), raw.size(), compressed_output) &&
|
||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||
@ -133,7 +133,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
||||
break; // fall back to no compression.
|
||||
case kBZip2Compression:
|
||||
if (BZip2_Compress(
|
||||
compression_ctx,
|
||||
compression_info,
|
||||
GetCompressFormatForVersion(kBZip2Compression, format_version),
|
||||
raw.data(), raw.size(), compressed_output) &&
|
||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||
@ -142,7 +142,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
||||
break; // fall back to no compression.
|
||||
case kLZ4Compression:
|
||||
if (LZ4_Compress(
|
||||
compression_ctx,
|
||||
compression_info,
|
||||
GetCompressFormatForVersion(kLZ4Compression, format_version),
|
||||
raw.data(), raw.size(), compressed_output) &&
|
||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||
@ -151,7 +151,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
||||
break; // fall back to no compression.
|
||||
case kLZ4HCCompression:
|
||||
if (LZ4HC_Compress(
|
||||
compression_ctx,
|
||||
compression_info,
|
||||
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
|
||||
raw.data(), raw.size(), compressed_output) &&
|
||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||
@ -167,7 +167,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
||||
break;
|
||||
case kZSTD:
|
||||
case kZSTDNotFinalCompression:
|
||||
if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
|
||||
if (ZSTD_Compress(compression_info, raw.data(), raw.size(),
|
||||
compressed_output) &&
|
||||
GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
||||
return *compressed_output;
|
||||
@ -261,10 +261,12 @@ struct BlockBasedTableBuilder::Rep {
|
||||
PartitionedIndexBuilder* p_index_builder_ = nullptr;
|
||||
|
||||
std::string last_key;
|
||||
// Compression dictionary or nullptr
|
||||
const std::string* compression_dict;
|
||||
CompressionType compression_type;
|
||||
CompressionOptions compression_opts;
|
||||
CompressionDict compression_dict;
|
||||
CompressionContext compression_ctx;
|
||||
std::unique_ptr<UncompressionContext> verify_ctx;
|
||||
UncompressionDict verify_dict;
|
||||
TableProperties props;
|
||||
|
||||
bool closed = false; // Either Finish() or Abandon() has been called.
|
||||
@ -313,8 +315,15 @@ struct BlockBasedTableBuilder::Rep {
|
||||
table_options.data_block_hash_table_util_ratio),
|
||||
range_del_block(1 /* block_restart_interval */),
|
||||
internal_prefix_transform(_moptions.prefix_extractor.get()),
|
||||
compression_dict(_compression_dict),
|
||||
compression_ctx(_compression_type, _compression_opts),
|
||||
compression_type(_compression_type),
|
||||
compression_opts(_compression_opts),
|
||||
compression_dict(
|
||||
_compression_dict == nullptr ? Slice() : Slice(*_compression_dict),
|
||||
_compression_type, _compression_opts.level),
|
||||
compression_ctx(_compression_type),
|
||||
verify_dict(
|
||||
_compression_dict == nullptr ? Slice() : Slice(*_compression_dict),
|
||||
_compression_type),
|
||||
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
|
||||
!table_opt.block_align),
|
||||
compressed_cache_key_prefix_size(0),
|
||||
@ -355,7 +364,7 @@ struct BlockBasedTableBuilder::Rep {
|
||||
_moptions.prefix_extractor != nullptr));
|
||||
if (table_options.verify_compression) {
|
||||
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
|
||||
compression_ctx.type()));
|
||||
compression_type));
|
||||
}
|
||||
}
|
||||
|
||||
@ -506,7 +515,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
||||
assert(ok());
|
||||
Rep* r = rep_;
|
||||
|
||||
auto type = r->compression_ctx.type();
|
||||
auto type = r->compression_type;
|
||||
Slice block_contents;
|
||||
bool abort_compression = false;
|
||||
|
||||
@ -514,24 +523,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
||||
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
|
||||
|
||||
if (raw_block_contents.size() < kCompressionSizeLimit) {
|
||||
Slice compression_dict;
|
||||
if (is_data_block && r->compression_dict && r->compression_dict->size()) {
|
||||
r->compression_ctx.dict() = *r->compression_dict;
|
||||
if (r->table_options.verify_compression) {
|
||||
assert(r->verify_ctx != nullptr);
|
||||
r->verify_ctx->dict() = *r->compression_dict;
|
||||
}
|
||||
} else {
|
||||
// Clear dictionary
|
||||
r->compression_ctx.dict() = Slice();
|
||||
if (r->table_options.verify_compression) {
|
||||
assert(r->verify_ctx != nullptr);
|
||||
r->verify_ctx->dict() = Slice();
|
||||
}
|
||||
}
|
||||
|
||||
CompressionInfo compression_info(
|
||||
r->compression_opts, r->compression_ctx,
|
||||
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(),
|
||||
r->compression_type);
|
||||
block_contents =
|
||||
CompressBlock(raw_block_contents, r->compression_ctx, &type,
|
||||
CompressBlock(raw_block_contents, compression_info, &type,
|
||||
r->table_options.format_version, &r->compressed_output);
|
||||
|
||||
// Some of the compression algorithms are known to be unreliable. If
|
||||
@ -540,8 +537,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
||||
if (type != kNoCompression && r->table_options.verify_compression) {
|
||||
// Retrieve the uncompressed contents into a new buffer
|
||||
BlockContents contents;
|
||||
UncompressionInfo uncompression_info(
|
||||
*r->verify_ctx,
|
||||
is_data_block ? r->verify_dict : UncompressionDict::GetEmptyDict(),
|
||||
r->compression_type);
|
||||
Status stat = UncompressBlockContentsForCompressionType(
|
||||
*r->verify_ctx, block_contents.data(), block_contents.size(),
|
||||
uncompression_info, block_contents.data(), block_contents.size(),
|
||||
&contents, r->table_options.format_version, r->ioptions);
|
||||
|
||||
if (stat.ok()) {
|
||||
@ -805,7 +806,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
|
||||
? rep_->ioptions.merge_operator->Name()
|
||||
: "nullptr";
|
||||
rep_->props.compression_name =
|
||||
CompressionTypeToString(rep_->compression_ctx.type());
|
||||
CompressionTypeToString(rep_->compression_type);
|
||||
rep_->props.prefix_extractor_name =
|
||||
rep_->moptions.prefix_extractor != nullptr
|
||||
? rep_->moptions.prefix_extractor->Name()
|
||||
@ -854,10 +855,10 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
|
||||
|
||||
void BlockBasedTableBuilder::WriteCompressionDictBlock(
|
||||
MetaIndexBuilder* meta_index_builder) {
|
||||
if (rep_->compression_dict && rep_->compression_dict->size()) {
|
||||
if (rep_->compression_dict.GetRawDict().size()) {
|
||||
BlockHandle compression_dict_block_handle;
|
||||
if (ok()) {
|
||||
WriteRawBlock(*rep_->compression_dict, kNoCompression,
|
||||
WriteRawBlock(rep_->compression_dict.GetRawDict(), kNoCompression,
|
||||
&compression_dict_block_handle);
|
||||
}
|
||||
if (ok()) {
|
||||
|
@ -133,7 +133,7 @@ class BlockBasedTableBuilder : public TableBuilder {
|
||||
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
|
||||
};
|
||||
|
||||
Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
|
||||
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
|
||||
CompressionType* type, uint32_t format_version,
|
||||
std::string* compressed_output);
|
||||
|
||||
|
@ -1334,8 +1334,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
|
||||
// Retrieve the uncompressed contents into a new buffer
|
||||
BlockContents contents;
|
||||
UncompressionContext uncompresssion_ctx(compression_type, compression_dict);
|
||||
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data.data(),
|
||||
UncompressionContext context(compression_type);
|
||||
UncompressionDict dict(compression_dict, compression_type);
|
||||
UncompressionInfo info(context, dict, compression_type);
|
||||
s = UncompressBlockContents(info, compressed_block->data.data(),
|
||||
compressed_block->data.size(), &contents,
|
||||
rep->table_options.format_version, rep->ioptions,
|
||||
GetMemoryAllocator(rep->table_options));
|
||||
@ -1412,12 +1414,13 @@ Status BlockBasedTable::PutDataBlockToCache(
|
||||
BlockContents uncompressed_block_contents;
|
||||
Statistics* statistics = ioptions.statistics;
|
||||
if (raw_block_comp_type != kNoCompression) {
|
||||
UncompressionContext uncompression_ctx(raw_block_comp_type,
|
||||
compression_dict);
|
||||
s = UncompressBlockContents(
|
||||
uncompression_ctx, raw_block_contents->data.data(),
|
||||
raw_block_contents->data.size(), &uncompressed_block_contents,
|
||||
format_version, ioptions, memory_allocator);
|
||||
UncompressionContext context(raw_block_comp_type);
|
||||
UncompressionDict dict(compression_dict, raw_block_comp_type);
|
||||
UncompressionInfo info(context, dict, raw_block_comp_type);
|
||||
s = UncompressBlockContents(info, raw_block_contents->data.data(),
|
||||
raw_block_contents->data.size(),
|
||||
&uncompressed_block_contents, format_version,
|
||||
ioptions, memory_allocator);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
|
@ -256,11 +256,12 @@ Status BlockFetcher::ReadBlockContents() {
|
||||
|
||||
if (do_uncompress_ && compression_type_ != kNoCompression) {
|
||||
// compressed page, uncompress, update cache
|
||||
UncompressionContext uncompression_ctx(compression_type_,
|
||||
compression_dict_);
|
||||
status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
|
||||
block_size_, contents_, footer_.version(),
|
||||
ioptions_, memory_allocator_);
|
||||
UncompressionContext context(compression_type_);
|
||||
UncompressionDict dict(compression_dict_, compression_type_);
|
||||
UncompressionInfo info(context, dict, compression_type_);
|
||||
status_ = UncompressBlockContents(info, slice_.data(), block_size_,
|
||||
contents_, footer_.version(), ioptions_,
|
||||
memory_allocator_);
|
||||
compression_type_ = kNoCompression;
|
||||
} else {
|
||||
GetBlockContents();
|
||||
|
@ -278,18 +278,18 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
|
||||
}
|
||||
|
||||
Status UncompressBlockContentsForCompressionType(
|
||||
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
|
||||
const UncompressionInfo& uncompression_info, const char* data, size_t n,
|
||||
BlockContents* contents, uint32_t format_version,
|
||||
const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
|
||||
CacheAllocationPtr ubuf;
|
||||
|
||||
assert(uncompression_ctx.type() != kNoCompression &&
|
||||
assert(uncompression_info.type() != kNoCompression &&
|
||||
"Invalid compression type");
|
||||
|
||||
StopWatchNano timer(ioptions.env,
|
||||
ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
|
||||
int decompress_size = 0;
|
||||
switch (uncompression_ctx.type()) {
|
||||
switch (uncompression_info.type()) {
|
||||
case kSnappyCompression: {
|
||||
size_t ulength = 0;
|
||||
static char snappy_corrupt_msg[] =
|
||||
@ -306,7 +306,7 @@ Status UncompressBlockContentsForCompressionType(
|
||||
}
|
||||
case kZlibCompression:
|
||||
ubuf = Zlib_Uncompress(
|
||||
uncompression_ctx, data, n, &decompress_size,
|
||||
uncompression_info, data, n, &decompress_size,
|
||||
GetCompressFormatForVersion(kZlibCompression, format_version),
|
||||
allocator);
|
||||
if (!ubuf) {
|
||||
@ -330,7 +330,7 @@ Status UncompressBlockContentsForCompressionType(
|
||||
break;
|
||||
case kLZ4Compression:
|
||||
ubuf = LZ4_Uncompress(
|
||||
uncompression_ctx, data, n, &decompress_size,
|
||||
uncompression_info, data, n, &decompress_size,
|
||||
GetCompressFormatForVersion(kLZ4Compression, format_version),
|
||||
allocator);
|
||||
if (!ubuf) {
|
||||
@ -342,7 +342,7 @@ Status UncompressBlockContentsForCompressionType(
|
||||
break;
|
||||
case kLZ4HCCompression:
|
||||
ubuf = LZ4_Uncompress(
|
||||
uncompression_ctx, data, n, &decompress_size,
|
||||
uncompression_info, data, n, &decompress_size,
|
||||
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
|
||||
allocator);
|
||||
if (!ubuf) {
|
||||
@ -365,7 +365,7 @@ Status UncompressBlockContentsForCompressionType(
|
||||
break;
|
||||
case kZSTD:
|
||||
case kZSTDNotFinalCompression:
|
||||
ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
|
||||
ubuf = ZSTD_Uncompress(uncompression_info, data, n, &decompress_size,
|
||||
allocator);
|
||||
if (!ubuf) {
|
||||
static char zstd_corrupt_msg[] =
|
||||
@ -395,14 +395,14 @@ Status UncompressBlockContentsForCompressionType(
|
||||
// buffer is returned via 'result' and it is upto the caller to
|
||||
// free this buffer.
|
||||
// format_version is the block format as defined in include/rocksdb/table.h
|
||||
Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
|
||||
Status UncompressBlockContents(const UncompressionInfo& uncompression_info,
|
||||
const char* data, size_t n,
|
||||
BlockContents* contents, uint32_t format_version,
|
||||
const ImmutableCFOptions& ioptions,
|
||||
MemoryAllocator* allocator) {
|
||||
assert(data[n] != kNoCompression);
|
||||
assert(data[n] == uncompression_ctx.type());
|
||||
return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
|
||||
assert(data[n] == uncompression_info.type());
|
||||
return UncompressBlockContentsForCompressionType(uncompression_info, data, n,
|
||||
contents, format_version,
|
||||
ioptions, allocator);
|
||||
}
|
||||
|
@ -277,16 +277,18 @@ extern Status ReadBlockContents(
|
||||
// free this buffer.
|
||||
// For description of compress_format_version and possible values, see
|
||||
// util/compression.h
|
||||
extern Status UncompressBlockContents(
|
||||
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
|
||||
BlockContents* contents, uint32_t compress_format_version,
|
||||
const ImmutableCFOptions& ioptions, MemoryAllocator* allocator = nullptr);
|
||||
extern Status UncompressBlockContents(const UncompressionInfo& info,
|
||||
const char* data, size_t n,
|
||||
BlockContents* contents,
|
||||
uint32_t compress_format_version,
|
||||
const ImmutableCFOptions& ioptions,
|
||||
MemoryAllocator* allocator = nullptr);
|
||||
|
||||
// This is an extension to UncompressBlockContents that accepts
|
||||
// a specific compression type. This is used by un-wrapped blocks
|
||||
// with no compression header.
|
||||
extern Status UncompressBlockContentsForCompressionType(
|
||||
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
|
||||
const UncompressionInfo& info, const char* data, size_t n,
|
||||
BlockContents* contents, uint32_t compress_format_version,
|
||||
const ImmutableCFOptions& ioptions, MemoryAllocator* allocator = nullptr);
|
||||
|
||||
|
@ -2051,28 +2051,28 @@ class Benchmark {
|
||||
return true;
|
||||
}
|
||||
|
||||
inline bool CompressSlice(const CompressionContext& compression_ctx,
|
||||
inline bool CompressSlice(const CompressionInfo& compression_info,
|
||||
const Slice& input, std::string* compressed) {
|
||||
bool ok = true;
|
||||
switch (FLAGS_compression_type_e) {
|
||||
case rocksdb::kSnappyCompression:
|
||||
ok = Snappy_Compress(compression_ctx, input.data(), input.size(),
|
||||
ok = Snappy_Compress(compression_info, input.data(), input.size(),
|
||||
compressed);
|
||||
break;
|
||||
case rocksdb::kZlibCompression:
|
||||
ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(),
|
||||
ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
|
||||
compressed);
|
||||
break;
|
||||
case rocksdb::kBZip2Compression:
|
||||
ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(),
|
||||
ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
|
||||
compressed);
|
||||
break;
|
||||
case rocksdb::kLZ4Compression:
|
||||
ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(),
|
||||
ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
|
||||
compressed);
|
||||
break;
|
||||
case rocksdb::kLZ4HCCompression:
|
||||
ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(),
|
||||
ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
|
||||
compressed);
|
||||
break;
|
||||
case rocksdb::kXpressCompression:
|
||||
@ -2080,7 +2080,7 @@ class Benchmark {
|
||||
input.size(), compressed);
|
||||
break;
|
||||
case rocksdb::kZSTD:
|
||||
ok = ZSTD_Compress(compression_ctx, input.data(), input.size(),
|
||||
ok = ZSTD_Compress(compression_info, input.data(), input.size(),
|
||||
compressed);
|
||||
break;
|
||||
default:
|
||||
@ -2163,10 +2163,11 @@ class Benchmark {
|
||||
const int len = FLAGS_block_size;
|
||||
std::string input_str(len, 'y');
|
||||
std::string compressed;
|
||||
CompressionContext compression_ctx(FLAGS_compression_type_e,
|
||||
Options().compression_opts);
|
||||
bool result =
|
||||
CompressSlice(compression_ctx, Slice(input_str), &compressed);
|
||||
CompressionOptions opts;
|
||||
CompressionContext context(FLAGS_compression_type_e);
|
||||
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
|
||||
FLAGS_compression_type_e);
|
||||
bool result = CompressSlice(info, Slice(input_str), &compressed);
|
||||
|
||||
if (!result) {
|
||||
fprintf(stdout, "WARNING: %s compression is not enabled\n",
|
||||
@ -3020,13 +3021,14 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
int64_t produced = 0;
|
||||
bool ok = true;
|
||||
std::string compressed;
|
||||
CompressionContext compression_ctx(FLAGS_compression_type_e,
|
||||
Options().compression_opts);
|
||||
|
||||
CompressionOptions opts;
|
||||
CompressionContext context(FLAGS_compression_type_e);
|
||||
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
|
||||
FLAGS_compression_type_e);
|
||||
// Compress 1G
|
||||
while (ok && bytes < int64_t(1) << 30) {
|
||||
compressed.clear();
|
||||
ok = CompressSlice(compression_ctx, input, &compressed);
|
||||
ok = CompressSlice(info, input, &compressed);
|
||||
produced += compressed.size();
|
||||
bytes += input.size();
|
||||
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
|
||||
@ -3048,11 +3050,17 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
Slice input = gen.Generate(FLAGS_block_size);
|
||||
std::string compressed;
|
||||
|
||||
CompressionContext compression_ctx(FLAGS_compression_type_e);
|
||||
CompressionOptions compression_opts;
|
||||
CompressionInfo compression_info(compression_opts, compression_ctx,
|
||||
CompressionDict::GetEmptyDict(),
|
||||
FLAGS_compression_type_e);
|
||||
UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
|
||||
CompressionContext compression_ctx(FLAGS_compression_type_e,
|
||||
Options().compression_opts);
|
||||
UncompressionInfo uncompression_info(uncompression_ctx,
|
||||
UncompressionDict::GetEmptyDict(),
|
||||
FLAGS_compression_type_e);
|
||||
|
||||
bool ok = CompressSlice(compression_ctx, input, &compressed);
|
||||
bool ok = CompressSlice(compression_info, input, &compressed);
|
||||
int64_t bytes = 0;
|
||||
int decompress_size;
|
||||
while (ok && bytes < 1024 * 1048576) {
|
||||
@ -3072,7 +3080,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
break;
|
||||
}
|
||||
case rocksdb::kZlibCompression:
|
||||
uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
|
||||
uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
|
||||
compressed.size(), &decompress_size, 2);
|
||||
ok = uncompressed.get() != nullptr;
|
||||
break;
|
||||
@ -3082,12 +3090,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
ok = uncompressed.get() != nullptr;
|
||||
break;
|
||||
case rocksdb::kLZ4Compression:
|
||||
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
|
||||
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
|
||||
compressed.size(), &decompress_size, 2);
|
||||
ok = uncompressed.get() != nullptr;
|
||||
break;
|
||||
case rocksdb::kLZ4HCCompression:
|
||||
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
|
||||
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
|
||||
compressed.size(), &decompress_size, 2);
|
||||
ok = uncompressed.get() != nullptr;
|
||||
break;
|
||||
@ -3097,7 +3105,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
ok = uncompressed.get() != nullptr;
|
||||
break;
|
||||
case rocksdb::kZSTD:
|
||||
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
|
||||
uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
|
||||
compressed.size(), &decompress_size);
|
||||
ok = uncompressed.get() != nullptr;
|
||||
break;
|
||||
|
@ -135,12 +135,127 @@ class ZSTDUncompressCachedData {
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// Instantiate this class and pass it to the uncompression API below
|
||||
// Holds dictionary and related data, like ZSTD's digested compression
|
||||
// dictionary.
|
||||
struct CompressionDict {
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
ZSTD_CDict* zstd_cdict_ = nullptr;
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
Slice dict_;
|
||||
|
||||
public:
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
CompressionDict(Slice dict, CompressionType type, int level) {
|
||||
#else // ZSTD_VERSION_NUMBER >= 700
|
||||
CompressionDict(Slice dict, CompressionType /*type*/, int /*level*/) {
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
dict_ = std::move(dict);
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
zstd_cdict_ = nullptr;
|
||||
if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) {
|
||||
if (level == CompressionOptions::kDefaultCompressionLevel) {
|
||||
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
|
||||
// https://github.com/facebook/zstd/issues/1148
|
||||
level = 3;
|
||||
}
|
||||
// Should be safe (but slower) if below call fails as we'll use the
|
||||
// raw dictionary to compress.
|
||||
zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
|
||||
assert(zstd_cdict_ != nullptr);
|
||||
}
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
}
|
||||
|
||||
~CompressionDict() {
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
size_t res = 0;
|
||||
if (zstd_cdict_ != nullptr) {
|
||||
res = ZSTD_freeCDict(zstd_cdict_);
|
||||
}
|
||||
assert(res == 0); // Last I checked they can't fail
|
||||
(void)res; // prevent unused var warning
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
}
|
||||
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
const ZSTD_CDict* GetDigestedZstdCDict() const {
|
||||
return zstd_cdict_;
|
||||
}
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
|
||||
Slice GetRawDict() const { return dict_; }
|
||||
|
||||
static const CompressionDict& GetEmptyDict() {
|
||||
static CompressionDict empty_dict{};
|
||||
return empty_dict;
|
||||
}
|
||||
|
||||
CompressionDict() = default;
|
||||
// Disable copy/move
|
||||
CompressionDict(const CompressionDict&) = delete;
|
||||
CompressionDict& operator=(const CompressionDict&) = delete;
|
||||
CompressionDict(CompressionDict&&) = delete;
|
||||
CompressionDict& operator=(CompressionDict&&) = delete;
|
||||
};
|
||||
|
||||
// Holds dictionary and related data, like ZSTD's digested uncompression
|
||||
// dictionary.
|
||||
struct UncompressionDict {
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
ZSTD_DDict* zstd_ddict_;
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
Slice dict_;
|
||||
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
UncompressionDict(Slice dict, CompressionType type) {
|
||||
#else // ZSTD_VERSION_NUMBER >= 700
|
||||
UncompressionDict(Slice dict, CompressionType /*type*/) {
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
dict_ = std::move(dict);
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
zstd_ddict_ = nullptr;
|
||||
if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) {
|
||||
zstd_ddict_ = ZSTD_createDDict(dict_.data(), dict_.size());
|
||||
assert(zstd_ddict_ != nullptr);
|
||||
}
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
}
|
||||
|
||||
~UncompressionDict() {
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
size_t res = 0;
|
||||
if (zstd_ddict_ != nullptr) {
|
||||
res = ZSTD_freeDDict(zstd_ddict_);
|
||||
}
|
||||
assert(res == 0); // Last I checked they can't fail
|
||||
(void)res; // prevent unused var warning
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
}
|
||||
|
||||
#if ZSTD_VERSION_NUMBER >= 700
|
||||
const ZSTD_DDict* GetDigestedZstdDDict() const {
|
||||
return zstd_ddict_;
|
||||
}
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
|
||||
Slice GetRawDict() const { return dict_; }
|
||||
|
||||
static const UncompressionDict& GetEmptyDict() {
|
||||
static UncompressionDict empty_dict{};
|
||||
return empty_dict;
|
||||
}
|
||||
|
||||
UncompressionDict() = default;
|
||||
// Disable copy/move
|
||||
UncompressionDict(const CompressionDict&) = delete;
|
||||
UncompressionDict& operator=(const CompressionDict&) = delete;
|
||||
UncompressionDict(CompressionDict&&) = delete;
|
||||
UncompressionDict& operator=(CompressionDict&&) = delete;
|
||||
};
|
||||
|
||||
class CompressionContext {
|
||||
private:
|
||||
const CompressionType type_;
|
||||
const CompressionOptions opts_;
|
||||
Slice dict_;
|
||||
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
|
||||
ZSTD_CCtx* zstd_ctx_ = nullptr;
|
||||
void CreateNativeContext() {
|
||||
@ -165,6 +280,7 @@ class CompressionContext {
|
||||
assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression);
|
||||
return zstd_ctx_;
|
||||
}
|
||||
|
||||
#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
|
||||
private:
|
||||
void CreateNativeContext() {}
|
||||
@ -172,28 +288,35 @@ class CompressionContext {
|
||||
#endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
|
||||
public:
|
||||
explicit CompressionContext(CompressionType comp_type) : type_(comp_type) {
|
||||
CreateNativeContext();
|
||||
}
|
||||
CompressionContext(CompressionType comp_type, const CompressionOptions& opts,
|
||||
const Slice& comp_dict = Slice())
|
||||
: type_(comp_type), opts_(opts), dict_(comp_dict) {
|
||||
(void)type_;
|
||||
CreateNativeContext();
|
||||
}
|
||||
~CompressionContext() { DestroyNativeContext(); }
|
||||
CompressionContext(const CompressionContext&) = delete;
|
||||
CompressionContext& operator=(const CompressionContext&) = delete;
|
||||
|
||||
const CompressionOptions& options() const { return opts_; }
|
||||
CompressionType type() const { return type_; }
|
||||
const Slice& dict() const { return dict_; }
|
||||
Slice& dict() { return dict_; }
|
||||
};
|
||||
|
||||
// Instantiate this class and pass it to the uncompression API below
|
||||
class CompressionInfo {
|
||||
const CompressionOptions& opts_;
|
||||
const CompressionContext& context_;
|
||||
const CompressionDict& dict_;
|
||||
const CompressionType type_;
|
||||
|
||||
public:
|
||||
CompressionInfo(const CompressionOptions& _opts,
|
||||
const CompressionContext& _context,
|
||||
const CompressionDict& _dict, CompressionType _type)
|
||||
: opts_(_opts), context_(_context), dict_(_dict), type_(_type) {}
|
||||
|
||||
const CompressionOptions& options() const { return opts_; }
|
||||
const CompressionContext& context() const { return context_; }
|
||||
const CompressionDict& dict() const { return dict_; }
|
||||
CompressionType type() const { return type_; }
|
||||
};
|
||||
|
||||
class UncompressionContext {
|
||||
private:
|
||||
CompressionType type_;
|
||||
Slice dict_;
|
||||
const CompressionType type_;
|
||||
CompressionContextCache* ctx_cache_ = nullptr;
|
||||
ZSTDUncompressCachedData uncomp_cached_data_;
|
||||
|
||||
@ -201,10 +324,8 @@ class UncompressionContext {
|
||||
struct NoCache {};
|
||||
// Do not use context cache, used by TableBuilder
|
||||
UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {}
|
||||
explicit UncompressionContext(CompressionType comp_type)
|
||||
: UncompressionContext(comp_type, Slice()) {}
|
||||
UncompressionContext(CompressionType comp_type, const Slice& comp_dict)
|
||||
: type_(comp_type), dict_(comp_dict) {
|
||||
|
||||
explicit UncompressionContext(CompressionType comp_type) : type_(comp_type) {
|
||||
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
|
||||
ctx_cache_ = CompressionContextCache::Instance();
|
||||
uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
|
||||
@ -224,9 +345,21 @@ class UncompressionContext {
|
||||
ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
|
||||
return uncomp_cached_data_.Get();
|
||||
}
|
||||
};
|
||||
|
||||
class UncompressionInfo {
|
||||
const UncompressionContext& context_;
|
||||
const UncompressionDict& dict_;
|
||||
const CompressionType type_;
|
||||
|
||||
public:
|
||||
UncompressionInfo(const UncompressionContext& _context,
|
||||
const UncompressionDict& _dict, CompressionType _type)
|
||||
: context_(_context), dict_(_dict), type_(_type) {}
|
||||
|
||||
const UncompressionContext& context() const { return context_; }
|
||||
const UncompressionDict& dict() const { return dict_; }
|
||||
CompressionType type() const { return type_; }
|
||||
const Slice& dict() const { return dict_; }
|
||||
Slice& dict() { return dict_; }
|
||||
};
|
||||
|
||||
inline bool Snappy_Supported() {
|
||||
@ -345,9 +478,8 @@ inline std::string CompressionTypeToString(CompressionType compression_type) {
|
||||
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
|
||||
// start of compressed block. Snappy format is the same as version 1.
|
||||
|
||||
inline bool Snappy_Compress(const CompressionContext& /*ctx*/,
|
||||
const char* input, size_t length,
|
||||
::std::string* output) {
|
||||
inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input,
|
||||
size_t length, ::std::string* output) {
|
||||
#ifdef SNAPPY
|
||||
output->resize(snappy::MaxCompressedLength(length));
|
||||
size_t outlen;
|
||||
@ -412,7 +544,7 @@ inline bool GetDecompressedSizeInfo(const char** input_data,
|
||||
// header in varint32 format
|
||||
// @param compression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
inline bool Zlib_Compress(const CompressionContext& ctx,
|
||||
inline bool Zlib_Compress(const CompressionInfo& info,
|
||||
uint32_t compress_format_version, const char* input,
|
||||
size_t length, ::std::string* output) {
|
||||
#ifdef ZLIB
|
||||
@ -437,24 +569,25 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
|
||||
// The default value is 8. See zconf.h for more details.
|
||||
static const int memLevel = 8;
|
||||
int level;
|
||||
if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||
level = Z_DEFAULT_COMPRESSION;
|
||||
} else {
|
||||
level = ctx.options().level;
|
||||
level = info.options().level;
|
||||
}
|
||||
z_stream _stream;
|
||||
memset(&_stream, 0, sizeof(z_stream));
|
||||
int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits,
|
||||
memLevel, ctx.options().strategy);
|
||||
int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits,
|
||||
memLevel, info.options().strategy);
|
||||
if (st != Z_OK) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (ctx.dict().size()) {
|
||||
Slice compression_dict = info.dict().GetRawDict();
|
||||
if (compression_dict.size()) {
|
||||
// Initialize the compression library's dictionary
|
||||
st = deflateSetDictionary(&_stream,
|
||||
reinterpret_cast<const Bytef*>(ctx.dict().data()),
|
||||
static_cast<unsigned int>(ctx.dict().size()));
|
||||
st = deflateSetDictionary(
|
||||
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
|
||||
static_cast<unsigned int>(compression_dict.size()));
|
||||
if (st != Z_OK) {
|
||||
deflateEnd(&_stream);
|
||||
return false;
|
||||
@ -482,7 +615,7 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
|
||||
deflateEnd(&_stream);
|
||||
return compressed;
|
||||
#else
|
||||
(void)ctx;
|
||||
(void)info;
|
||||
(void)compress_format_version;
|
||||
(void)input;
|
||||
(void)length;
|
||||
@ -498,8 +631,8 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
|
||||
// @param compression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
inline CacheAllocationPtr Zlib_Uncompress(
|
||||
const UncompressionContext& ctx, const char* input_data,
|
||||
size_t input_length, int* decompress_size, uint32_t compress_format_version,
|
||||
const UncompressionInfo& info, const char* input_data, size_t input_length,
|
||||
int* decompress_size, uint32_t compress_format_version,
|
||||
MemoryAllocator* allocator = nullptr, int windowBits = -14) {
|
||||
#ifdef ZLIB
|
||||
uint32_t output_len = 0;
|
||||
@ -529,11 +662,12 @@ inline CacheAllocationPtr Zlib_Uncompress(
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (ctx.dict().size()) {
|
||||
Slice compression_dict = info.dict().GetRawDict();
|
||||
if (compression_dict.size()) {
|
||||
// Initialize the compression library's dictionary
|
||||
st = inflateSetDictionary(&_stream,
|
||||
reinterpret_cast<const Bytef*>(ctx.dict().data()),
|
||||
static_cast<unsigned int>(ctx.dict().size()));
|
||||
st = inflateSetDictionary(
|
||||
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
|
||||
static_cast<unsigned int>(compression_dict.size()));
|
||||
if (st != Z_OK) {
|
||||
return nullptr;
|
||||
}
|
||||
@ -584,7 +718,7 @@ inline CacheAllocationPtr Zlib_Uncompress(
|
||||
inflateEnd(&_stream);
|
||||
return output;
|
||||
#else
|
||||
(void)ctx;
|
||||
(void)info;
|
||||
(void)input_data;
|
||||
(void)input_length;
|
||||
(void)decompress_size;
|
||||
@ -599,7 +733,7 @@ inline CacheAllocationPtr Zlib_Uncompress(
|
||||
// block header
|
||||
// compress_format_version == 2 -- decompressed size is included in the block
|
||||
// header in varint32 format
|
||||
inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
|
||||
inline bool BZip2_Compress(const CompressionInfo& /*info*/,
|
||||
uint32_t compress_format_version, const char* input,
|
||||
size_t length, ::std::string* output) {
|
||||
#ifdef BZIP2
|
||||
@ -745,7 +879,7 @@ inline CacheAllocationPtr BZip2_Uncompress(
|
||||
// header in varint32 format
|
||||
// @param compression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
inline bool LZ4_Compress(const CompressionContext& ctx,
|
||||
inline bool LZ4_Compress(const CompressionInfo& info,
|
||||
uint32_t compress_format_version, const char* input,
|
||||
size_t length, ::std::string* output) {
|
||||
#ifdef LZ4
|
||||
@ -773,9 +907,10 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
|
||||
int outlen;
|
||||
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
||||
LZ4_stream_t* stream = LZ4_createStream();
|
||||
if (ctx.dict().size()) {
|
||||
LZ4_loadDict(stream, ctx.dict().data(),
|
||||
static_cast<int>(ctx.dict().size()));
|
||||
Slice compression_dict = info.dict().GetRawDict();
|
||||
if (compression_dict.size()) {
|
||||
LZ4_loadDict(stream, compression_dict.data(),
|
||||
static_cast<int>(compression_dict.size()));
|
||||
}
|
||||
#if LZ4_VERSION_NUMBER >= 10700 // r129+
|
||||
outlen =
|
||||
@ -799,7 +934,7 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
|
||||
output->resize(static_cast<size_t>(output_header_len + outlen));
|
||||
return true;
|
||||
#else // LZ4
|
||||
(void)ctx;
|
||||
(void)info;
|
||||
(void)compress_format_version;
|
||||
(void)input;
|
||||
(void)length;
|
||||
@ -814,7 +949,7 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
|
||||
// header in varint32 format
|
||||
// @param compression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
|
||||
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
|
||||
const char* input_data,
|
||||
size_t input_length,
|
||||
int* decompress_size,
|
||||
@ -842,9 +977,10 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
|
||||
auto output = AllocateBlock(output_len, allocator);
|
||||
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
||||
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
|
||||
if (ctx.dict().size()) {
|
||||
LZ4_setStreamDecode(stream, ctx.dict().data(),
|
||||
static_cast<int>(ctx.dict().size()));
|
||||
Slice compression_dict = info.dict().GetRawDict();
|
||||
if (compression_dict.size()) {
|
||||
LZ4_setStreamDecode(stream, compression_dict.data(),
|
||||
static_cast<int>(compression_dict.size()));
|
||||
}
|
||||
*decompress_size = LZ4_decompress_safe_continue(
|
||||
stream, input_data, output.get(), static_cast<int>(input_length),
|
||||
@ -863,7 +999,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
|
||||
assert(*decompress_size == static_cast<int>(output_len));
|
||||
return output;
|
||||
#else // LZ4
|
||||
(void)ctx;
|
||||
(void)info;
|
||||
(void)input_data;
|
||||
(void)input_length;
|
||||
(void)decompress_size;
|
||||
@ -879,7 +1015,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
|
||||
// header in varint32 format
|
||||
// @param compression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
inline bool LZ4HC_Compress(const CompressionContext& ctx,
|
||||
inline bool LZ4HC_Compress(const CompressionInfo& info,
|
||||
uint32_t compress_format_version, const char* input,
|
||||
size_t length, ::std::string* output) {
|
||||
#ifdef LZ4
|
||||
@ -906,17 +1042,18 @@ inline bool LZ4HC_Compress(const CompressionContext& ctx,
|
||||
|
||||
int outlen;
|
||||
int level;
|
||||
if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||
level = 0; // lz4hc.h says any value < 1 will be sanitized to default
|
||||
} else {
|
||||
level = ctx.options().level;
|
||||
level = info.options().level;
|
||||
}
|
||||
#if LZ4_VERSION_NUMBER >= 10400 // r124+
|
||||
LZ4_streamHC_t* stream = LZ4_createStreamHC();
|
||||
LZ4_resetStreamHC(stream, level);
|
||||
Slice compression_dict = info.dict().GetRawDict();
|
||||
const char* compression_dict_data =
|
||||
ctx.dict().size() > 0 ? ctx.dict().data() : nullptr;
|
||||
size_t compression_dict_size = ctx.dict().size();
|
||||
compression_dict.size() > 0 ? compression_dict.data() : nullptr;
|
||||
size_t compression_dict_size = compression_dict.size();
|
||||
LZ4_loadDictHC(stream, compression_dict_data,
|
||||
static_cast<int>(compression_dict_size));
|
||||
|
||||
@ -947,7 +1084,7 @@ inline bool LZ4HC_Compress(const CompressionContext& ctx,
|
||||
output->resize(static_cast<size_t>(output_header_len + outlen));
|
||||
return true;
|
||||
#else // LZ4
|
||||
(void)ctx;
|
||||
(void)info;
|
||||
(void)compress_format_version;
|
||||
(void)input;
|
||||
(void)length;
|
||||
@ -981,9 +1118,7 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/,
|
||||
}
|
||||
#endif
|
||||
|
||||
// @param compression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
||||
inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
|
||||
size_t length, ::std::string* output) {
|
||||
#ifdef ZSTD
|
||||
if (length > std::numeric_limits<uint32_t>::max()) {
|
||||
@ -998,19 +1133,29 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
||||
output->resize(static_cast<size_t>(output_header_len + compressBound));
|
||||
size_t outlen = 0;
|
||||
int level;
|
||||
if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
|
||||
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
|
||||
// https://github.com/facebook/zstd/issues/1148
|
||||
level = 3;
|
||||
} else {
|
||||
level = ctx.options().level;
|
||||
level = info.options().level;
|
||||
}
|
||||
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
|
||||
ZSTD_CCtx* context = ctx.ZSTDPreallocCtx();
|
||||
ZSTD_CCtx* context = info.context().ZSTDPreallocCtx();
|
||||
assert(context != nullptr);
|
||||
outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
|
||||
compressBound, input, length,
|
||||
ctx.dict().data(), ctx.dict().size(), level);
|
||||
#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+
|
||||
if (info.dict().GetDigestedZstdCDict() != nullptr) {
|
||||
outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len],
|
||||
compressBound, input, length,
|
||||
info.dict().GetDigestedZstdCDict());
|
||||
}
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
if (outlen == 0) {
|
||||
outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
|
||||
compressBound, input, length,
|
||||
info.dict().GetRawDict().data(),
|
||||
info.dict().GetRawDict().size(), level);
|
||||
}
|
||||
#else // up to v0.4.x
|
||||
outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
|
||||
length, level);
|
||||
@ -1021,7 +1166,7 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
||||
output->resize(output_header_len + outlen);
|
||||
return true;
|
||||
#else // ZSTD
|
||||
(void)ctx;
|
||||
(void)info;
|
||||
(void)input;
|
||||
(void)length;
|
||||
(void)output;
|
||||
@ -1032,9 +1177,8 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
|
||||
// @param compression_dict Data for presetting the compression library's
|
||||
// dictionary.
|
||||
inline CacheAllocationPtr ZSTD_Uncompress(
|
||||
const UncompressionContext& ctx, const char* input_data,
|
||||
size_t input_length, int* decompress_size,
|
||||
MemoryAllocator* allocator = nullptr) {
|
||||
const UncompressionInfo& info, const char* input_data, size_t input_length,
|
||||
int* decompress_size, MemoryAllocator* allocator = nullptr) {
|
||||
#ifdef ZSTD
|
||||
uint32_t output_len = 0;
|
||||
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
|
||||
@ -1043,14 +1187,24 @@ inline CacheAllocationPtr ZSTD_Uncompress(
|
||||
}
|
||||
|
||||
auto output = AllocateBlock(output_len, allocator);
|
||||
size_t actual_output_length;
|
||||
size_t actual_output_length = 0;
|
||||
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
|
||||
ZSTD_DCtx* context = ctx.GetZSTDContext();
|
||||
ZSTD_DCtx* context = info.context().GetZSTDContext();
|
||||
assert(context != nullptr);
|
||||
actual_output_length = ZSTD_decompress_usingDict(
|
||||
context, output.get(), output_len, input_data, input_length,
|
||||
ctx.dict().data(), ctx.dict().size());
|
||||
#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+
|
||||
if (info.dict().GetDigestedZstdDDict() != nullptr) {
|
||||
actual_output_length = ZSTD_decompress_usingDDict(
|
||||
context, output.get(), output_len, input_data, input_length,
|
||||
info.dict().GetDigestedZstdDDict());
|
||||
}
|
||||
#endif // ZSTD_VERSION_NUMBER >= 700
|
||||
if (actual_output_length == 0) {
|
||||
actual_output_length = ZSTD_decompress_usingDict(
|
||||
context, output.get(), output_len, input_data, input_length,
|
||||
info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
|
||||
}
|
||||
#else // up to v0.4.x
|
||||
(void)info;
|
||||
actual_output_length =
|
||||
ZSTD_decompress(output.get(), output_len, input_data, input_length);
|
||||
#endif // ZSTD_VERSION_NUMBER >= 500
|
||||
@ -1058,7 +1212,7 @@ inline CacheAllocationPtr ZSTD_Uncompress(
|
||||
*decompress_size = static_cast<int>(actual_output_length);
|
||||
return output;
|
||||
#else // ZSTD
|
||||
(void)ctx;
|
||||
(void)info;
|
||||
(void)input_data;
|
||||
(void)input_length;
|
||||
(void)decompress_size;
|
||||
@ -1067,6 +1221,17 @@ inline CacheAllocationPtr ZSTD_Uncompress(
|
||||
#endif
|
||||
}
|
||||
|
||||
// Returns true when the ZSTD library in use is recent enough for dictionary
// training; always false when the binary is built without ZSTD.
inline bool ZSTD_TrainDictionarySupported() {
#ifndef ZSTD
  return false;
#else
  // Dictionary trainer is available since v0.6.1 for static linking, but not
  // available for dynamic linking until v1.1.3. For now we enable the feature
  // in v1.1.3+ only.
  const unsigned kMinTrainerVersion = 10103;  // v1.1.3
  return ZSTD_versionNumber() >= kMinTrainerVersion;
#endif  // ZSTD
}
|
||||
|
||||
inline std::string ZSTD_TrainDictionary(const std::string& samples,
|
||||
const std::vector<size_t>& sample_lens,
|
||||
size_t max_dict_bytes) {
|
||||
@ -1074,6 +1239,10 @@ inline std::string ZSTD_TrainDictionary(const std::string& samples,
|
||||
// available for dynamic linking until v1.1.3. For now we enable the feature
|
||||
// in v1.1.3+ only.
|
||||
#if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+
|
||||
assert(samples.empty() == sample_lens.empty());
|
||||
if (samples.empty()) {
|
||||
return "";
|
||||
}
|
||||
std::string dict_data(max_dict_bytes, '\0');
|
||||
size_t dict_len = ZDICT_trainFromBuffer(
|
||||
&dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],
|
||||
|
@ -761,9 +761,11 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
||||
return raw;
|
||||
}
|
||||
StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
|
||||
CompressionType ct = bdb_options_.compression;
|
||||
CompressionContext compression_ctx(ct);
|
||||
CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat,
|
||||
CompressionType type = bdb_options_.compression;
|
||||
CompressionOptions opts;
|
||||
CompressionContext context(type);
|
||||
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type);
|
||||
CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat,
|
||||
compression_output);
|
||||
return *compression_output;
|
||||
}
|
||||
@ -1100,9 +1102,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||
{
|
||||
StopWatch decompression_sw(env_, statistics_,
|
||||
BLOB_DB_DECOMPRESSION_MICROS);
|
||||
UncompressionContext uncompression_ctx(bfile->compression());
|
||||
UncompressionContext context(bfile->compression());
|
||||
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
|
||||
bfile->compression());
|
||||
s = UncompressBlockContentsForCompressionType(
|
||||
uncompression_ctx, blob_value.data(), blob_value.size(), &contents,
|
||||
info, blob_value.data(), blob_value.size(), &contents,
|
||||
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
|
||||
}
|
||||
value->PinSelf(contents.data);
|
||||
|
@ -208,10 +208,13 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
|
||||
if (compression != kNoCompression &&
|
||||
(show_uncompressed_blob != DisplayType::kNone || show_summary)) {
|
||||
BlockContents contents;
|
||||
UncompressionContext uncompression_ctx(compression);
|
||||
UncompressionContext context(compression);
|
||||
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
|
||||
compression);
|
||||
s = UncompressBlockContentsForCompressionType(
|
||||
uncompression_ctx, slice.data() + key_size, static_cast<size_t>(value_size),
|
||||
&contents, 2 /*compress_format_version*/, ImmutableCFOptions(Options()));
|
||||
info, slice.data() + key_size, static_cast<size_t>(value_size),
|
||||
&contents, 2 /*compress_format_version*/,
|
||||
ImmutableCFOptions(Options()));
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user