From 01013ae7666af78d4b0fc1e1baf4a735a8db5344 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Fri, 18 Jan 2019 19:10:17 -0800 Subject: [PATCH] Digest ZSTD compression dictionary once when writing SST file (#4849) Summary: This is essentially a re-submission of #4251 with a few improvements: - Split `CompressionDict` into two separate classes: `CompressionDict` and `UncompressionDict` - Eliminated `Init` functions. Instead do all initialization work in constructors. - Added test case for parallel DB open, which is the scenario where #4251 failed under TSAN. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4849 Differential Revision: D13606039 Pulled By: ajkr fbshipit-source-id: 08c236059798c710db9cbf545fce0f371232d447 --- db/column_family.cc | 10 +- db/db_test2.cc | 51 +++++ table/block_based_table_builder.cc | 75 +++---- table/block_based_table_builder.h | 2 +- table/block_based_table_reader.cc | 19 +- table/block_fetcher.cc | 11 +- table/format.cc | 20 +- table/format.h | 12 +- tools/db_bench_tool.cc | 52 +++-- util/compression.h | 325 +++++++++++++++++++++------- utilities/blob_db/blob_db_impl.cc | 14 +- utilities/blob_db/blob_dump_tool.cc | 9 +- 12 files changed, 419 insertions(+), 181 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 9a3ae99ca..c9172f662 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -129,14 +129,10 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) { } } if (cf_options.compression_opts.zstd_max_train_bytes > 0) { - if (!CompressionTypeSupported(CompressionType::kZSTD)) { - // Dictionary trainer is available since v0.6.1, but ZSTD was marked - // stable only since v0.8.0. For now we enable the feature in stable - // versions only. 
+ if (!ZSTD_TrainDictionarySupported()) { return Status::InvalidArgument( - "zstd dictionary trainer cannot be used because " + - CompressionTypeToString(CompressionType::kZSTD) + - " is not linked with the binary."); + "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ " + "is not linked with the binary."); } if (cf_options.compression_opts.max_dict_bytes == 0) { return Status::InvalidArgument( diff --git a/db/db_test2.cc b/db/db_test2.cc index 466159e31..d0638875c 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -3214,6 +3214,57 @@ TEST_F(DBTest2, TestCompactFiles) { } #endif // ROCKSDB_LITE +// TODO: figure out why this test fails in appveyor +#ifndef OS_WIN +TEST_F(DBTest2, MultiDBParallelOpenTest) { + const int kNumDbs = 2; + Options options = CurrentOptions(); + std::vector dbnames; + for (int i = 0; i < kNumDbs; ++i) { + dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i)); + ASSERT_OK(DestroyDB(dbnames.back(), options)); + } + + // Verify empty DBs can be created in parallel + std::vector open_threads; + std::vector dbs{static_cast(kNumDbs), nullptr}; + options.create_if_missing = true; + for (int i = 0; i < kNumDbs; ++i) { + open_threads.emplace_back( + [&](int dbnum) { + ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum])); + }, + i); + } + + // Now add some data and close, so next we can verify non-empty DBs can be + // recovered in parallel + for (int i = 0; i < kNumDbs; ++i) { + open_threads[i].join(); + ASSERT_OK(dbs[i]->Put(WriteOptions(), "xi", "gua")); + delete dbs[i]; + } + + // Verify non-empty DBs can be recovered in parallel + dbs.assign(kNumDbs, nullptr); // &dbs[dbnum] below needs live slots + open_threads.clear(); + for (int i = 0; i < kNumDbs; ++i) { + open_threads.emplace_back( + [&](int dbnum) { + ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum])); + }, + i); + } + + // Wait and cleanup + for (int i = 0; i < kNumDbs; ++i) { + open_threads[i].join(); + delete dbs[i]; + ASSERT_OK(DestroyDB(dbnames[i], options)); + } +} +#endif // OS_WIN + } // namespace rocksdb 
int main(int argc, char** argv) { diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 097f4dba4..f2b8d9310 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -104,19 +104,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { } // namespace // format_version is the block format as defined in include/rocksdb/table.h -Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, +Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, CompressionType* type, uint32_t format_version, std::string* compressed_output) { - *type = compression_ctx.type(); - if (compression_ctx.type() == kNoCompression) { + *type = compression_info.type(); + if (compression_info.type() == kNoCompression) { return raw; } // Will return compressed block contents if (1) the compression method is // supported in this platform and (2) the compression rate is "good enough". - switch (compression_ctx.type()) { + switch (compression_info.type()) { case kSnappyCompression: - if (Snappy_Compress(compression_ctx, raw.data(), raw.size(), + if (Snappy_Compress(compression_info, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; @@ -124,7 +124,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, break; // fall back to no compression. case kZlibCompression: if (Zlib_Compress( - compression_ctx, + compression_info, GetCompressFormatForVersion(kZlibCompression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -133,7 +133,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, break; // fall back to no compression. 
case kBZip2Compression: if (BZip2_Compress( - compression_ctx, + compression_info, GetCompressFormatForVersion(kBZip2Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -142,7 +142,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, break; // fall back to no compression. case kLZ4Compression: if (LZ4_Compress( - compression_ctx, + compression_info, GetCompressFormatForVersion(kLZ4Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -151,7 +151,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, break; // fall back to no compression. case kLZ4HCCompression: if (LZ4HC_Compress( - compression_ctx, + compression_info, GetCompressFormatForVersion(kLZ4HCCompression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -167,7 +167,7 @@ Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, break; case kZSTD: case kZSTDNotFinalCompression: - if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(), + if (ZSTD_Compress(compression_info, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; @@ -261,10 +261,12 @@ struct BlockBasedTableBuilder::Rep { PartitionedIndexBuilder* p_index_builder_ = nullptr; std::string last_key; - // Compression dictionary or nullptr - const std::string* compression_dict; + CompressionType compression_type; + CompressionOptions compression_opts; + CompressionDict compression_dict; CompressionContext compression_ctx; std::unique_ptr verify_ctx; + UncompressionDict verify_dict; TableProperties props; bool closed = false; // Either Finish() or Abandon() has been called. 
@@ -313,8 +315,15 @@ struct BlockBasedTableBuilder::Rep { table_options.data_block_hash_table_util_ratio), range_del_block(1 /* block_restart_interval */), internal_prefix_transform(_moptions.prefix_extractor.get()), - compression_dict(_compression_dict), - compression_ctx(_compression_type, _compression_opts), + compression_type(_compression_type), + compression_opts(_compression_opts), + compression_dict( + _compression_dict == nullptr ? Slice() : Slice(*_compression_dict), + _compression_type, _compression_opts.level), + compression_ctx(_compression_type), + verify_dict( + _compression_dict == nullptr ? Slice() : Slice(*_compression_dict), + _compression_type), use_delta_encoding_for_index_values(table_opt.format_version >= 4 && !table_opt.block_align), compressed_cache_key_prefix_size(0), @@ -355,7 +364,7 @@ struct BlockBasedTableBuilder::Rep { _moptions.prefix_extractor != nullptr)); if (table_options.verify_compression) { verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), - compression_ctx.type())); + compression_type)); } } @@ -506,7 +515,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, assert(ok()); Rep* r = rep_; - auto type = r->compression_ctx.type(); + auto type = r->compression_type; Slice block_contents; bool abort_compression = false; @@ -514,24 +523,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)); if (raw_block_contents.size() < kCompressionSizeLimit) { - Slice compression_dict; - if (is_data_block && r->compression_dict && r->compression_dict->size()) { - r->compression_ctx.dict() = *r->compression_dict; - if (r->table_options.verify_compression) { - assert(r->verify_ctx != nullptr); - r->verify_ctx->dict() = *r->compression_dict; - } - } else { - // Clear dictionary - r->compression_ctx.dict() = Slice(); - if (r->table_options.verify_compression) { - assert(r->verify_ctx != nullptr); - 
r->verify_ctx->dict() = Slice(); - } - } - + CompressionInfo compression_info( + r->compression_opts, r->compression_ctx, + is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(), + r->compression_type); block_contents = - CompressBlock(raw_block_contents, r->compression_ctx, &type, + CompressBlock(raw_block_contents, compression_info, &type, r->table_options.format_version, &r->compressed_output); // Some of the compression algorithms are known to be unreliable. If @@ -540,8 +537,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, if (type != kNoCompression && r->table_options.verify_compression) { // Retrieve the uncompressed contents into a new buffer BlockContents contents; + UncompressionInfo uncompression_info( + *r->verify_ctx, + is_data_block ? r->verify_dict : UncompressionDict::GetEmptyDict(), + r->compression_type); Status stat = UncompressBlockContentsForCompressionType( - *r->verify_ctx, block_contents.data(), block_contents.size(), + uncompression_info, block_contents.data(), block_contents.size(), &contents, r->table_options.format_version, r->ioptions); if (stat.ok()) { @@ -805,7 +806,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock( ? rep_->ioptions.merge_operator->Name() : "nullptr"; rep_->props.compression_name = - CompressionTypeToString(rep_->compression_ctx.type()); + CompressionTypeToString(rep_->compression_type); rep_->props.prefix_extractor_name = rep_->moptions.prefix_extractor != nullptr ? 
rep_->moptions.prefix_extractor->Name() @@ -854,10 +855,10 @@ void BlockBasedTableBuilder::WritePropertiesBlock( void BlockBasedTableBuilder::WriteCompressionDictBlock( MetaIndexBuilder* meta_index_builder) { - if (rep_->compression_dict && rep_->compression_dict->size()) { + if (rep_->compression_dict.GetRawDict().size()) { BlockHandle compression_dict_block_handle; if (ok()) { - WriteRawBlock(*rep_->compression_dict, kNoCompression, + WriteRawBlock(rep_->compression_dict.GetRawDict(), kNoCompression, &compression_dict_block_handle); } if (ok()) { diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 4dc715fd9..bc2a7bf76 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -133,7 +133,7 @@ class BlockBasedTableBuilder : public TableBuilder { const uint64_t kCompressionSizeLimit = std::numeric_limits::max(); }; -Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, +Slice CompressBlock(const Slice& raw, const CompressionInfo& info, CompressionType* type, uint32_t format_version, std::string* compressed_output); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 392a2a08c..8debd509a 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1334,8 +1334,10 @@ Status BlockBasedTable::GetDataBlockFromCache( // Retrieve the uncompressed contents into a new buffer BlockContents contents; - UncompressionContext uncompresssion_ctx(compression_type, compression_dict); - s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data.data(), + UncompressionContext context(compression_type); + UncompressionDict dict(compression_dict, compression_type); + UncompressionInfo info(context, dict, compression_type); + s = UncompressBlockContents(info, compressed_block->data.data(), compressed_block->data.size(), &contents, rep->table_options.format_version, rep->ioptions, 
GetMemoryAllocator(rep->table_options)); @@ -1412,12 +1414,13 @@ Status BlockBasedTable::PutDataBlockToCache( BlockContents uncompressed_block_contents; Statistics* statistics = ioptions.statistics; if (raw_block_comp_type != kNoCompression) { - UncompressionContext uncompression_ctx(raw_block_comp_type, - compression_dict); - s = UncompressBlockContents( - uncompression_ctx, raw_block_contents->data.data(), - raw_block_contents->data.size(), &uncompressed_block_contents, - format_version, ioptions, memory_allocator); + UncompressionContext context(raw_block_comp_type); + UncompressionDict dict(compression_dict, raw_block_comp_type); + UncompressionInfo info(context, dict, raw_block_comp_type); + s = UncompressBlockContents(info, raw_block_contents->data.data(), + raw_block_contents->data.size(), + &uncompressed_block_contents, format_version, + ioptions, memory_allocator); } if (!s.ok()) { return s; diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 9ad254a59..5a0da1157 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -256,11 +256,12 @@ Status BlockFetcher::ReadBlockContents() { if (do_uncompress_ && compression_type_ != kNoCompression) { // compressed page, uncompress, update cache - UncompressionContext uncompression_ctx(compression_type_, - compression_dict_); - status_ = UncompressBlockContents(uncompression_ctx, slice_.data(), - block_size_, contents_, footer_.version(), - ioptions_, memory_allocator_); + UncompressionContext context(compression_type_); + UncompressionDict dict(compression_dict_, compression_type_); + UncompressionInfo info(context, dict, compression_type_); + status_ = UncompressBlockContents(info, slice_.data(), block_size_, + contents_, footer_.version(), ioptions_, + memory_allocator_); compression_type_ = kNoCompression; } else { GetBlockContents(); diff --git a/table/format.cc b/table/format.cc index 0e43e8243..d37b54912 100644 --- a/table/format.cc +++ b/table/format.cc @@ -278,18 +278,18 @@ Status 
ReadFooterFromFile(RandomAccessFileReader* file, } Status UncompressBlockContentsForCompressionType( - const UncompressionContext& uncompression_ctx, const char* data, size_t n, + const UncompressionInfo& uncompression_info, const char* data, size_t n, BlockContents* contents, uint32_t format_version, const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) { CacheAllocationPtr ubuf; - assert(uncompression_ctx.type() != kNoCompression && + assert(uncompression_info.type() != kNoCompression && "Invalid compression type"); StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); int decompress_size = 0; - switch (uncompression_ctx.type()) { + switch (uncompression_info.type()) { case kSnappyCompression: { size_t ulength = 0; static char snappy_corrupt_msg[] = @@ -306,7 +306,7 @@ Status UncompressBlockContentsForCompressionType( } case kZlibCompression: ubuf = Zlib_Uncompress( - uncompression_ctx, data, n, &decompress_size, + uncompression_info, data, n, &decompress_size, GetCompressFormatForVersion(kZlibCompression, format_version), allocator); if (!ubuf) { @@ -330,7 +330,7 @@ Status UncompressBlockContentsForCompressionType( break; case kLZ4Compression: ubuf = LZ4_Uncompress( - uncompression_ctx, data, n, &decompress_size, + uncompression_info, data, n, &decompress_size, GetCompressFormatForVersion(kLZ4Compression, format_version), allocator); if (!ubuf) { @@ -342,7 +342,7 @@ Status UncompressBlockContentsForCompressionType( break; case kLZ4HCCompression: ubuf = LZ4_Uncompress( - uncompression_ctx, data, n, &decompress_size, + uncompression_info, data, n, &decompress_size, GetCompressFormatForVersion(kLZ4HCCompression, format_version), allocator); if (!ubuf) { @@ -365,7 +365,7 @@ Status UncompressBlockContentsForCompressionType( break; case kZSTD: case kZSTDNotFinalCompression: - ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size, + ubuf = ZSTD_Uncompress(uncompression_info, data, n, &decompress_size, 
allocator); if (!ubuf) { static char zstd_corrupt_msg[] = @@ -395,14 +395,14 @@ Status UncompressBlockContentsForCompressionType( // buffer is returned via 'result' and it is upto the caller to // free this buffer. // format_version is the block format as defined in include/rocksdb/table.h -Status UncompressBlockContents(const UncompressionContext& uncompression_ctx, +Status UncompressBlockContents(const UncompressionInfo& uncompression_info, const char* data, size_t n, BlockContents* contents, uint32_t format_version, const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) { assert(data[n] != kNoCompression); - assert(data[n] == uncompression_ctx.type()); - return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n, + assert(data[n] == uncompression_info.type()); + return UncompressBlockContentsForCompressionType(uncompression_info, data, n, contents, format_version, ioptions, allocator); } diff --git a/table/format.h b/table/format.h index ae2bdafcc..dde38214d 100644 --- a/table/format.h +++ b/table/format.h @@ -277,16 +277,18 @@ extern Status ReadBlockContents( // free this buffer. // For description of compress_format_version and possible values, see // util/compression.h -extern Status UncompressBlockContents( - const UncompressionContext& uncompression_ctx, const char* data, size_t n, - BlockContents* contents, uint32_t compress_format_version, - const ImmutableCFOptions& ioptions, MemoryAllocator* allocator = nullptr); +extern Status UncompressBlockContents(const UncompressionInfo& info, + const char* data, size_t n, + BlockContents* contents, + uint32_t compress_format_version, + const ImmutableCFOptions& ioptions, + MemoryAllocator* allocator = nullptr); // This is an extension to UncompressBlockContents that accepts // a specific compression type. This is used by un-wrapped blocks // with no compression header. 
extern Status UncompressBlockContentsForCompressionType( - const UncompressionContext& uncompression_ctx, const char* data, size_t n, + const UncompressionInfo& info, const char* data, size_t n, BlockContents* contents, uint32_t compress_format_version, const ImmutableCFOptions& ioptions, MemoryAllocator* allocator = nullptr); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 3bd7f92da..37bbcc485 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2051,28 +2051,28 @@ class Benchmark { return true; } - inline bool CompressSlice(const CompressionContext& compression_ctx, + inline bool CompressSlice(const CompressionInfo& compression_info, const Slice& input, std::string* compressed) { bool ok = true; switch (FLAGS_compression_type_e) { case rocksdb::kSnappyCompression: - ok = Snappy_Compress(compression_ctx, input.data(), input.size(), + ok = Snappy_Compress(compression_info, input.data(), input.size(), compressed); break; case rocksdb::kZlibCompression: - ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(), + ok = Zlib_Compress(compression_info, 2, input.data(), input.size(), compressed); break; case rocksdb::kBZip2Compression: - ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(), + ok = BZip2_Compress(compression_info, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4Compression: - ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(), + ok = LZ4_Compress(compression_info, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4HCCompression: - ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(), + ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(), compressed); break; case rocksdb::kXpressCompression: @@ -2080,7 +2080,7 @@ class Benchmark { input.size(), compressed); break; case rocksdb::kZSTD: - ok = ZSTD_Compress(compression_ctx, input.data(), input.size(), + ok = ZSTD_Compress(compression_info, input.data(), input.size(), 
compressed); break; default: @@ -2163,10 +2163,11 @@ class Benchmark { const int len = FLAGS_block_size; std::string input_str(len, 'y'); std::string compressed; - CompressionContext compression_ctx(FLAGS_compression_type_e, - Options().compression_opts); - bool result = - CompressSlice(compression_ctx, Slice(input_str), &compressed); + CompressionOptions opts; + CompressionContext context(FLAGS_compression_type_e); + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), + FLAGS_compression_type_e); + bool result = CompressSlice(info, Slice(input_str), &compressed); if (!result) { fprintf(stdout, "WARNING: %s compression is not enabled\n", @@ -3020,13 +3021,14 @@ void VerifyDBFromDB(std::string& truth_db_name) { int64_t produced = 0; bool ok = true; std::string compressed; - CompressionContext compression_ctx(FLAGS_compression_type_e, - Options().compression_opts); - + CompressionOptions opts; + CompressionContext context(FLAGS_compression_type_e); + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), + FLAGS_compression_type_e); // Compress 1G while (ok && bytes < int64_t(1) << 30) { compressed.clear(); - ok = CompressSlice(compression_ctx, input, &compressed); + ok = CompressSlice(info, input, &compressed); produced += compressed.size(); bytes += input.size(); thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress); @@ -3048,11 +3050,17 @@ void VerifyDBFromDB(std::string& truth_db_name) { Slice input = gen.Generate(FLAGS_block_size); std::string compressed; + CompressionContext compression_ctx(FLAGS_compression_type_e); + CompressionOptions compression_opts; + CompressionInfo compression_info(compression_opts, compression_ctx, + CompressionDict::GetEmptyDict(), + FLAGS_compression_type_e); UncompressionContext uncompression_ctx(FLAGS_compression_type_e); - CompressionContext compression_ctx(FLAGS_compression_type_e, - Options().compression_opts); + UncompressionInfo uncompression_info(uncompression_ctx, + 
UncompressionDict::GetEmptyDict(), + FLAGS_compression_type_e); - bool ok = CompressSlice(compression_ctx, input, &compressed); + bool ok = CompressSlice(compression_info, input, &compressed); int64_t bytes = 0; int decompress_size; while (ok && bytes < 1024 * 1048576) { @@ -3072,7 +3080,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { break; } case rocksdb::kZlibCompression: - uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(), + uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(), compressed.size(), &decompress_size, 2); ok = uncompressed.get() != nullptr; break; @@ -3082,12 +3090,12 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed.get() != nullptr; break; case rocksdb::kLZ4Compression: - uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), + uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(), compressed.size(), &decompress_size, 2); ok = uncompressed.get() != nullptr; break; case rocksdb::kLZ4HCCompression: - uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), + uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(), compressed.size(), &decompress_size, 2); ok = uncompressed.get() != nullptr; break; @@ -3097,7 +3105,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed.get() != nullptr; break; case rocksdb::kZSTD: - uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(), + uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(), compressed.size(), &decompress_size); ok = uncompressed.get() != nullptr; break; diff --git a/util/compression.h b/util/compression.h index e91faeac6..1b73fff76 100644 --- a/util/compression.h +++ b/util/compression.h @@ -135,12 +135,127 @@ class ZSTDUncompressCachedData { namespace rocksdb { -// Instantiate this class and pass it to the uncompression API below +// Holds dictionary and related data, like ZSTD's digested compression +// dictionary. 
+struct CompressionDict { +#if ZSTD_VERSION_NUMBER >= 700 + ZSTD_CDict* zstd_cdict_ = nullptr; +#endif // ZSTD_VERSION_NUMBER >= 700 + Slice dict_; + + public: +#if ZSTD_VERSION_NUMBER >= 700 + CompressionDict(Slice dict, CompressionType type, int level) { +#else // ZSTD_VERSION_NUMBER >= 700 + CompressionDict(Slice dict, CompressionType /*type*/, int /*level*/) { +#endif // ZSTD_VERSION_NUMBER >= 700 + dict_ = std::move(dict); +#if ZSTD_VERSION_NUMBER >= 700 + zstd_cdict_ = nullptr; + if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) { + if (level == CompressionOptions::kDefaultCompressionLevel) { + // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see + // https://github.com/facebook/zstd/issues/1148 + level = 3; + } + // Should be safe (but slower) if below call fails as we'll use the + // raw dictionary to compress. + zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level); + assert(zstd_cdict_ != nullptr); + } +#endif // ZSTD_VERSION_NUMBER >= 700 + } + + ~CompressionDict() { +#if ZSTD_VERSION_NUMBER >= 700 + size_t res = 0; + if (zstd_cdict_ != nullptr) { + res = ZSTD_freeCDict(zstd_cdict_); + } + assert(res == 0); // Last I checked they can't fail + (void)res; // prevent unused var warning +#endif // ZSTD_VERSION_NUMBER >= 700 + } + +#if ZSTD_VERSION_NUMBER >= 700 + const ZSTD_CDict* GetDigestedZstdCDict() const { + return zstd_cdict_; + } +#endif // ZSTD_VERSION_NUMBER >= 700 + + Slice GetRawDict() const { return dict_; } + + static const CompressionDict& GetEmptyDict() { + static CompressionDict empty_dict{}; + return empty_dict; + } + + CompressionDict() = default; + // Disable copy/move + CompressionDict(const CompressionDict&) = delete; + CompressionDict& operator=(const CompressionDict&) = delete; + CompressionDict(CompressionDict&&) = delete; + CompressionDict& operator=(CompressionDict&&) = delete; +}; + +// Holds dictionary and related data, like ZSTD's digested uncompression +// dictionary. 
+struct UncompressionDict { +#if ZSTD_VERSION_NUMBER >= 700 + ZSTD_DDict* zstd_ddict_ = nullptr; +#endif // ZSTD_VERSION_NUMBER >= 700 + Slice dict_; + +#if ZSTD_VERSION_NUMBER >= 700 + UncompressionDict(Slice dict, CompressionType type) { +#else // ZSTD_VERSION_NUMBER >= 700 + UncompressionDict(Slice dict, CompressionType /*type*/) { +#endif // ZSTD_VERSION_NUMBER >= 700 + dict_ = std::move(dict); +#if ZSTD_VERSION_NUMBER >= 700 + zstd_ddict_ = nullptr; + if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) { + zstd_ddict_ = ZSTD_createDDict(dict_.data(), dict_.size()); + assert(zstd_ddict_ != nullptr); + } +#endif // ZSTD_VERSION_NUMBER >= 700 + } + + ~UncompressionDict() { +#if ZSTD_VERSION_NUMBER >= 700 + size_t res = 0; + if (zstd_ddict_ != nullptr) { + res = ZSTD_freeDDict(zstd_ddict_); + } + assert(res == 0); // Last I checked they can't fail + (void)res; // prevent unused var warning +#endif // ZSTD_VERSION_NUMBER >= 700 + } + +#if ZSTD_VERSION_NUMBER >= 700 + const ZSTD_DDict* GetDigestedZstdDDict() const { + return zstd_ddict_; + } +#endif // ZSTD_VERSION_NUMBER >= 700 + + Slice GetRawDict() const { return dict_; } + + static const UncompressionDict& GetEmptyDict() { + static UncompressionDict empty_dict{}; + return empty_dict; + } + + UncompressionDict() = default; + // Disable copy/move; the destructor frees zstd_ddict_ exactly once + UncompressionDict(const UncompressionDict&) = delete; + UncompressionDict& operator=(const UncompressionDict&) = delete; + UncompressionDict(UncompressionDict&&) = delete; + UncompressionDict& operator=(UncompressionDict&&) = delete; +}; + class CompressionContext { private: const CompressionType type_; - const CompressionOptions opts_; - Slice dict_; #if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500) ZSTD_CCtx* zstd_ctx_ = nullptr; void CreateNativeContext() { @@ -165,6 +280,7 @@ class CompressionContext { assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression); return zstd_ctx_; } + #else // ZSTD && (ZSTD_VERSION_NUMBER >= 500) private: void 
CreateNativeContext() {} @@ -172,28 +288,35 @@ class CompressionContext { #endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500) public: explicit CompressionContext(CompressionType comp_type) : type_(comp_type) { - CreateNativeContext(); - } - CompressionContext(CompressionType comp_type, const CompressionOptions& opts, - const Slice& comp_dict = Slice()) - : type_(comp_type), opts_(opts), dict_(comp_dict) { + (void)type_; CreateNativeContext(); } ~CompressionContext() { DestroyNativeContext(); } CompressionContext(const CompressionContext&) = delete; CompressionContext& operator=(const CompressionContext&) = delete; - - const CompressionOptions& options() const { return opts_; } - CompressionType type() const { return type_; } - const Slice& dict() const { return dict_; } - Slice& dict() { return dict_; } }; -// Instantiate this class and pass it to the uncompression API below +class CompressionInfo { + const CompressionOptions& opts_; + const CompressionContext& context_; + const CompressionDict& dict_; + const CompressionType type_; + + public: + CompressionInfo(const CompressionOptions& _opts, + const CompressionContext& _context, + const CompressionDict& _dict, CompressionType _type) + : opts_(_opts), context_(_context), dict_(_dict), type_(_type) {} + + const CompressionOptions& options() const { return opts_; } + const CompressionContext& context() const { return context_; } + const CompressionDict& dict() const { return dict_; } + CompressionType type() const { return type_; } +}; + class UncompressionContext { private: - CompressionType type_; - Slice dict_; + const CompressionType type_; CompressionContextCache* ctx_cache_ = nullptr; ZSTDUncompressCachedData uncomp_cached_data_; @@ -201,10 +324,8 @@ class UncompressionContext { struct NoCache {}; // Do not use context cache, used by TableBuilder UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {} - explicit UncompressionContext(CompressionType comp_type) - : 
UncompressionContext(comp_type, Slice()) {} - UncompressionContext(CompressionType comp_type, const Slice& comp_dict) - : type_(comp_type), dict_(comp_dict) { + + explicit UncompressionContext(CompressionType comp_type) : type_(comp_type) { if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) { ctx_cache_ = CompressionContextCache::Instance(); uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); @@ -224,9 +345,21 @@ class UncompressionContext { ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const { return uncomp_cached_data_.Get(); } +}; + +class UncompressionInfo { + const UncompressionContext& context_; + const UncompressionDict& dict_; + const CompressionType type_; + + public: + UncompressionInfo(const UncompressionContext& _context, + const UncompressionDict& _dict, CompressionType _type) + : context_(_context), dict_(_dict), type_(_type) {} + + const UncompressionContext& context() const { return context_; } + const UncompressionDict& dict() const { return dict_; } CompressionType type() const { return type_; } - const Slice& dict() const { return dict_; } - Slice& dict() { return dict_; } }; inline bool Snappy_Supported() { @@ -345,9 +478,8 @@ inline std::string CompressionTypeToString(CompressionType compression_type) { // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the // start of compressed block. Snappy format is the same as version 1. -inline bool Snappy_Compress(const CompressionContext& /*ctx*/, - const char* input, size_t length, - ::std::string* output) { +inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input, + size_t length, ::std::string* output) { #ifdef SNAPPY output->resize(snappy::MaxCompressedLength(length)); size_t outlen; @@ -412,7 +544,7 @@ inline bool GetDecompressedSizeInfo(const char** input_data, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool Zlib_Compress(const CompressionContext& ctx, +inline bool Zlib_Compress(const CompressionInfo& info, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef ZLIB @@ -437,24 +569,25 @@ inline bool Zlib_Compress(const CompressionContext& ctx, // The default value is 8. See zconf.h for more details. static const int memLevel = 8; int level; - if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { + if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { level = Z_DEFAULT_COMPRESSION; } else { - level = ctx.options().level; + level = info.options().level; } z_stream _stream; memset(&_stream, 0, sizeof(z_stream)); - int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits, - memLevel, ctx.options().strategy); + int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits, + memLevel, info.options().strategy); if (st != Z_OK) { return false; } - if (ctx.dict().size()) { + Slice compression_dict = info.dict().GetRawDict(); + if (compression_dict.size()) { // Initialize the compression library's dictionary - st = deflateSetDictionary(&_stream, - reinterpret_cast(ctx.dict().data()), - static_cast(ctx.dict().size())); + st = deflateSetDictionary( + &_stream, reinterpret_cast(compression_dict.data()), + static_cast(compression_dict.size())); if (st != Z_OK) { deflateEnd(&_stream); return false; @@ -482,7 +615,7 @@ inline bool Zlib_Compress(const CompressionContext& ctx, deflateEnd(&_stream); return compressed; #else - (void)ctx; + (void)info; (void)compress_format_version; (void)input; (void)length; @@ -498,8 +631,8 @@ inline bool Zlib_Compress(const CompressionContext& ctx, // @param compression_dict Data for presetting the compression library's // dictionary. 
inline CacheAllocationPtr Zlib_Uncompress( - const UncompressionContext& ctx, const char* input_data, - size_t input_length, int* decompress_size, uint32_t compress_format_version, + const UncompressionInfo& info, const char* input_data, size_t input_length, + int* decompress_size, uint32_t compress_format_version, MemoryAllocator* allocator = nullptr, int windowBits = -14) { #ifdef ZLIB uint32_t output_len = 0; @@ -529,11 +662,12 @@ inline CacheAllocationPtr Zlib_Uncompress( return nullptr; } - if (ctx.dict().size()) { + Slice compression_dict = info.dict().GetRawDict(); + if (compression_dict.size()) { // Initialize the compression library's dictionary - st = inflateSetDictionary(&_stream, - reinterpret_cast(ctx.dict().data()), - static_cast(ctx.dict().size())); + st = inflateSetDictionary( + &_stream, reinterpret_cast(compression_dict.data()), + static_cast(compression_dict.size())); if (st != Z_OK) { return nullptr; } @@ -584,7 +718,7 @@ inline CacheAllocationPtr Zlib_Uncompress( inflateEnd(&_stream); return output; #else - (void)ctx; + (void)info; (void)input_data; (void)input_length; (void)decompress_size; @@ -599,7 +733,7 @@ inline CacheAllocationPtr Zlib_Uncompress( // block header // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format -inline bool BZip2_Compress(const CompressionContext& /*ctx*/, +inline bool BZip2_Compress(const CompressionInfo& /*info*/, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef BZIP2 @@ -745,7 +879,7 @@ inline CacheAllocationPtr BZip2_Uncompress( // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool LZ4_Compress(const CompressionContext& ctx, +inline bool LZ4_Compress(const CompressionInfo& info, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef LZ4 @@ -773,9 +907,10 @@ inline bool LZ4_Compress(const CompressionContext& ctx, int outlen; #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_stream_t* stream = LZ4_createStream(); - if (ctx.dict().size()) { - LZ4_loadDict(stream, ctx.dict().data(), - static_cast(ctx.dict().size())); + Slice compression_dict = info.dict().GetRawDict(); + if (compression_dict.size()) { + LZ4_loadDict(stream, compression_dict.data(), + static_cast(compression_dict.size())); } #if LZ4_VERSION_NUMBER >= 10700 // r129+ outlen = @@ -799,7 +934,7 @@ inline bool LZ4_Compress(const CompressionContext& ctx, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 - (void)ctx; + (void)info; (void)compress_format_version; (void)input; (void)length; @@ -814,7 +949,7 @@ inline bool LZ4_Compress(const CompressionContext& ctx, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx, +inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info, const char* input_data, size_t input_length, int* decompress_size, @@ -842,9 +977,10 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx, auto output = AllocateBlock(output_len, allocator); #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); - if (ctx.dict().size()) { - LZ4_setStreamDecode(stream, ctx.dict().data(), - static_cast(ctx.dict().size())); + Slice compression_dict = info.dict().GetRawDict(); + if (compression_dict.size()) { + LZ4_setStreamDecode(stream, compression_dict.data(), + static_cast(compression_dict.size())); } *decompress_size = LZ4_decompress_safe_continue( stream, input_data, output.get(), static_cast(input_length), @@ -863,7 +999,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx, assert(*decompress_size == static_cast(output_len)); return output; #else // LZ4 - (void)ctx; + (void)info; (void)input_data; (void)input_length; (void)decompress_size; @@ -879,7 +1015,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool LZ4HC_Compress(const CompressionContext& ctx, +inline bool LZ4HC_Compress(const CompressionInfo& info, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef LZ4 @@ -906,17 +1042,18 @@ inline bool LZ4HC_Compress(const CompressionContext& ctx, int outlen; int level; - if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { + if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { level = 0; // lz4hc.h says any value < 1 will be sanitized to default } else { - level = ctx.options().level; + level = info.options().level; } #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamHC_t* stream = LZ4_createStreamHC(); LZ4_resetStreamHC(stream, level); + Slice compression_dict = info.dict().GetRawDict(); const char* compression_dict_data = - ctx.dict().size() > 0 ? ctx.dict().data() : nullptr; - size_t compression_dict_size = ctx.dict().size(); + compression_dict.size() > 0 ? compression_dict.data() : nullptr; + size_t compression_dict_size = compression_dict.size(); LZ4_loadDictHC(stream, compression_dict_data, static_cast(compression_dict_size)); @@ -947,7 +1084,7 @@ inline bool LZ4HC_Compress(const CompressionContext& ctx, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 - (void)ctx; + (void)info; (void)compress_format_version; (void)input; (void)length; @@ -981,9 +1118,7 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/, } #endif -// @param compression_dict Data for presetting the compression library's -// dictionary. 
-inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input, +inline bool ZSTD_Compress(const CompressionInfo& info, const char* input, size_t length, ::std::string* output) { #ifdef ZSTD if (length > std::numeric_limits::max()) { @@ -998,19 +1133,29 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input, output->resize(static_cast(output_header_len + compressBound)); size_t outlen = 0; int level; - if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { + if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see // https://github.com/facebook/zstd/issues/1148 level = 3; } else { - level = ctx.options().level; + level = info.options().level; } #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_CCtx* context = ctx.ZSTDPreallocCtx(); + ZSTD_CCtx* context = info.context().ZSTDPreallocCtx(); assert(context != nullptr); - outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len], - compressBound, input, length, - ctx.dict().data(), ctx.dict().size(), level); +#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+ + if (info.dict().GetDigestedZstdCDict() != nullptr) { + outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len], + compressBound, input, length, + info.dict().GetDigestedZstdCDict()); + } +#endif // ZSTD_VERSION_NUMBER >= 700 + if (outlen == 0) { + outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len], + compressBound, input, length, + info.dict().GetRawDict().data(), + info.dict().GetRawDict().size(), level); + } #else // up to v0.4.x outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, length, level); @@ -1021,7 +1166,7 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input, output->resize(output_header_len + outlen); return true; #else // ZSTD - (void)ctx; + (void)info; (void)input; (void)length; (void)output; @@ -1032,9 +1177,8 @@ 
inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input, // @param compression_dict Data for presetting the compression library's // dictionary. inline CacheAllocationPtr ZSTD_Uncompress( - const UncompressionContext& ctx, const char* input_data, - size_t input_length, int* decompress_size, - MemoryAllocator* allocator = nullptr) { + const UncompressionInfo& info, const char* input_data, size_t input_length, + int* decompress_size, MemoryAllocator* allocator = nullptr) { #ifdef ZSTD uint32_t output_len = 0; if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, @@ -1043,14 +1187,24 @@ inline CacheAllocationPtr ZSTD_Uncompress( } auto output = AllocateBlock(output_len, allocator); - size_t actual_output_length; + size_t actual_output_length = 0; #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_DCtx* context = ctx.GetZSTDContext(); + ZSTD_DCtx* context = info.context().GetZSTDContext(); assert(context != nullptr); - actual_output_length = ZSTD_decompress_usingDict( - context, output.get(), output_len, input_data, input_length, - ctx.dict().data(), ctx.dict().size()); +#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+ + if (info.dict().GetDigestedZstdDDict() != nullptr) { + actual_output_length = ZSTD_decompress_usingDDict( + context, output.get(), output_len, input_data, input_length, + info.dict().GetDigestedZstdDDict()); + } +#endif // ZSTD_VERSION_NUMBER >= 700 + if (actual_output_length == 0) { + actual_output_length = ZSTD_decompress_usingDict( + context, output.get(), output_len, input_data, input_length, + info.dict().GetRawDict().data(), info.dict().GetRawDict().size()); + } #else // up to v0.4.x + (void)info; actual_output_length = ZSTD_decompress(output.get(), output_len, input_data, input_length); #endif // ZSTD_VERSION_NUMBER >= 500 @@ -1058,7 +1212,7 @@ inline CacheAllocationPtr ZSTD_Uncompress( *decompress_size = static_cast(actual_output_length); return output; #else // ZSTD - (void)ctx; + (void)info; (void)input_data; 
(void)input_length; (void)decompress_size; @@ -1067,6 +1221,17 @@ inline CacheAllocationPtr ZSTD_Uncompress( #endif } +inline bool ZSTD_TrainDictionarySupported() { +#ifdef ZSTD + // Dictionary trainer is available since v0.6.1 for static linking, but not + // available for dynamic linking until v1.1.3. For now we enable the feature + // in v1.1.3+ only. + return (ZSTD_versionNumber() >= 10103); +#else + return false; +#endif +} + inline std::string ZSTD_TrainDictionary(const std::string& samples, const std::vector& sample_lens, size_t max_dict_bytes) { @@ -1074,6 +1239,10 @@ inline std::string ZSTD_TrainDictionary(const std::string& samples, // available for dynamic linking until v1.1.3. For now we enable the feature // in v1.1.3+ only. #if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+ + assert(samples.empty() == sample_lens.empty()); + if (samples.empty()) { + return ""; + } std::string dict_data(max_dict_bytes, '\0'); size_t dict_len = ZDICT_trainFromBuffer( &dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0], diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index bdec65462..5dc9f01f0 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -761,9 +761,11 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, return raw; } StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS); - CompressionType ct = bdb_options_.compression; - CompressionContext compression_ctx(ct); - CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat, + CompressionType type = bdb_options_.compression; + CompressionOptions opts; + CompressionContext context(type); + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type); + CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, compression_output); return *compression_output; } @@ -1100,9 +1102,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, { StopWatch decompression_sw(env_, 
statistics_, BLOB_DB_DECOMPRESSION_MICROS); - UncompressionContext uncompression_ctx(bfile->compression()); + UncompressionContext context(bfile->compression()); + UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), + bfile->compression()); s = UncompressBlockContentsForCompressionType( - uncompression_ctx, blob_value.data(), blob_value.size(), &contents, + info, blob_value.data(), blob_value.size(), &contents, kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); } value->PinSelf(contents.data); diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index 7ce0697e3..37eee19db 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -208,10 +208,13 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, if (compression != kNoCompression && (show_uncompressed_blob != DisplayType::kNone || show_summary)) { BlockContents contents; - UncompressionContext uncompression_ctx(compression); + UncompressionContext context(compression); + UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), + compression); s = UncompressBlockContentsForCompressionType( - uncompression_ctx, slice.data() + key_size, static_cast(value_size), - &contents, 2 /*compress_format_version*/, ImmutableCFOptions(Options())); + info, slice.data() + key_size, static_cast(value_size), + &contents, 2 /*compress_format_version*/, + ImmutableCFOptions(Options())); if (!s.ok()) { return s; }