From 1adbceb5810d6ee1694dcbdaf41aebd809fb265f Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Mon, 2 Nov 2020 19:20:15 -0800 Subject: [PATCH] Expand effect of dictionary settings in `ColumnFamilyOptions::compression_opts` (#7619) Summary: In dictionary compression's initial implementation, in order to save CPU overhead, we only enabled it for bottom level under the assumption that the vast majority of data is stored there. At that time, there was no such thing as `ColumnFamilyOptions::bottommost_compression_opts`, so we just hardcoded disabling dictionary compression in flush and compactions to non-bottommost level. Now, we have users who generate all their files through flush and are considering using dictionary compression. To support such a use case, this PR expands the scope of `ColumnFamilyOptions::compression_opts` to additionally include flushed files and files generated by compaction to a non-bottommost level. Users can still get the old behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and explicitly enabling both that and `ColumnFamilyOptions::bottommost_compression`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7619 Reviewed By: ltamasi Differential Revision: D24665610 Pulled By: ajkr fbshipit-source-id: 656b90bce1033fe21c71e09af931ef5bde3e464c --- HISTORY.md | 3 + db/builder.cc | 7 +- db/compaction/compaction.cc | 6 -- db/db_block_cache_test.cc | 8 +- db/db_test2.cc | 173 ++++++++++++++++++++++++++++++++++++ options/options_helper.cc | 11 +++ options/options_helper.h | 2 + util/compression.h | 37 ++++++++ 8 files changed, 233 insertions(+), 14 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 326e06d98..ea44e6070 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -14,6 +14,9 @@ ### Public API Change * Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. 
These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options. +### Behavior Changes +* The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag. + ## 6.14 (10/09/2020) ### Bug fixes * Fixed a bug after a `CompactRange()` with `CompactRangeOptions::change_level` set fails due to a conflict in the level change step, which caused all subsequent calls to `CompactRange()` with `CompactRangeOptions::change_level` set to incorrectly fail with a `Status::NotSupported("another thread is refitting")` error. diff --git a/db/builder.cc b/db/builder.cc index 206ff257f..543f957b5 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -124,11 +124,6 @@ Status BuildTable( if (iter->Valid() || !range_del_agg->IsEmpty()) { TableBuilder* builder; std::unique_ptr file_writer; - // Currently we only enable dictionary compression during compaction to the - // bottommost level. 
- CompressionOptions compression_opts_for_flush(compression_opts); - compression_opts_for_flush.max_dict_bytes = 0; - compression_opts_for_flush.zstd_max_train_bytes = 0; { std::unique_ptr file; #ifndef NDEBUG @@ -160,7 +155,7 @@ Status BuildTable( ioptions, mutable_cf_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, column_family_name, file_writer.get(), compression, - sample_for_compression, compression_opts_for_flush, level, + sample_for_compression, compression_opts, level, false /* skip_filters */, creation_time, oldest_key_time, 0 /*target_file_size*/, file_creation_time, db_id, db_session_id); } diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index efe27870d..2550e0c47 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -248,12 +248,6 @@ Compaction::Compaction(VersionStorageInfo* vstorage, if (max_subcompactions_ == 0) { max_subcompactions_ = _mutable_db_options.max_subcompactions; } - if (!bottommost_level_) { - // Currently we only enable dictionary compression during compaction to the - // bottommost level. 
- output_compression_opts_.max_dict_bytes = 0; - output_compression_opts_.zstd_max_train_bytes = 0; - } #ifndef NDEBUG for (size_t i = 1; i < inputs_.size(); ++i) { diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index 239b4660b..57c4d47c0 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -837,8 +837,9 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) { Random rnd(301); for (auto compression_type : compression_types) { Options options = CurrentOptions(); - options.compression = compression_type; - options.compression_opts.max_dict_bytes = 4096; + options.bottommost_compression = compression_type; + options.bottommost_compression_opts.max_dict_bytes = 4096; + options.bottommost_compression_opts.enabled = true; options.create_if_missing = true; options.num_levels = 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); @@ -991,6 +992,9 @@ TEST_P(DBBlockCachePinningTest, TwoLevelDB) { ++expected_index_misses; } } + if (unpartitioned_pinning_ == PinningTier::kNone) { + ++expected_compression_dict_misses; + } ASSERT_EQ(expected_filter_misses, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); ASSERT_EQ(expected_index_misses, diff --git a/db/db_test2.cc b/db/db_test2.cc index b3875f845..e0ee2f149 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -12,6 +12,7 @@ #include "db/db_test_util.h" #include "db/read_callback.h" +#include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/persistent_cache.h" @@ -1390,6 +1391,178 @@ TEST_F(DBTest2, PresetCompressionDictLocality) { } } +class PresetCompressionDictTest + : public DBTestBase, + public testing::WithParamInterface> { + public: + PresetCompressionDictTest() + : DBTestBase("/db_test2", false /* env_do_fsync */), + compression_type_(std::get<0>(GetParam())), + bottommost_(std::get<1>(GetParam())) {} + + protected: + const CompressionType compression_type_; + const bool bottommost_; +}; + +INSTANTIATE_TEST_CASE_P( + 
DBTest2, PresetCompressionDictTest, + ::testing::Combine(::testing::ValuesIn(GetSupportedDictCompressions()), + ::testing::Bool())); + +TEST_P(PresetCompressionDictTest, Flush) { + // Verifies that dictionary is generated and written during flush only when + // `ColumnFamilyOptions::compression` enables dictionary. + const size_t kValueLen = 256; + const size_t kKeysPerFile = 1 << 10; + const size_t kDictLen = 4 << 10; + + Options options = CurrentOptions(); + if (bottommost_) { + options.bottommost_compression = compression_type_; + options.bottommost_compression_opts.enabled = true; + options.bottommost_compression_opts.max_dict_bytes = kDictLen; + } else { + options.compression = compression_type_; + options.compression_opts.max_dict_bytes = kDictLen; + } + options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile)); + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions bbto; + bbto.cache_index_and_filter_blocks = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + + uint64_t prev_compression_dict_misses = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + Random rnd(301); + for (size_t i = 0; i <= kKeysPerFile; ++i) { + ASSERT_OK(Put(Key(static_cast(i)), rnd.RandomString(kValueLen))); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + + // If there's a compression dictionary, it should have been loaded when the + // flush finished, incurring a cache miss. 
+ uint64_t expected_compression_dict_misses; + if (bottommost_) { + expected_compression_dict_misses = prev_compression_dict_misses; + } else { + expected_compression_dict_misses = prev_compression_dict_misses + 1; + } + ASSERT_EQ(expected_compression_dict_misses, + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); +} + +TEST_P(PresetCompressionDictTest, CompactNonBottommost) { + // Verifies that dictionary is generated and written during compaction to + // non-bottommost level only when `ColumnFamilyOptions::compression` enables + // dictionary. + const size_t kValueLen = 256; + const size_t kKeysPerFile = 1 << 10; + const size_t kDictLen = 4 << 10; + + Options options = CurrentOptions(); + if (bottommost_) { + options.bottommost_compression = compression_type_; + options.bottommost_compression_opts.enabled = true; + options.bottommost_compression_opts.max_dict_bytes = kDictLen; + } else { + options.compression = compression_type_; + options.compression_opts.max_dict_bytes = kDictLen; + } + options.disable_auto_compactions = true; + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions bbto; + bbto.cache_index_and_filter_blocks = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + + Random rnd(301); + for (size_t j = 0; j <= kKeysPerFile; ++j) { + ASSERT_OK(Put(Key(static_cast(j)), rnd.RandomString(kValueLen))); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + + for (int i = 0; i < 2; ++i) { + for (size_t j = 0; j <= kKeysPerFile; ++j) { + ASSERT_OK(Put(Key(static_cast(j)), rnd.RandomString(kValueLen))); + } + ASSERT_OK(Flush()); + } +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,0,1", FilesPerLevel(0)); +#endif // ROCKSDB_LITE + + uint64_t prev_compression_dict_misses = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + // This L0->L1 compaction merges the two L0 files into L1. The produced L1 + // file is not bottommost due to the existing L2 file covering the same key- + // range. 
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr)); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,1,1", FilesPerLevel(0)); +#endif // ROCKSDB_LITE + // If there's a compression dictionary, it should have been loaded when the + // compaction finished, incurring a cache miss. + uint64_t expected_compression_dict_misses; + if (bottommost_) { + expected_compression_dict_misses = prev_compression_dict_misses; + } else { + expected_compression_dict_misses = prev_compression_dict_misses + 1; + } + ASSERT_EQ(expected_compression_dict_misses, + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); +} + +TEST_P(PresetCompressionDictTest, CompactBottommost) { + // Verifies that dictionary is generated and written during compaction to + // bottommost level only when either `ColumnFamilyOptions::compression` or + // `ColumnFamilyOptions::bottommost_compression` enables dictionary. + const size_t kValueLen = 256; + const size_t kKeysPerFile = 1 << 10; + const size_t kDictLen = 4 << 10; + + Options options = CurrentOptions(); + if (bottommost_) { + options.bottommost_compression = compression_type_; + options.bottommost_compression_opts.enabled = true; + options.bottommost_compression_opts.max_dict_bytes = kDictLen; + } else { + options.compression = compression_type_; + options.compression_opts.max_dict_bytes = kDictLen; + } + options.disable_auto_compactions = true; + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions bbto; + bbto.cache_index_and_filter_blocks = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + + Random rnd(301); + for (int i = 0; i < 2; ++i) { + for (size_t j = 0; j <= kKeysPerFile; ++j) { + ASSERT_OK(Put(Key(static_cast(j)), rnd.RandomString(kValueLen))); + } + ASSERT_OK(Flush()); + } +#ifndef ROCKSDB_LITE + ASSERT_EQ("2", FilesPerLevel(0)); +#endif // ROCKSDB_LITE + + uint64_t prev_compression_dict_misses = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + 
CompactRangeOptions cro; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,1", FilesPerLevel(0)); +#endif // ROCKSDB_LITE + // If there's a compression dictionary, it should have been loaded when the + // compaction finished, incurring a cache miss. + ASSERT_EQ(prev_compression_dict_misses + 1, + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); +} + class CompactionCompressionListener : public EventListener { public: explicit CompactionCompressionListener(Options* db_options) diff --git a/options/options_helper.cc b/options/options_helper.cc index e0f8d26cd..f1ed4eda3 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -299,6 +299,17 @@ std::vector GetSupportedCompressions() { return supported_compressions; } +std::vector GetSupportedDictCompressions() { + std::vector dict_compression_types; + for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) { + CompressionType t = comp_to_name.second; + if (t != kDisableCompressionOption && DictCompressionTypeSupported(t)) { + dict_compression_types.push_back(t); + } + } + return dict_compression_types; +} + #ifndef ROCKSDB_LITE bool ParseSliceTransformHelper( const std::string& kFixedPrefixName, const std::string& kCappedPrefixName, diff --git a/options/options_helper.h b/options/options_helper.h index 4323d5f8e..43ddbdfc3 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -25,6 +25,8 @@ struct Options; std::vector GetSupportedCompressions(); +std::vector GetSupportedDictCompressions(); + // Checks that the combination of DBOptions and ColumnFamilyOptions are valid Status ValidateOptions(const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts); diff --git a/util/compression.h b/util/compression.h index 4f5254499..53e977c88 100644 --- a/util/compression.h +++ b/util/compression.h @@ -540,6 +540,43 @@ inline bool CompressionTypeSupported(CompressionType compression_type) { } } +inline bool 
DictCompressionTypeSupported(CompressionType compression_type) { + switch (compression_type) { + case kNoCompression: + return false; + case kSnappyCompression: + return false; + case kZlibCompression: + return Zlib_Supported(); + case kBZip2Compression: + return false; + case kLZ4Compression: + case kLZ4HCCompression: +#if LZ4_VERSION_NUMBER >= 10400 // r124+ + return LZ4_Supported(); +#else + return false; +#endif + case kXpressCompression: + return false; + case kZSTDNotFinalCompression: +#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ + return ZSTDNotFinal_Supported(); +#else + return false; +#endif + case kZSTD: +#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ + return ZSTD_Supported(); +#else + return false; +#endif + default: + assert(false); + return false; + } +} + inline std::string CompressionTypeToString(CompressionType compression_type) { switch (compression_type) { case kNoCompression: