From 58ec8cf30427b38d1a069889cda7adf1a9c807bb Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Wed, 17 Feb 2021 10:42:56 -0800
Subject: [PATCH] Backport https://github.com/facebook/rocksdb/pull/7970

---
 HISTORY.md                                    |   4 +
 db/c.cc                                       |  13 +++
 db/db_test2.cc                                | 100 ++++++++++++------
 db_stress_tool/db_stress_common.h             |   1 +
 db_stress_tool/db_stress_gflags.cc            |   4 +
 db_stress_tool/db_stress_test_base.cc         |   2 +
 include/rocksdb/advanced_options.h            |  30 +++++-
 include/rocksdb/c.h                           |   6 ++
 java/rocksjni/compression_options.cc          |  21 ++++
 options/cf_options.cc                         |  19 +++-
 options/options.cc                            |   9 ++
 options/options_settable_test.cc              |  13 +--
 options/options_test.cc                       |   2 +-
 .../block_based/block_based_table_builder.cc  |  81 ++++++++++----
 table/block_based/block_based_table_builder.h |   5 +-
 table/sst_file_dumper.cc                      |   4 +-
 table/sst_file_dumper.h                       |   3 +-
 tools/db_bench_tool.cc                        |   6 ++
 tools/db_crashtest.py                         |   7 +-
 tools/sst_dump_tool.cc                        |  18 +++-
 util/compression.h                            |   3 +
 util/string_util.cc                           |  11 +-
 22 files changed, 289 insertions(+), 73 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 2baf84687..0314cdb66 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,4 +1,8 @@
 # Rocksdb Change Log
+## Unreleased
+### Public API Change
+* Add an option, `CompressionOptions::max_dict_buffer_bytes`, to limit the in-memory buffering for selecting samples for generating/training a dictionary. The limit is currently loosely adhered to.
+
 ## 6.17.2 (02/05/2021)
 ### Bug Fixes
 * Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details.
diff --git a/db/c.cc b/db/c.cc
index b93b42980..5469b6754 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -2774,6 +2774,14 @@ void rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
   opt->rep.bottommost_compression_opts.enabled = enabled;
 }
 
+void rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
+    rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes,
+    unsigned char enabled) {
+  opt->rep.bottommost_compression_opts.max_dict_buffer_bytes =
+      max_dict_buffer_bytes;
+  opt->rep.bottommost_compression_opts.enabled = enabled;
+}
+
 void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits,
                                              int level, int strategy,
                                              int max_dict_bytes) {
@@ -2788,6 +2796,11 @@ void rocksdb_options_set_compression_options_zstd_max_train_bytes(
   opt->rep.compression_opts.zstd_max_train_bytes = zstd_max_train_bytes;
 }
 
+void rocksdb_options_set_compression_options_max_dict_buffer_bytes(
+    rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes) {
+  opt->rep.compression_opts.max_dict_buffer_bytes = max_dict_buffer_bytes;
+}
+
 void rocksdb_options_set_prefix_extractor(
     rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) {
   opt->rep.prefix_extractor.reset(prefix_extractor);
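For C API users, a minimal sketch of wiring up the new setter added above,
alongside the existing compression setters (the option values here are
illustrative only, not recommendations):

    #include "rocksdb/c.h"

    int main(void) {
      rocksdb_options_t* opt = rocksdb_options_create();
      rocksdb_options_set_compression(opt, rocksdb_zstd_compression);
      /* window_bits=-14, level=3, strategy=0, max_dict_bytes=16KB */
      rocksdb_options_set_compression_options(opt, -14, 3, 0, 16 * 1024);
      rocksdb_options_set_compression_options_zstd_max_train_bytes(opt,
                                                                   64 * 1024);
      /* New in this patch: cap sample buffering at 4MB. */
      rocksdb_options_set_compression_options_max_dict_buffer_bytes(
          opt, 4 * 1024 * 1024);
      rocksdb_options_destroy(opt);
      return 0;
    }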
diff --git a/db/db_test2.cc b/db/db_test2.cc
index 33c13e69c..8fb7ed278 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -1410,67 +1410,88 @@ INSTANTIATE_TEST_CASE_P(
 
 TEST_P(PresetCompressionDictTest, Flush) {
   // Verifies that dictionary is generated and written during flush only when
-  // `ColumnFamilyOptions::compression` enables dictionary.
+  // `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
+  // size of the dictionary is within expectations according to the limit on
+  // buffering set by `CompressionOptions::max_dict_buffer_bytes`.
   const size_t kValueLen = 256;
   const size_t kKeysPerFile = 1 << 10;
-  const size_t kDictLen = 4 << 10;
+  const size_t kDictLen = 16 << 10;
+  const size_t kBlockLen = 4 << 10;
 
   Options options = CurrentOptions();
   if (bottommost_) {
     options.bottommost_compression = compression_type_;
     options.bottommost_compression_opts.enabled = true;
     options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
   } else {
     options.compression = compression_type_;
     options.compression_opts.max_dict_bytes = kDictLen;
+    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
   }
   options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions bbto;
+  bbto.block_size = kBlockLen;
   bbto.cache_index_and_filter_blocks = true;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   Reopen(options);
 
-  uint64_t prev_compression_dict_misses =
-      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
   Random rnd(301);
   for (size_t i = 0; i <= kKeysPerFile; ++i) {
     ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
   }
   ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 
-  // If there's a compression dictionary, it should have been loaded when the
-  // flush finished, incurring a cache miss.
-  uint64_t expected_compression_dict_misses;
+  // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
+  // compression dictionary exists since dictionaries would be preloaded when
+  // the flush finishes.
   if (bottommost_) {
-    expected_compression_dict_misses = prev_compression_dict_misses;
+    // Flush is never considered bottommost. This should change in the future
+    // since flushed files may have nothing underneath them, like the one in
+    // this test case.
+    ASSERT_EQ(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        0);
   } else {
-    expected_compression_dict_misses = prev_compression_dict_misses + 1;
+    ASSERT_GT(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        0);
+    // Although we limited buffering to `kBlockLen`, there may be up to two
+    // blocks of data included in the dictionary since we only check limit
+    // after each block is built.
+    ASSERT_LE(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        2 * kBlockLen);
   }
-  ASSERT_EQ(expected_compression_dict_misses,
-            TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
 }
 
 TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
   // Verifies that dictionary is generated and written during compaction to
   // non-bottommost level only when `ColumnFamilyOptions::compression` enables
-  // dictionary.
+  // dictionary. Also verifies the size of the dictionary is within
+  // expectations according to the limit on buffering set by
+  // `CompressionOptions::max_dict_buffer_bytes`.
   const size_t kValueLen = 256;
   const size_t kKeysPerFile = 1 << 10;
-  const size_t kDictLen = 4 << 10;
+  const size_t kDictLen = 16 << 10;
+  const size_t kBlockLen = 4 << 10;
 
   Options options = CurrentOptions();
   if (bottommost_) {
     options.bottommost_compression = compression_type_;
     options.bottommost_compression_opts.enabled = true;
     options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
   } else {
     options.compression = compression_type_;
     options.compression_opts.max_dict_bytes = kDictLen;
+    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
   }
   options.disable_auto_compactions = true;
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions bbto;
+  bbto.block_size = kBlockLen;
   bbto.cache_index_and_filter_blocks = true;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   Reopen(options);
@@ -1492,8 +1513,8 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
   ASSERT_EQ("2,0,1", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
 
-  uint64_t prev_compression_dict_misses =
-      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
+  uint64_t prev_compression_dict_bytes_inserted =
+      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
   // This L0->L1 compaction merges the two L0 files into L1. The produced L1
   // file is not bottommost due to the existing L2 file covering the same key-
   // range.
@@ -1501,38 +1522,52 @@
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,1,1", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
-  // If there's a compression dictionary, it should have been loaded when the
-  // compaction finished, incurring a cache miss.
-  uint64_t expected_compression_dict_misses;
+  // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
+  // compression dictionary exists since dictionaries would be preloaded when
+  // the compaction finishes.
   if (bottommost_) {
-    expected_compression_dict_misses = prev_compression_dict_misses;
+    ASSERT_EQ(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        prev_compression_dict_bytes_inserted);
   } else {
-    expected_compression_dict_misses = prev_compression_dict_misses + 1;
+    ASSERT_GT(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        prev_compression_dict_bytes_inserted);
+    // Although we limited buffering to `kBlockLen`, there may be up to two
+    // blocks of data included in the dictionary since we only check limit
+    // after each block is built.
+    ASSERT_LE(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        prev_compression_dict_bytes_inserted + 2 * kBlockLen);
   }
-  ASSERT_EQ(expected_compression_dict_misses,
-            TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
 }
 
 TEST_P(PresetCompressionDictTest, CompactBottommost) {
   // Verifies that dictionary is generated and written during compaction to
   // bottommost level only when either `ColumnFamilyOptions::compression` or
-  // `ColumnFamilyOptions::bottommost_compression` enables dictionary.
+  // `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
+  // verifies the size of the dictionary is within expectations according to
+  // the limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
   const size_t kValueLen = 256;
   const size_t kKeysPerFile = 1 << 10;
-  const size_t kDictLen = 4 << 10;
+  const size_t kDictLen = 16 << 10;
+  const size_t kBlockLen = 4 << 10;
 
   Options options = CurrentOptions();
   if (bottommost_) {
     options.bottommost_compression = compression_type_;
     options.bottommost_compression_opts.enabled = true;
     options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
   } else {
     options.compression = compression_type_;
     options.compression_opts.max_dict_bytes = kDictLen;
+    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
   }
   options.disable_auto_compactions = true;
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions bbto;
+  bbto.block_size = kBlockLen;
   bbto.cache_index_and_filter_blocks = true;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   Reopen(options);
@@ -1548,17 +1583,22 @@
   ASSERT_EQ("2", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
 
-  uint64_t prev_compression_dict_misses =
-      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
+  uint64_t prev_compression_dict_bytes_inserted =
+      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
   CompactRangeOptions cro;
   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,1", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
-  // If there's a compression dictionary, it should have been loaded when the
-  // compaction finished, incurring a cache miss.
-  ASSERT_EQ(prev_compression_dict_misses + 1,
-            TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
+  ASSERT_GT(
+      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+      prev_compression_dict_bytes_inserted);
+  // Although we limited buffering to `kBlockLen`, there may be up to two
+  // blocks of data included in the dictionary since we only check limit after
+  // each block is built.
+  ASSERT_LE(
+      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+      prev_compression_dict_bytes_inserted + 2 * kBlockLen);
 }
 
 class CompactionCompressionListener : public EventListener {
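The tests above read the ticker through the internal `TestGetTickerCount()`
helper; outside the test harness the same counter is visible through the
public `Statistics` interface. A minimal sketch:

    #include "rocksdb/options.h"
    #include "rocksdb/statistics.h"

    // A nonzero increase across a flush/compaction indicates a compression
    // dictionary was built and preloaded into the block cache.
    uint64_t DictBytesInserted(const rocksdb::Options& options) {
      // Assumes `options.statistics = rocksdb::CreateDBStatistics();` was set
      // before the DB was opened.
      return options.statistics->getTickerCount(
          rocksdb::BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
    }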
diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h
index 4c48d73c9..fb43abdd8 100644
--- a/db_stress_tool/db_stress_common.h
+++ b/db_stress_tool/db_stress_common.h
@@ -206,6 +206,7 @@ DECLARE_string(bottommost_compression_type);
 DECLARE_int32(compression_max_dict_bytes);
 DECLARE_int32(compression_zstd_max_train_bytes);
 DECLARE_int32(compression_parallel_threads);
+DECLARE_uint64(compression_max_dict_buffer_bytes);
 DECLARE_string(checksum_type);
 DECLARE_string(hdfs);
 DECLARE_string(env_uri);
diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc
index 5046987f6..cffd88a5b 100644
--- a/db_stress_tool/db_stress_gflags.cc
+++ b/db_stress_tool/db_stress_gflags.cc
@@ -626,6 +626,10 @@ DEFINE_int32(compression_zstd_max_train_bytes, 0,
 DEFINE_int32(compression_parallel_threads, 1,
              "Number of threads for parallel compression.");
 
+DEFINE_uint64(compression_max_dict_buffer_bytes, 0,
+              "Buffering limit for SST file data to sample for dictionary "
+              "compression.");
+
 DEFINE_string(bottommost_compression_type, "disable",
               "Algorithm to use to compress bottommost level of the "
               "database. \"disable\" means disabling the feature");
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index 52e879e24..d0b1ee100 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -2052,6 +2052,8 @@ void StressTest::Open() {
         FLAGS_compression_zstd_max_train_bytes;
     options_.compression_opts.parallel_threads =
         FLAGS_compression_parallel_threads;
+    options_.compression_opts.max_dict_buffer_bytes =
+        FLAGS_compression_max_dict_buffer_bytes;
     options_.create_if_missing = true;
     options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
     options_.inplace_update_support = FLAGS_in_place_update;
" "\"disable\" means disabling the feature"); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 52e879e24..d0b1ee100 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2052,6 +2052,8 @@ void StressTest::Open() { FLAGS_compression_zstd_max_train_bytes; options_.compression_opts.parallel_threads = FLAGS_compression_parallel_threads; + options_.compression_opts.max_dict_buffer_bytes = + FLAGS_compression_max_dict_buffer_bytes; options_.create_if_missing = true; options_.max_manifest_file_size = FLAGS_max_manifest_file_size; options_.inplace_update_support = FLAGS_in_place_update; diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index a7d9f542f..52d4a8aa5 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -143,6 +143,28 @@ struct CompressionOptions { // Default: false. bool enabled; + // Limit on data buffering, which is used to gather samples to build a + // dictionary. Zero means no limit. When dictionary is disabled + // (`max_dict_bytes == 0`), enabling this limit (`max_dict_buffer_bytes != 0`) + // has no effect. + // + // In compaction, the buffering is limited to the target file size (see + // `target_file_size_base` and `target_file_size_multiplier`) even if this + // setting permits more buffering. Since we cannot determine where the file + // should be cut until data blocks are compressed with dictionary, buffering + // more than the target file size could lead to selecting samples that belong + // to a later output SST. + // + // Limiting too strictly may harm dictionary effectiveness since it forces + // RocksDB to pick samples from the initial portion of the output SST, which + // may not be representative of the whole file. Configuring this limit below + // `zstd_max_train_bytes` (when enabled) can restrict how many samples we can + // pass to the dictionary trainer. Configuring it below `max_dict_bytes` can + // restrict the size of the final dictionary. 
diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h
index ed3382a7a..60d66a616 100644
--- a/include/rocksdb/c.h
+++ b/include/rocksdb/c.h
@@ -998,11 +998,17 @@ extern ROCKSDB_LIBRARY_API void
 rocksdb_options_set_compression_options_zstd_max_train_bytes(rocksdb_options_t*,
                                                              int);
 extern ROCKSDB_LIBRARY_API void
+rocksdb_options_set_compression_options_max_dict_buffer_bytes(
+    rocksdb_options_t*, uint64_t);
+extern ROCKSDB_LIBRARY_API void
 rocksdb_options_set_bottommost_compression_options(rocksdb_options_t*, int,
                                                    int, int, int,
                                                    unsigned char);
 extern ROCKSDB_LIBRARY_API void
 rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
     rocksdb_options_t*, int, unsigned char);
+extern ROCKSDB_LIBRARY_API void
+rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
+    rocksdb_options_t*, uint64_t, unsigned char);
 extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor(
     rocksdb_options_t*, rocksdb_slicetransform_t*);
 extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels(
diff --git a/java/rocksjni/compression_options.cc b/java/rocksjni/compression_options.cc
index 4fed5ba5f..1857faf68 100644
--- a/java/rocksjni/compression_options.cc
+++ b/java/rocksjni/compression_options.cc
@@ -132,6 +132,27 @@ jint Java_org_rocksdb_CompressionOptions_zstdMaxTrainBytes(
   return static_cast<jint>(opt->zstd_max_train_bytes);
 }
 
+/*
+ * Class:     org_rocksdb_CompressionOptions
+ * Method:    setMaxDictBufferBytes
+ * Signature: (JJ)V
+ */
+void Java_org_rocksdb_CompressionOptions_setMaxDictBufferBytes(
+    JNIEnv*, jobject, jlong jhandle, jlong jmax_dict_buffer_bytes) {
+  auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
+  opt->max_dict_buffer_bytes = static_cast<uint64_t>(jmax_dict_buffer_bytes);
+}
+
+/*
+ * Class:     org_rocksdb_CompressionOptions
+ * Method:    maxDictBufferBytes
+ * Signature: (J)J
+ */
+jlong Java_org_rocksdb_CompressionOptions_maxDictBufferBytes(JNIEnv*, jobject,
+                                                             jlong jhandle) {
+  auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
+  return static_cast<jlong>(opt->max_dict_buffer_bytes);
+}
+
 /*
  * Class:     org_rocksdb_CompressionOptions
  * Method:    setEnabled
diff --git a/options/cf_options.cc b/options/cf_options.cc
index c436dd312..b1fb66c84 100644
--- a/options/cf_options.cc
+++ b/options/cf_options.cc
@@ -105,7 +105,7 @@ static Status ParseCompressionOptions(const std::string& value,
   }
   // Since parallel_threads comes before enabled but was added optionally
   // later, we need to check if this is the final token (meaning it is the
-  // enabled bit), or if there is another token (meaning this one is
+  // enabled bit), or if there are more tokens (meaning this one is
   // parallel_threads)
   end = value.find(':', start);
   if (end != std::string::npos) {
@@ -113,7 +113,6 @@ static Status ParseCompressionOptions(const std::string& value,
           ParseInt(value.substr(start, value.size() - start));
   } else {
     // parallel_threads is not serialized with this format, but enabled is
-    compression_opts.parallel_threads = CompressionOptions().parallel_threads;
     compression_opts.enabled =
         ParseBoolean("", value.substr(start, value.size() - start));
   }
@@ -128,6 +127,18 @@ static Status ParseCompressionOptions(const std::string& value,
     }
     compression_opts.enabled =
         ParseBoolean("", value.substr(start, value.size() - start));
+    end = value.find(':', start);
+  }
+
+  // max_dict_buffer_bytes is optional for backwards compatibility
+  if (end != std::string::npos) {
+    start = end + 1;
+    if (start >= value.size()) {
+      return Status::InvalidArgument(
+          "unable to parse the specified CF option " + name);
+    }
+    compression_opts.max_dict_buffer_bytes =
+        ParseUint64(value.substr(start, value.size() - start));
   }
   return Status::OK();
 }
@@ -161,6 +172,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
         {"enabled",
          {offsetof(struct CompressionOptions, enabled), OptionType::kBoolean,
           OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
+        {"max_dict_buffer_bytes",
+         {offsetof(struct CompressionOptions, max_dict_buffer_bytes),
+          OptionType::kUInt64T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kMutable}},
     };
 
 static std::unordered_map<std::string, OptionTypeInfo>
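The parser above extends the colon-separated `compression_opts` format with an
optional trailing `max_dict_buffer_bytes` token. A sketch of exercising it
through the public string API (the field values are illustrative):

    #include <cassert>

    #include "rocksdb/convenience.h"
    #include "rocksdb/options.h"

    void ParseCompressionOptsString() {
      rocksdb::ColumnFamilyOptions cf_opts;
      // Token order: window_bits:level:strategy:max_dict_bytes:
      //   zstd_max_train_bytes:parallel_threads:enabled:max_dict_buffer_bytes
      rocksdb::Status s = rocksdb::GetColumnFamilyOptionsFromString(
          rocksdb::ColumnFamilyOptions(),
          "compression_opts=-14:3:0:16384:65536:1:true:1048576", &cf_opts);
      assert(s.ok());
      assert(cf_opts.compression_opts.max_dict_buffer_bytes == 1048576);
    }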
diff --git a/options/options.cc b/options/options.cc
index d76a15441..4faee64b4 100644
--- a/options/options.cc
+++ b/options/options.cc
@@ -201,6 +201,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
     ROCKS_LOG_HEADER(
         log, " Options.bottommost_compression_opts.enabled: %s",
         bottommost_compression_opts.enabled ? "true" : "false");
+    ROCKS_LOG_HEADER(
+        log,
+        " Options.bottommost_compression_opts.max_dict_buffer_bytes: "
+        "%" PRIu64,
+        bottommost_compression_opts.max_dict_buffer_bytes);
     ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d",
                      compression_opts.window_bits);
     ROCKS_LOG_HEADER(log, " Options.compression_opts.level: %d",
@@ -222,6 +227,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
     ROCKS_LOG_HEADER(log, " Options.compression_opts.enabled: %s",
                      compression_opts.enabled ? "true" : "false");
+    ROCKS_LOG_HEADER(log,
+                     " Options.compression_opts.max_dict_buffer_bytes: "
+                     "%" PRIu64,
+                     compression_opts.max_dict_buffer_bytes);
     ROCKS_LOG_HEADER(log, " Options.level0_file_num_compaction_trigger: %d",
                      level0_file_num_compaction_trigger);
     ROCKS_LOG_HEADER(log, " Options.level0_slowdown_writes_trigger: %d",
diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc
index 5e0d402fd..04b202620 100644
--- a/options/options_settable_test.cc
+++ b/options/options_settable_test.cc
@@ -409,10 +409,11 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
   FillWithSpecialChar(options_ptr, sizeof(ColumnFamilyOptions),
                       kColumnFamilyOptionsExcluded);
 
-  // It based on the behavior of compiler that padding bytes are not changed
-  // when copying the struct. It's prone to failure when compiler behavior
-  // changes. We verify there is unset bytes to detect the case.
-  *options = ColumnFamilyOptions();
+  // Invoke a user-defined constructor in the hope that it does not overwrite
+  // padding bytes. Note that previously we relied on the implicitly-defined
+  // copy-assignment operator (i.e., `*options = ColumnFamilyOptions();`) here,
+  // which did in fact modify padding bytes.
+  options = new (options_ptr) ColumnFamilyOptions();
 
   // Deprecated option which is not initialized. Need to set it to avoid
   // Valgrind error
@@ -470,8 +471,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
       "max_bytes_for_level_multiplier=60;"
       "memtable_factory=SkipListFactory;"
       "compression=kNoCompression;"
-      "compression_opts=5:6:7:8:9:true;"
-      "bottommost_compression_opts=4:5:6:7:8:true;"
+      "compression_opts=5:6:7:8:9:10:true:11;"
+      "bottommost_compression_opts=4:5:6:7:8:9:true:10;"
      "bottommost_compression=kDisableCompressionOption;"
       "level0_stop_writes_trigger=33;"
       "num_levels=99;"
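For readers unfamiliar with the trick in the test change above, a standalone
sketch of the difference: re-constructing an object in place only initializes
its members, so sentinel bytes written into padding usually survive, whereas
copy-assignment is free to rewrite the whole object representation. (Still
compiler-dependent, as the updated comment concedes.)

    #include <cstring>
    #include <new>

    struct Padded {
      char c;  // typically followed by 3 padding bytes
      int i;
      Padded() : c('x'), i(0) {}
    };

    void PaddingProbe() {
      alignas(Padded) unsigned char buf[sizeof(Padded)];
      std::memset(buf, 0xBD, sizeof(buf));  // sentinel, like FillWithSpecialChar
      // Members are set; the 0xBD sentinel in the padding typically survives,
      // where `*p = Padded();` might not preserve it.
      Padded* p = new (buf) Padded();
      p->~Padded();
    }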
diff --git a/options/options_test.cc b/options/options_test.cc
index b15be0206..08cbc6187 100644
--- a/options/options_test.cc
+++ b/options/options_test.cc
@@ -725,7 +725,7 @@ TEST_F(OptionsTest, CompressionOptionsFromString) {
   ASSERT_OK(GetColumnFamilyOptionsFromString(
       ignore, ColumnFamilyOptions(), "compression_opts=5:6:7:8:9:x:false",
       &base_cf_opt));
-  ASSERT_NOK(GetColumnFamilyOptionsFromString(
+  ASSERT_OK(GetColumnFamilyOptionsFromString(
       config_options, ColumnFamilyOptions(),
       "compression_opts=1:2:3:4:5:6:true:8", &base_cf_opt));
   ASSERT_OK(GetColumnFamilyOptionsFromString(
diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index a28639932..51e064216 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -11,24 +11,25 @@
 
 #include <assert.h>
 #include <stdio.h>
+
 #include <atomic>
 #include <list>
 #include <map>
 #include <memory>
+#include <numeric>
 #include <string>
 #include <unordered_map>
 #include <utility>
 
 #include "db/dbformat.h"
 #include "index_builder.h"
-
+#include "memory/memory_allocator.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/env.h"
 #include "rocksdb/flush_block_policy.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/table.h"
-
 #include "table/block_based/block.h"
 #include "table/block_based/block_based_filter_block.h"
 #include "table/block_based/block_based_table_factory.h"
@@ -40,8 +41,6 @@
 #include "table/block_based/partitioned_filter_block.h"
 #include "table/format.h"
 #include "table/table_builder.h"
-
-#include "memory/memory_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -306,6 +305,10 @@ struct BlockBasedTableBuilder::Rep {
     kClosed,
   };
   State state;
+  // `kBuffered` state is allowed only as long as the buffering of uncompressed
+  // data blocks (see `data_block_and_keys_buffers`) does not exceed
+  // `buffer_limit`.
+  uint64_t buffer_limit;
 
   const bool use_delta_encoding_for_index_values;
   std::unique_ptr<FilterBlockBuilder> filter_builder;
@@ -321,7 +324,6 @@ struct BlockBasedTableBuilder::Rep {
   const std::string& column_family_name;
   uint64_t creation_time = 0;
   uint64_t oldest_key_time = 0;
-  const uint64_t target_file_size;
   uint64_t file_creation_time = 0;
 
   // DB IDs
@@ -407,7 +409,7 @@ struct BlockBasedTableBuilder::Rep {
       const CompressionOptions& _compression_opts, const bool skip_filters,
       const int _level_at_creation, const std::string& _column_family_name,
       const uint64_t _creation_time, const uint64_t _oldest_key_time,
-      const uint64_t _target_file_size, const uint64_t _file_creation_time,
+      const uint64_t target_file_size, const uint64_t _file_creation_time,
       const std::string& _db_id, const std::string& _db_session_id)
       : ioptions(_ioptions),
         moptions(_moptions),
@@ -448,13 +450,20 @@ struct BlockBasedTableBuilder::Rep {
         column_family_name(_column_family_name),
         creation_time(_creation_time),
         oldest_key_time(_oldest_key_time),
-        target_file_size(_target_file_size),
         file_creation_time(_file_creation_time),
         db_id(_db_id),
         db_session_id(_db_session_id),
         db_host_id(ioptions.db_host_id),
         status_ok(true),
         io_status_ok(true) {
+    if (target_file_size == 0) {
+      buffer_limit = compression_opts.max_dict_buffer_bytes;
+    } else if (compression_opts.max_dict_buffer_bytes == 0) {
+      buffer_limit = target_file_size;
+    } else {
+      buffer_limit =
+          std::min(target_file_size, compression_opts.max_dict_buffer_bytes);
+    }
     for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
       compression_ctxs[i].reset(new CompressionContext(compression_type));
     }
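The constructor logic above boils down to "zero means unlimited; otherwise the
tighter cap wins." Restated as a standalone helper (the function name is ours,
not RocksDB's):

    #include <algorithm>
    #include <cstdint>

    // Effective cap on buffered uncompressed data for dictionary sampling.
    uint64_t EffectiveBufferLimit(uint64_t target_file_size,
                                  uint64_t max_dict_buffer_bytes) {
      if (target_file_size == 0) return max_dict_buffer_bytes;
      if (max_dict_buffer_bytes == 0) return target_file_size;
      return std::min(target_file_size, max_dict_buffer_bytes);
    }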
@@ -896,8 +905,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       r->first_key_in_next_block = &key;
       Flush();
 
-      if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
-          r->data_begin_offset > r->target_file_size) {
+      if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
+          r->data_begin_offset > r->buffer_limit) {
         EnterUnbuffered();
       }
 
@@ -997,23 +1006,28 @@ void BlockBasedTableBuilder::Flush() {
 
 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                         BlockHandle* handle,
                                         bool is_data_block) {
-  WriteBlock(block->Finish(), handle, is_data_block);
-  block->Reset();
+  block->Finish();
+  std::string raw_block_contents;
+  block->SwapAndReset(raw_block_contents);
+  if (rep_->state == Rep::State::kBuffered) {
+    assert(is_data_block);
+    assert(!rep_->data_block_and_keys_buffers.empty());
+    rep_->data_block_and_keys_buffers.back().first =
+        std::move(raw_block_contents);
+    rep_->data_begin_offset +=
+        rep_->data_block_and_keys_buffers.back().first.size();
+    return;
+  }
+  WriteBlock(raw_block_contents, handle, is_data_block);
 }
 
 void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                         BlockHandle* handle,
                                         bool is_data_block) {
   Rep* r = rep_;
+  assert(r->state == Rep::State::kUnbuffered);
   Slice block_contents;
   CompressionType type;
-  if (r->state == Rep::State::kBuffered) {
-    assert(is_data_block);
-    assert(!r->data_block_and_keys_buffers.empty());
-    r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
-    r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
-    return;
-  }
   Status compress_status;
   CompressAndVerifyBlock(raw_block_contents, is_data_block,
                          *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
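Note the buffered path above now takes ownership of the finished block via
`SwapAndReset()` rather than copying it with `ToString()`. A toy illustration
of the same move-out pattern (the types here are ours, not RocksDB's):

    #include <string>
    #include <utility>
    #include <vector>

    class ToyBlockBuilder {
     public:
      void Add(const std::string& kv) { buffer_ += kv; }
      // Hand the accumulated bytes to the caller without copying, leaving the
      // builder empty and reusable -- analogous to BlockBuilder::SwapAndReset.
      void SwapAndReset(std::string& out) {
        out.clear();
        std::swap(buffer_, out);
      }

     private:
      std::string buffer_;
    };

    void BufferBlock(std::vector<std::string>& buffered, ToyBlockBuilder& b) {
      std::string contents;
      b.SwapAndReset(contents);
      buffered.push_back(std::move(contents));  // still no deep copy
    }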
@@ -1629,14 +1643,37 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
   const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                   ? r->compression_opts.zstd_max_train_bytes
                                   : r->compression_opts.max_dict_bytes;
+
+  // If buffer size is reasonable, we pre-generate a permutation to enforce
+  // uniqueness. This prevents wasting samples on duplicates, which is
+  // particularly likely when not many blocks were buffered.
+  std::vector<uint16_t> data_block_order;
+  size_t data_block_order_idx = 0;
+  if (r->data_block_and_keys_buffers.size() <= ((1 << 16) - 1)) {
+    data_block_order.resize(r->data_block_and_keys_buffers.size());
+    std::iota(data_block_order.begin(), data_block_order.end(),
+              static_cast<uint16_t>(0));
+    // We could be smarter and interleave the shuffling and sample appending
+    // logic. Then we could terminate as soon as `kSampleBytes` is reached,
+    // saving some shuffling computation.
+    RandomShuffle(data_block_order.begin(), data_block_order.end());
+  }
+
   Random64 generator{r->creation_time};
   std::string compression_dict_samples;
   std::vector<size_t> compression_dict_sample_lens;
   if (!r->data_block_and_keys_buffers.empty()) {
-    while (compression_dict_samples.size() < kSampleBytes) {
-      size_t rand_idx =
-          static_cast<size_t>(
-              generator.Uniform(r->data_block_and_keys_buffers.size()));
+    while ((data_block_order.empty() ||
+            data_block_order_idx < data_block_order.size()) &&
+           compression_dict_samples.size() < kSampleBytes) {
+      size_t rand_idx;
+      if (data_block_order.empty()) {
+        rand_idx = static_cast<size_t>(
+            generator.Uniform(r->data_block_and_keys_buffers.size()));
+      } else {
+        rand_idx = data_block_order[data_block_order_idx];
+        ++data_block_order_idx;
+      }
       size_t copy_len =
           std::min(kSampleBytes - compression_dict_samples.size(),
                    r->data_block_and_keys_buffers[rand_idx].first.size());
diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h
index 147fb8e7a..99c80f843 100644
--- a/table/block_based/block_based_table_builder.h
+++ b/table/block_based/block_based_table_builder.h
@@ -117,8 +117,9 @@ class BlockBasedTableBuilder : public TableBuilder {
   // REQUIRES: `rep_->state == kBuffered`
   void EnterUnbuffered();
 
-  // Call block's Finish() method
-  // and then write the compressed block contents to file.
+  // Call block's Finish() method and then
+  // - in buffered mode, buffer the uncompressed block contents.
+  // - in unbuffered mode, write the compressed block contents to file.
   void WriteBlock(BlockBuilder* block, BlockHandle* handle,
                   bool is_data_block);
 
   // Compress and write block content to the file.
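The sampling change above, in isolation: shuffle the block indices once so
every buffered block is sampled at most once (sampling without replacement),
instead of drawing indices with replacement and possibly wasting budget on
duplicates. A sketch using the standard library in place of RocksDB's
`RandomShuffle`:

    #include <algorithm>
    #include <cstddef>
    #include <numeric>
    #include <random>
    #include <string>
    #include <vector>

    std::string SampleWithoutReplacement(
        const std::vector<std::string>& blocks, size_t sample_budget) {
      std::vector<size_t> order(blocks.size());
      std::iota(order.begin(), order.end(), size_t{0});
      std::mt19937 rng(301);  // fixed seed for repeatability
      std::shuffle(order.begin(), order.end(), rng);

      std::string samples;
      for (size_t idx : order) {
        if (samples.size() >= sample_budget) break;
        size_t copy_len =
            std::min(sample_budget - samples.size(), blocks[idx].size());
        samples.append(blocks[idx], 0, copy_len);
      }
      return samples;
    }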
diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc
index c405609bc..62130c424 100644
--- a/table/sst_file_dumper.cc
+++ b/table/sst_file_dumper.cc
@@ -235,7 +235,8 @@ Status SstFileDumper::ShowAllCompressionSizes(
     const std::vector<std::pair<CompressionType, const char*>>&
         compression_types,
     int32_t compress_level_from, int32_t compress_level_to,
-    uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes) {
+    uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
+    uint64_t max_dict_buffer_bytes) {
   fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size);
   for (auto& i : compression_types) {
     if (CompressionTypeSupported(i.first)) {
@@ -243,6 +244,7 @@ Status SstFileDumper::ShowAllCompressionSizes(
       CompressionOptions compress_opt;
       compress_opt.max_dict_bytes = max_dict_bytes;
       compress_opt.zstd_max_train_bytes = zstd_max_train_bytes;
+      compress_opt.max_dict_buffer_bytes = max_dict_buffer_bytes;
       for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
         fprintf(stdout, "Compression level: %d", j);
         compress_opt.level = j;
diff --git a/table/sst_file_dumper.h b/table/sst_file_dumper.h
index 9153f8a38..ab8f5a3e2 100644
--- a/table/sst_file_dumper.h
+++ b/table/sst_file_dumper.h
@@ -40,7 +40,8 @@ class SstFileDumper {
       const std::vector<std::pair<CompressionType, const char*>>&
           compression_types,
       int32_t compress_level_from, int32_t compress_level_to,
-      uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes);
+      uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
+      uint64_t max_dict_buffer_bytes);
 
   Status ShowCompressionSize(size_t block_size, CompressionType compress_type,
                              const CompressionOptions& compress_opt);
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 852ea3406..2a4a742d2 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -948,6 +948,10 @@ DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
 DEFINE_int32(compression_parallel_threads, 1,
              "Number of threads for parallel compression.");
 
+DEFINE_uint64(compression_max_dict_buffer_bytes,
+              ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes,
+              "Maximum bytes to buffer to collect samples for dictionary.");
+
 static bool ValidateTableCacheNumshardbits(const char* flagname,
                                            int32_t value) {
   if (0 >= value || value > 20) {
@@ -4053,6 +4057,8 @@ class Benchmark {
         FLAGS_compression_zstd_max_train_bytes;
     options.compression_opts.parallel_threads =
         FLAGS_compression_parallel_threads;
+    options.compression_opts.max_dict_buffer_bytes =
+        FLAGS_compression_max_dict_buffer_bytes;
 
     // If this is a block based table, set some related options
     auto table_options = options.table_factory->GetOptions<BlockBasedTableOptions>();
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index caff49790..6e2888350 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -50,6 +50,7 @@ default_params = {
     # Disabled compression_parallel_threads as the feature is not stable
     # lambda: random.choice([1] * 9 + [4])
     "compression_parallel_threads": 1,
+    "compression_max_dict_buffer_bytes": lambda: 4096 * random.randint(0, 32),
    "clear_column_family_one_in": 0,
     "compact_files_one_in": 1000000,
     "compact_range_one_in": 1000000,
@@ -267,8 +268,10 @@ best_efforts_recovery_params = {
 def finalize_and_sanitize(src_params):
     dest_params = dict([(k, v() if callable(v) else v)
                         for (k, v) in src_params.items()])
-    if dest_params.get("compression_type") != "zstd" or \
-            dest_params.get("compression_max_dict_bytes") == 0:
+    if dest_params.get("compression_max_dict_bytes") == 0:
+        dest_params["compression_zstd_max_train_bytes"] = 0
+        dest_params["compression_max_dict_buffer_bytes"] = 0
+    if dest_params.get("compression_type") != "zstd":
         dest_params["compression_zstd_max_train_bytes"] = 0
     if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
         dest_params["memtablerep"] = "skip_list"
dest_params["compression_max_dict_buffer_bytes"] = 0 + if dest_params.get("compression_type") != "zstd": dest_params["compression_zstd_max_train_bytes"] = 0 if dest_params.get("allow_concurrent_memtable_write", 1) == 1: dest_params["memtablerep"] = "skip_list" diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index 7d1be21ef..415ba28bf 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -103,6 +103,9 @@ void print_help(bool to_stderr) { --compression_zstd_max_train_bytes= Maximum size of training data passed to zstd's dictionary trainer + + --compression_max_dict_buffer_bytes= + Limit on buffer size from which we collect samples for dictionary generation. )"); } @@ -166,6 +169,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes; uint32_t compression_zstd_max_train_bytes = ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes; + uint64_t compression_max_dict_buffer_bytes = + ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes; int64_t tmp_val; @@ -276,6 +281,17 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { return 1; } compression_zstd_max_train_bytes = static_cast(tmp_val); + } else if (ParseIntArg(argv[i], "--compression_max_dict_buffer_bytes=", + "compression_max_dict_buffer_bytes must be numeric", + &tmp_val)) { + if (tmp_val < 0) { + fprintf(stderr, + "compression_max_dict_buffer_bytes must be positive: '%s'\n", + argv[i]); + print_help(/*to_stderr*/ true); + return 1; + } + compression_max_dict_buffer_bytes = static_cast(tmp_val); } else if (strcmp(argv[i], "--help") == 0) { print_help(/*to_stderr*/ false); return 0; @@ -404,7 +420,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) { set_block_size ? block_size : 16384, compression_types.empty() ? kCompressions : compression_types, compress_level_from, compress_level_to, compression_max_dict_bytes, - compression_zstd_max_train_bytes); + compression_zstd_max_train_bytes, compression_max_dict_buffer_bytes); if (!st.ok()) { fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str()); exit(1); diff --git a/util/compression.h b/util/compression.h index 53e977c88..5ce419c9c 100644 --- a/util/compression.h +++ b/util/compression.h @@ -627,6 +627,9 @@ inline std::string CompressionOptionsToString( result.append("enabled=") .append(ToString(compression_options.enabled)) .append("; "); + result.append("max_dict_buffer_bytes=") + .append(ToString(compression_options.max_dict_buffer_bytes)) + .append("; "); return result; } diff --git a/util/string_util.cc b/util/string_util.cc index c44992f88..97bd851a2 100644 --- a/util/string_util.cc +++ b/util/string_util.cc @@ -279,9 +279,16 @@ bool StartsWith(const std::string& string, const std::string& pattern) { #ifndef ROCKSDB_LITE bool ParseBoolean(const std::string& type, const std::string& value) { - if (value == "true" || value == "1") { + const static std::string kTrue = "true", kFalse = "false"; + if (value.compare(0 /* pos */, kTrue.size(), kTrue) == 0) { return true; - } else if (value == "false" || value == "0") { + } else if (value.compare(0 /* pos */, kFalse.size(), kFalse) == 0) { + return false; + } + int num = ParseInt(value); + if (num == 1) { + return true; + } else if (num == 0) { return false; } throw std::invalid_argument(type);