diff --git a/HISTORY.md b/HISTORY.md index a92f90edf..e7729005f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,13 @@ * Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms. +## Unreleased +### New Features +* Added a feature to perform data-block sampling for compressibility, and report stats to user. +### Public API Change +### Bug fixes + + ## 6.0.0 (2/19/2019) ### New Features * Enabled checkpoint on readonly db (DBImplReadOnly). diff --git a/db/builder.cc b/db/builder.cc index a60eb7ece..a41a8ca4c 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -47,8 +47,8 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, int level, - const bool skip_filters, const uint64_t creation_time, + uint64_t sample_for_compression, const CompressionOptions& compression_opts, + int level, const bool skip_filters, const uint64_t creation_time, const uint64_t oldest_key_time, const uint64_t target_file_size) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == @@ -56,9 +56,9 @@ TableBuilder* NewTableBuilder( return ioptions.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories, compression_type, - compression_opts, skip_filters, column_family_name, - level, creation_time, oldest_key_time, - target_file_size), + sample_for_compression, compression_opts, + skip_filters, column_family_name, level, + creation_time, oldest_key_time, target_file_size), column_family_id, file); } @@ -75,11 +75,12 @@ Status BuildTable( std::vector snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, const CompressionType compression, - const CompressionOptions& compression_opts, bool paranoid_file_checks, - InternalStats* internal_stats, TableFileCreationReason reason, - EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, - TableProperties* table_properties, int level, const uint64_t creation_time, - const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) { + uint64_t sample_for_compression, const CompressionOptions& compression_opts, + bool paranoid_file_checks, InternalStats* internal_stats, + TableFileCreationReason reason, EventLogger* event_logger, int job_id, + const Env::IOPriority io_priority, TableProperties* table_properties, + int level, const uint64_t creation_time, const uint64_t oldest_key_time, + Env::WriteLifeTimeHint write_hint) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -133,8 +134,8 @@ Status BuildTable( ioptions, mutable_cf_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, column_family_name, file_writer.get(), compression, - compression_opts_for_flush, level, false /* skip_filters */, - creation_time, oldest_key_time); + sample_for_compression, compression_opts_for_flush, level, + false /* skip_filters */, creation_time, oldest_key_time); } MergeHelper merge(env, internal_comparator.user_comparator(), diff --git a/db/builder.h b/db/builder.h index 95985a558..c00c8273c 100644 --- a/db/builder.h +++ b/db/builder.h @@ -47,6 +47,7 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, + const uint64_t sample_for_compression, const CompressionOptions& compression_opts, int level, const bool skip_filters = false, const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0); @@ -72,6 +73,7 @@ extern Status BuildTable( std::vector snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, const CompressionType compression, + const uint64_t sample_for_compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger = nullptr, int job_id = 0, diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 2ecb960fd..b208b444c 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1499,6 +1499,7 @@ Status CompactionJob::OpenCompactionOutputFile( cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), + 0 /*sample_for_compression */, sub_compact->compaction->output_compression_opts(), sub_compact->compaction->output_level(), skip_filters, output_file_creation_time, 0 /* oldest_key_time */, diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 6321b20d1..0806c486e 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1029,6 +1029,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), + mutable_cf_options.sample_for_compression, cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->internal_stats(), TableFileCreationReason::kRecovery, &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, diff --git a/db/flush_job.cc b/db/flush_job.cc index 413fa1c99..f8837352f 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -372,7 +372,8 @@ Status FlushJob::WriteLevel0Table() { cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, - output_compression_, cfd_->ioptions()->compression_opts, + output_compression_, mutable_cf_options_.sample_for_compression, + cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, current_time, diff --git a/db/repair.cc b/db/repair.cc index dcccd2196..ae74e578c 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -430,10 +430,11 @@ class Repairer { &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, snapshot_checker, kNoCompression, - CompressionOptions(), false, nullptr /* internal_stats */, - TableFileCreationReason::kRecovery, nullptr /* event_logger */, - 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */, - -1 /* level */, current_time, write_hint); + 0 /* sample_for_compression */, CompressionOptions(), false, + nullptr /* internal_stats */, TableFileCreationReason::kRecovery, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, -1 /* level */, current_time, + write_hint); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/db/table_properties_collector.cc b/db/table_properties_collector.cc index 4254e179c..4dbcd4cc4 100644 --- a/db/table_properties_collector.cc +++ b/db/table_properties_collector.cc @@ -41,6 +41,13 @@ Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key, ikey.sequence, file_size); } +void UserKeyTablePropertiesCollector::BlockAdd( + uint64_t bLockRawBytes, uint64_t blockCompressedBytesFast, + uint64_t blockCompressedBytesSlow) { + return collector_->BlockAdd(bLockRawBytes, blockCompressedBytesFast, + blockCompressedBytesSlow); +} + Status UserKeyTablePropertiesCollector::Finish( UserCollectedProperties* properties) { return collector_->Finish(properties); diff --git a/db/table_properties_collector.h b/db/table_properties_collector.h index 4792b1a81..e4d621715 100644 --- a/db/table_properties_collector.h +++ b/db/table_properties_collector.h @@ -27,6 +27,10 @@ class IntTblPropCollector { virtual Status InternalAdd(const Slice& key, const Slice& value, uint64_t file_size) = 0; + virtual void BlockAdd(uint64_t blockRawBytes, + uint64_t blockCompressedBytesFast, + uint64_t blockCompressedBytesSlow) = 0; + virtual UserCollectedProperties GetReadableProperties() const = 0; virtual bool NeedCompact() const { return false; } @@ -60,6 +64,10 @@ class UserKeyTablePropertiesCollector : public IntTblPropCollector { virtual Status InternalAdd(const Slice& key, const Slice& value, uint64_t file_size) override; + virtual void BlockAdd(uint64_t blockRawBytes, + uint64_t blockCompressedBytesFast, + uint64_t blockCompressedBytesSlow) override; + virtual Status Finish(UserCollectedProperties* properties) override; virtual const char* Name() const override { return collector_->Name(); } diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index c0ae0bf93..ea561e982 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -52,7 +52,8 @@ void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions, builder->reset(NewTableBuilder( ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories, kTestColumnFamilyId, kTestColumnFamilyName, writable->get(), - options.compression, options.compression_opts, unknown_level)); + options.compression, options.sample_for_compression, + options.compression_opts, unknown_level)); } } // namespace @@ -172,6 +173,13 @@ class RegularKeysStartWithAInternal : public IntTblPropCollector { return Status::OK(); } + void BlockAdd(uint64_t /* blockRawBytes */, + uint64_t /* blockCompressedBytesFast */, + uint64_t /* blockCompressedBytesSlow */) override { + // Nothing to do. + return; + } + UserCollectedProperties GetReadableProperties() const override { return UserCollectedProperties{}; } diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 5da33ffec..b7ab7c584 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -644,6 +644,12 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through SetOptions() API uint64_t ttl = 0; + // If this option is set then 1 in N blocks are compressed + // using a fast (lz4) and slow (zstd) compression algorithm. + // The compressibility is reported as stats and the stored + // data is left uncompressed (unless compression is also requested). + uint64_t sample_for_compression = 0; + // Create ColumnFamilyOptions with default values for all fields AdvancedColumnFamilyOptions(); // Create ColumnFamilyOptions from Options diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 75c180ff4..dacfd1436 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -92,6 +92,14 @@ class TablePropertiesCollector { return Add(key, value); } + // Called after each new block is cut + virtual void BlockAdd(uint64_t /* blockRawBytes */, + uint64_t /* blockCompressedBytesFast */, + uint64_t /* blockCompressedBytesSlow */) { + // Nothing to do here. Callback registers can override. + return; + } + // Finish() will be called when a table has already been built and is ready // for writing the properties block. // @params properties User will add their collected statistics to diff --git a/options/cf_options.h b/options/cf_options.h index 6653e59f5..fed144e4c 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -159,7 +159,8 @@ struct MutableCFOptions { options.max_sequential_skip_in_iterations), paranoid_file_checks(options.paranoid_file_checks), report_bg_io_stats(options.report_bg_io_stats), - compression(options.compression) { + compression(options.compression), + sample_for_compression(options.sample_for_compression) { RefreshDerivedOptions(options.num_levels, options.compaction_style); } @@ -189,7 +190,8 @@ struct MutableCFOptions { max_sequential_skip_in_iterations(0), paranoid_file_checks(false), report_bg_io_stats(false), - compression(Snappy_Supported() ? kSnappyCompression : kNoCompression) {} + compression(Snappy_Supported() ? kSnappyCompression : kNoCompression), + sample_for_compression(0) {} explicit MutableCFOptions(const Options& options); @@ -243,6 +245,7 @@ struct MutableCFOptions { bool paranoid_file_checks; bool report_bg_io_stats; CompressionType compression; + uint64_t sample_for_compression; // Derived options // Per-level target file size. diff --git a/options/options.cc b/options/options.cc index 55cc6777e..3076590cf 100644 --- a/options/options.cc +++ b/options/options.cc @@ -87,7 +87,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) paranoid_file_checks(options.paranoid_file_checks), force_consistency_checks(options.force_consistency_checks), report_bg_io_stats(options.report_bg_io_stats), - ttl(options.ttl) { + ttl(options.ttl), + sample_for_compression(options.sample_for_compression) { assert(memtable_factory.get() != nullptr); if (max_bytes_for_level_multiplier_additional.size() < static_cast(num_levels)) { diff --git a/options/options_helper.cc b/options/options_helper.cc index b54d589a3..d8b094021 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -193,6 +193,7 @@ ColumnFamilyOptions BuildColumnFamilyOptions( cf_opts.paranoid_file_checks = mutable_cf_options.paranoid_file_checks; cf_opts.report_bg_io_stats = mutable_cf_options.report_bg_io_stats; cf_opts.compression = mutable_cf_options.compression; + cf_opts.sample_for_compression = mutable_cf_options.sample_for_compression; cf_opts.table_factory = options.table_factory; // TODO(yhchiang): find some way to handle the following derived options @@ -1927,7 +1928,11 @@ std::unordered_map {"ttl", {offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T, OptionVerificationType::kNormal, true, - offsetof(struct MutableCFOptions, ttl)}}}; + offsetof(struct MutableCFOptions, ttl)}}, + {"sample_for_compression", + {offset_of(&ColumnFamilyOptions::sample_for_compression), + OptionType::kUInt64T, OptionVerificationType::kNormal, true, + offsetof(struct MutableCFOptions, sample_for_compression)}}}; std::unordered_map OptionsHelper::fifo_compaction_options_type_info = { diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 7da8380cf..cf1ce4e21 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -451,6 +451,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "disable_auto_compactions=false;" "report_bg_io_stats=true;" "ttl=60;" + "sample_for_compression=0;" "compaction_options_fifo={max_table_files_size=3;allow_" "compaction=false;};", new_options)); diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 1b6829623..c23a8ba10 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -101,83 +101,105 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { return compressed_size < raw_size - (raw_size / 8u); } -} // namespace - -// format_version is the block format as defined in include/rocksdb/table.h -Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, - CompressionType* type, uint32_t format_version, - std::string* compressed_output) { - *type = compression_info.type(); - if (compression_info.type() == kNoCompression) { - return raw; - } - +bool CompressBlockInternal(const Slice& raw, + const CompressionInfo& compression_info, + uint32_t format_version, + std::string* compressed_output) { // Will return compressed block contents if (1) the compression method is // supported in this platform and (2) the compression rate is "good enough". switch (compression_info.type()) { case kSnappyCompression: - if (Snappy_Compress(compression_info, raw.data(), raw.size(), - compressed_output) && - GoodCompressionRatio(compressed_output->size(), raw.size())) { - return *compressed_output; - } - break; // fall back to no compression. + return Snappy_Compress(compression_info, raw.data(), raw.size(), + compressed_output); case kZlibCompression: - if (Zlib_Compress( - compression_info, - GetCompressFormatForVersion(kZlibCompression, format_version), - raw.data(), raw.size(), compressed_output) && - GoodCompressionRatio(compressed_output->size(), raw.size())) { - return *compressed_output; - } - break; // fall back to no compression. + return Zlib_Compress( + compression_info, + GetCompressFormatForVersion(kZlibCompression, format_version), + raw.data(), raw.size(), compressed_output); case kBZip2Compression: - if (BZip2_Compress( - compression_info, - GetCompressFormatForVersion(kBZip2Compression, format_version), - raw.data(), raw.size(), compressed_output) && - GoodCompressionRatio(compressed_output->size(), raw.size())) { - return *compressed_output; - } - break; // fall back to no compression. + return BZip2_Compress( + compression_info, + GetCompressFormatForVersion(kBZip2Compression, format_version), + raw.data(), raw.size(), compressed_output); case kLZ4Compression: - if (LZ4_Compress( - compression_info, - GetCompressFormatForVersion(kLZ4Compression, format_version), - raw.data(), raw.size(), compressed_output) && - GoodCompressionRatio(compressed_output->size(), raw.size())) { - return *compressed_output; - } - break; // fall back to no compression. + return LZ4_Compress( + compression_info, + GetCompressFormatForVersion(kLZ4Compression, format_version), + raw.data(), raw.size(), compressed_output); case kLZ4HCCompression: - if (LZ4HC_Compress( - compression_info, - GetCompressFormatForVersion(kLZ4HCCompression, format_version), - raw.data(), raw.size(), compressed_output) && - GoodCompressionRatio(compressed_output->size(), raw.size())) { - return *compressed_output; - } - break; // fall back to no compression. + return LZ4HC_Compress( + compression_info, + GetCompressFormatForVersion(kLZ4HCCompression, format_version), + raw.data(), raw.size(), compressed_output); case kXpressCompression: - if (XPRESS_Compress(raw.data(), raw.size(), - compressed_output) && - GoodCompressionRatio(compressed_output->size(), raw.size())) { - return *compressed_output; - } - break; + return XPRESS_Compress(raw.data(), raw.size(), compressed_output); case kZSTD: case kZSTDNotFinalCompression: - if (ZSTD_Compress(compression_info, raw.data(), raw.size(), - compressed_output) && - GoodCompressionRatio(compressed_output->size(), raw.size())) { - return *compressed_output; - } - break; // fall back to no compression. - default: {} // Do not recognize this compression type + return ZSTD_Compress(compression_info, raw.data(), raw.size(), + compressed_output); + default: + // Do not recognize this compression type + return false; + } +} + +} // namespace + +// format_version is the block format as defined in include/rocksdb/table.h +Slice CompressBlock(const Slice& raw, const CompressionInfo& info, + CompressionType* type, uint32_t format_version, + bool do_sample, std::string* compressed_output, + std::string* sampled_output_fast, + std::string* sampled_output_slow) { + *type = info.type(); + + if (info.type() == kNoCompression && !info.SampleForCompression()) { + return raw; } - // Compression method is not supported, or not good compression ratio, so just - // fall back to uncompressed form. + // If requested, we sample one in every N block with a + // fast and slow compression algorithm and report the stats. + // The users can use these stats to decide if it is worthwhile + // enabling compression and they also get a hint about which + // compression algorithm wil be beneficial. + if (do_sample && info.SampleForCompression() && + Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) && + sampled_output_fast && sampled_output_slow) { + // Sampling with a fast compression algorithm + if (LZ4_Supported() || Snappy_Supported()) { + CompressionType c = + LZ4_Supported() ? kLZ4Compression : kSnappyCompression; + CompressionContext context(c); + CompressionOptions options; + CompressionInfo info_tmp(options, context, + CompressionDict::GetEmptyDict(), c, + info.SampleForCompression()); + + CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast); + } + + // Sampling with a slow but high-compression algorithm + if (ZSTD_Supported() || Zlib_Supported()) { + CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression; + CompressionContext context(c); + CompressionOptions options; + CompressionInfo info_tmp(options, context, + CompressionDict::GetEmptyDict(), c, + info.SampleForCompression()); + CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow); + } + } + + // Actually compress the data + if (*type != kNoCompression) { + if (CompressBlockInternal(raw, info, format_version, compressed_output) && + GoodCompressionRatio(compressed_output->size(), raw.size())) { + return *compressed_output; + } + } + + // Compression method is not supported, or not good + // compression ratio, so just fall back to uncompressed form. *type = kNoCompression; return raw; } @@ -217,6 +239,14 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector return Status::OK(); } + virtual void BlockAdd(uint64_t /* blockRawBytes */, + uint64_t /* blockCompressedBytesFast */, + uint64_t /* blockCompressedBytesSlow */) override { + // Intentionally left blank. No interest in collecting stats for + // blocks. + return; + } + Status Finish(UserCollectedProperties* properties) override { std::string val; PutFixed32(&val, static_cast(index_type_)); @@ -269,6 +299,7 @@ struct BlockBasedTableBuilder::Rep { std::string last_key; CompressionType compression_type; + uint64_t sample_for_compression; CompressionOptions compression_opts; std::unique_ptr compression_dict; CompressionContext compression_ctx; @@ -328,6 +359,7 @@ struct BlockBasedTableBuilder::Rep { int_tbl_prop_collector_factories, uint32_t _column_family_id, WritableFileWriter* f, const CompressionType _compression_type, + const uint64_t _sample_for_compression, const CompressionOptions& _compression_opts, const bool skip_filters, const std::string& _column_family_name, const uint64_t _creation_time, const uint64_t _oldest_key_time, const uint64_t _target_file_size) @@ -350,6 +382,7 @@ struct BlockBasedTableBuilder::Rep { range_del_block(1 /* block_restart_interval */), internal_prefix_transform(_moptions.prefix_extractor.get()), compression_type(_compression_type), + sample_for_compression(_sample_for_compression), compression_opts(_compression_opts), compression_dict(), compression_ctx(_compression_type), @@ -415,6 +448,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, + const uint64_t sample_for_compression, const CompressionOptions& compression_opts, const bool skip_filters, const std::string& column_family_name, const uint64_t creation_time, const uint64_t oldest_key_time, const uint64_t target_file_size) { @@ -430,11 +464,11 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( sanitized_table_options.format_version = 1; } - rep_ = new Rep(ioptions, moptions, sanitized_table_options, - internal_comparator, int_tbl_prop_collector_factories, - column_family_id, file, compression_type, compression_opts, - skip_filters, column_family_name, creation_time, - oldest_key_time, target_file_size); + rep_ = new Rep( + ioptions, moptions, sanitized_table_options, internal_comparator, + int_tbl_prop_collector_factories, column_family_id, file, + compression_type, sample_for_compression, compression_opts, skip_filters, + column_family_name, creation_time, oldest_key_time, target_file_size); if (rep_->filter_builder != nullptr) { rep_->filter_builder->StartBlock(0); @@ -558,6 +592,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, Rep* r = rep_; auto type = r->compression_type; + uint64_t sample_for_compression = r->sample_for_compression; Slice block_contents; bool abort_compression = false; @@ -581,10 +616,20 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, } assert(compression_dict != nullptr); CompressionInfo compression_info(r->compression_opts, r->compression_ctx, - *compression_dict, r->compression_type); - block_contents = - CompressBlock(raw_block_contents, compression_info, &type, - r->table_options.format_version, &r->compressed_output); + *compression_dict, type, + sample_for_compression); + + std::string sampled_output_fast; + std::string sampled_output_slow; + block_contents = CompressBlock( + raw_block_contents, compression_info, &type, + r->table_options.format_version, is_data_block /* do_sample */, + &r->compressed_output, &sampled_output_fast, &sampled_output_slow); + + // notify collectors on block add + NotifyCollectTableCollectorsOnBlockAdd( + r->table_properties_collectors, raw_block_contents.size(), + sampled_output_fast.size(), sampled_output_slow.size()); // Some of the compression algorithms are known to be unreliable. If // the verify_compression flag is set then try to de-compress the diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index b82bec16f..b10494e7b 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -45,6 +45,7 @@ class BlockBasedTableBuilder : public TableBuilder { int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, + const uint64_t sample_for_compression, const CompressionOptions& compression_opts, const bool skip_filters, const std::string& column_family_name, const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0); @@ -137,6 +138,8 @@ class BlockBasedTableBuilder : public TableBuilder { Slice CompressBlock(const Slice& raw, const CompressionInfo& info, CompressionType* type, uint32_t format_version, - std::string* compressed_output); + bool do_sample, std::string* compressed_output, + std::string* sampled_output_fast, + std::string* sampled_output_slow); } // namespace rocksdb diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index e5ba4edf7..cda8d1e27 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -214,6 +214,7 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_options_, table_builder_options.internal_comparator, table_builder_options.int_tbl_prop_collector_factories, column_family_id, file, table_builder_options.compression_type, + table_builder_options.sample_for_compression, table_builder_options.compression_opts, table_builder_options.skip_filters, table_builder_options.column_family_name, diff --git a/table/data_block_hash_index_test.cc b/table/data_block_hash_index_test.cc index ad0224bbb..11226648e 100644 --- a/table/data_block_hash_index_test.cc +++ b/table/data_block_hash_index_test.cc @@ -558,8 +558,9 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2, builder.reset(ioptions.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, moptions, internal_comparator, &int_tbl_prop_collector_factories, - options.compression, CompressionOptions(), - false /* skip_filters */, column_family_name, level_), + options.compression, options.sample_for_compression, + CompressionOptions(), false /* skip_filters */, + column_family_name, level_), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index d8e241d0a..6c04d079a 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -151,6 +151,16 @@ bool NotifyCollectTableCollectorsOnAdd( return all_succeeded; } +void NotifyCollectTableCollectorsOnBlockAdd( + const std::vector>& collectors, + const uint64_t blockRawBytes, const uint64_t blockCompressedBytesFast, + const uint64_t blockCompressedBytesSlow) { + for (auto& collector : collectors) { + collector->BlockAdd(blockRawBytes, blockCompressedBytesFast, + blockCompressedBytesSlow); + } +} + bool NotifyCollectTableCollectorsOnFinish( const std::vector>& collectors, Logger* info_log, PropertyBlockBuilder* builder) { diff --git a/table/meta_blocks.h b/table/meta_blocks.h index 14ea6916a..6efd1225e 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -83,6 +83,11 @@ bool NotifyCollectTableCollectorsOnAdd( const std::vector>& collectors, Logger* info_log); +void NotifyCollectTableCollectorsOnBlockAdd( + const std::vector>& collectors, + uint64_t blockRawBytes, uint64_t blockCompressedBytesFast, + uint64_t blockCompressedBytesSlow); + // NotifyCollectTableCollectorsOnAdd() triggers the `Finish` event for all // property collectors. The collected properties will be added to `builder`. bool NotifyCollectTableCollectorsOnFinish( diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index d7d3290ae..b9a7273e0 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -202,6 +202,8 @@ Status SstFileWriter::Open(const std::string& file_path) { compression_type = r->mutable_cf_options.compression; compression_opts = r->ioptions.compression_opts; } + uint64_t sample_for_compression = + r->mutable_cf_options.sample_for_compression; std::vector> int_tbl_prop_collector_factories; @@ -234,8 +236,9 @@ Status SstFileWriter::Open(const std::string& file_path) { TableBuilderOptions table_builder_options( r->ioptions, r->mutable_cf_options, r->internal_comparator, - &int_tbl_prop_collector_factories, compression_type, compression_opts, - r->skip_filters, r->column_family_name, unknown_level); + &int_tbl_prop_collector_factories, compression_type, + sample_for_compression, compression_opts, r->skip_filters, + r->column_family_name, unknown_level); r->file_writer.reset(new WritableFileWriter( std::move(sst_file), file_path, r->env_options, r->ioptions.env, nullptr /* stats */, r->ioptions.listeners)); diff --git a/table/sst_file_writer_collectors.h b/table/sst_file_writer_collectors.h index 89e0970d8..b3970151d 100644 --- a/table/sst_file_writer_collectors.h +++ b/table/sst_file_writer_collectors.h @@ -33,6 +33,14 @@ class SstFileWriterPropertiesCollector : public IntTblPropCollector { return Status::OK(); } + virtual void BlockAdd(uint64_t /* blockRawBytes */, + uint64_t /* blockCompressedBytesFast */, + uint64_t /* blockCompressedBytesSlow */) override { + // Intentionally left blank. No interest in collecting stats for + // blocks. + return; + } + virtual Status Finish(UserCollectedProperties* properties) override { // File version std::string version_val; diff --git a/table/table_builder.h b/table/table_builder.h index 2ec06f773..20d9a55f2 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -73,7 +73,7 @@ struct TableBuilderOptions { const InternalKeyComparator& _internal_comparator, const std::vector>* _int_tbl_prop_collector_factories, - CompressionType _compression_type, + CompressionType _compression_type, uint64_t _sample_for_compression, const CompressionOptions& _compression_opts, bool _skip_filters, const std::string& _column_family_name, int _level, const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0, @@ -83,6 +83,7 @@ struct TableBuilderOptions { internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), compression_type(_compression_type), + sample_for_compression(_sample_for_compression), compression_opts(_compression_opts), skip_filters(_skip_filters), column_family_name(_column_family_name), @@ -96,6 +97,7 @@ struct TableBuilderOptions { const std::vector>* int_tbl_prop_collector_factories; CompressionType compression_type; + uint64_t sample_for_compression; const CompressionOptions& compression_opts; bool skip_filters; // only used by BlockBasedTableBuilder const std::string& column_family_name; diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index f44ca3779..a9b75715b 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -100,8 +100,9 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, tb = opts.table_factory->NewTableBuilder( TableBuilderOptions( ioptions, moptions, ikc, &int_tbl_prop_collector_factories, - CompressionType::kNoCompression, CompressionOptions(), - false /* skip_filters */, kDefaultColumnFamilyName, unknown_level), + CompressionType::kNoCompression, 0 /* sample_for_compression */, + CompressionOptions(), false /* skip_filters */, + kDefaultColumnFamilyName, unknown_level), 0 /* column_family_id */, file_writer.get()); } else { s = DB::Open(opts, dbname, &db); diff --git a/table/table_test.cc b/table/table_test.cc index 04e11de8e..666628f44 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -328,9 +328,9 @@ class TableConstructor: public Constructor { builder.reset(ioptions.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, moptions, internal_comparator, &int_tbl_prop_collector_factories, - options.compression, CompressionOptions(), - false /* skip_filters */, column_family_name, - level_), + options.compression, options.sample_for_compression, + CompressionOptions(), false /* skip_filters */, + column_family_name, level_), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer_.get())); @@ -2627,10 +2627,10 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { std::string column_family_name; int unknown_level = -1; std::unique_ptr builder(factory.NewTableBuilder( - TableBuilderOptions(ioptions, moptions, ikc, - &int_tbl_prop_collector_factories, kNoCompression, - CompressionOptions(), false /* skip_filters */, - column_family_name, unknown_level), + TableBuilderOptions( + ioptions, moptions, ikc, &int_tbl_prop_collector_factories, + kNoCompression, 0 /* sample_for_compression */, CompressionOptions(), + false /* skip_filters */, column_family_name, unknown_level), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); @@ -3256,8 +3256,8 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { std::unique_ptr builder(options.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, moptions, ikc, &int_tbl_prop_collector_factories, kNoCompression, - CompressionOptions(), false /* skip_filters */, - column_family_name, -1), + 0 /* sample_for_compression */, CompressionOptions(), + false /* skip_filters */, column_family_name, -1), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); @@ -3436,8 +3436,8 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { std::unique_ptr builder(options.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, moptions, ikc, &int_tbl_prop_collector_factories, kNoCompression, - CompressionOptions(), false /* skip_filters */, - column_family_name, -1), + 0 /* sample_for_compression */, CompressionOptions(), + false /* skip_filters */, column_family_name, -1), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); @@ -3529,8 +3529,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { std::unique_ptr builder(options.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, moptions, ikc, &int_tbl_prop_collector_factories, kNoCompression, - CompressionOptions(), false /* skip_filters */, - column_family_name, -1), + 0 /* sample_for_compression */, CompressionOptions(), + false /* skip_filters */, column_family_name, -1), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index ed969c0de..ac8e47459 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -801,6 +801,8 @@ DEFINE_string(compression_type, "snappy", static enum rocksdb::CompressionType FLAGS_compression_type_e = rocksdb::kSnappyCompression; +DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression"); + DEFINE_int32(compression_level, rocksdb::CompressionOptions().level, "Compression level. The meaning of this value is library-" "dependent. If unset, we try to use the default for the library " @@ -2195,6 +2197,8 @@ class Benchmark { auto compression = CompressionTypeToString(FLAGS_compression_type_e); fprintf(stdout, "Compression: %s\n", compression.c_str()); + fprintf(stdout, "Compression sampling rate: %" PRId64 "\n", + FLAGS_sample_for_compression); switch (FLAGS_rep_factory) { case kPrefixHash: @@ -2234,7 +2238,8 @@ class Benchmark { CompressionOptions opts; CompressionContext context(FLAGS_compression_type_e); CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), - FLAGS_compression_type_e); + FLAGS_compression_type_e, + FLAGS_sample_for_compression); bool result = CompressSlice(info, Slice(input_str), &compressed); if (!result) { @@ -3101,7 +3106,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { CompressionOptions opts; CompressionContext context(FLAGS_compression_type_e); CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), - FLAGS_compression_type_e); + FLAGS_compression_type_e, + FLAGS_sample_for_compression); // Compress 1G while (ok && bytes < int64_t(1) << 30) { compressed.clear(); @@ -3129,9 +3135,9 @@ void VerifyDBFromDB(std::string& truth_db_name) { CompressionContext compression_ctx(FLAGS_compression_type_e); CompressionOptions compression_opts; - CompressionInfo compression_info(compression_opts, compression_ctx, - CompressionDict::GetEmptyDict(), - FLAGS_compression_type_e); + CompressionInfo compression_info( + compression_opts, compression_ctx, CompressionDict::GetEmptyDict(), + FLAGS_compression_type_e, FLAGS_sample_for_compression); UncompressionContext uncompression_ctx(FLAGS_compression_type_e); UncompressionInfo uncompression_info(uncompression_ctx, UncompressionDict::GetEmptyDict(), @@ -3488,6 +3494,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type_e; + options.sample_for_compression = FLAGS_sample_for_compression; options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; options.max_total_wal_size = FLAGS_max_total_wal_size; diff --git a/tools/sst_dump_test.cc b/tools/sst_dump_test.cc index b2729852c..6bf3e3b97 100644 --- a/tools/sst_dump_test.cc +++ b/tools/sst_dump_test.cc @@ -59,8 +59,9 @@ void createSST(const Options& opts, const std::string& file_name) { tb.reset(opts.table_factory->NewTableBuilder( TableBuilderOptions( imoptions, moptions, ikc, &int_tbl_prop_collector_factories, - CompressionType::kNoCompression, CompressionOptions(), - false /* skip_filters */, column_family_name, unknown_level), + CompressionType::kNoCompression, 0 /* sample_for_compression */, + CompressionOptions(), false /* skip_filters */, column_family_name, + unknown_level), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index 3312c1819..5cbbfc385 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -214,10 +214,10 @@ int SstFileDumper::ShowAllCompressionSizes( CompressionOptions compress_opt; std::string column_family_name; int unknown_level = -1; - TableBuilderOptions tb_opts(imoptions, moptions, ikc, - &block_based_table_factories, i.first, - compress_opt, false /* skip_filters */, - column_family_name, unknown_level); + TableBuilderOptions tb_opts( + imoptions, moptions, ikc, &block_based_table_factories, i.first, + 0 /* sample_for_compression */, compress_opt, + false /* skip_filters */, column_family_name, unknown_level); uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size); fprintf(stdout, "Compression: %s", i.second); fprintf(stdout, " Size: %" PRIu64 "\n", file_size); diff --git a/util/compression.h b/util/compression.h index a4d95a1db..b568238e6 100644 --- a/util/compression.h +++ b/util/compression.h @@ -333,17 +333,24 @@ class CompressionInfo { const CompressionContext& context_; const CompressionDict& dict_; const CompressionType type_; + const uint64_t sample_for_compression_; public: CompressionInfo(const CompressionOptions& _opts, const CompressionContext& _context, - const CompressionDict& _dict, CompressionType _type) - : opts_(_opts), context_(_context), dict_(_dict), type_(_type) {} + const CompressionDict& _dict, CompressionType _type, + uint64_t _sample_for_compression) + : opts_(_opts), + context_(_context), + dict_(_dict), + type_(_type), + sample_for_compression_(_sample_for_compression) {} const CompressionOptions& options() const { return opts_; } const CompressionContext& context() const { return context_; } const CompressionDict& dict() const { return dict_; } CompressionType type() const { return type_; } + uint64_t SampleForCompression() const { return sample_for_compression_; } }; class UncompressionContext { diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 109ea06ce..ccfbafb0d 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -755,9 +755,10 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, CompressionType type = bdb_options_.compression; CompressionOptions opts; CompressionContext context(type); - CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type); - CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, - compression_output); + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type, + 0 /* sample_for_compression */); + CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false, + compression_output, nullptr, nullptr); return *compression_output; }