fix SstFileWriter with dictionary compression (#7323)

Summary:
In block-based table builder, the cut-over from buffered to unbuffered
mode involves sampling the buffered blocks and generating a dictionary.
There was a bug where `SstFileWriter` passed zero as the `target_file_size`
causing the cutover to happen immediately, so there were no samples
available for generating the dictionary.

This PR changes the meaning of `target_file_size == 0` to mean buffer
the whole file before cutting over. It also adds dictionary compression
support to `sst_dump --command=recompress` for easy evaluation.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7323

Reviewed By: cheng-chang

Differential Revision: D23412158

Pulled By: ajkr

fbshipit-source-id: 3b232050e70ef3c2ee85a4b5f6fadb139c569873
This commit is contained in:
Andrew Kryczka 2020-09-03 15:48:29 -07:00 committed by Facebook GitHub Bot
parent 5b1ccdc191
commit af54c4092a
5 changed files with 42 additions and 6 deletions

View File

@ -9,6 +9,7 @@
* Fix useless no-op compactions scheduled upon snapshot release when options.disable-auto-compactions = true. * Fix useless no-op compactions scheduled upon snapshot release when options.disable-auto-compactions = true.
* Fix a bug when max_write_buffer_size_to_maintain is set, immutable flushed memtable destruction is delayed until the next super version is installed. A memtable is not added to delete list because of its reference hold by super version and super version doesn't switch because of empt delete list. So memory usage keeps on increasing beyond write_buffer_size + max_write_buffer_size_to_maintain. * Fix a bug when max_write_buffer_size_to_maintain is set, immutable flushed memtable destruction is delayed until the next super version is installed. A memtable is not added to delete list because of its reference hold by super version and super version doesn't switch because of empt delete list. So memory usage keeps on increasing beyond write_buffer_size + max_write_buffer_size_to_maintain.
* Avoid converting MERGES to PUTS when allow_ingest_behind is true. * Avoid converting MERGES to PUTS when allow_ingest_behind is true.
* Fix compression dictionary sampling together with `SstFileWriter`. Previously, the dictionary would be trained/finalized immediately with zero samples. Now, the whole `SstFileWriter` file is buffered in memory and then sampled.
### New Features ### New Features
* A new option `std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`. * A new option `std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`.

View File

@ -730,7 +730,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
r->first_key_in_next_block = &key; r->first_key_in_next_block = &key;
Flush(); Flush();
if (r->state == Rep::State::kBuffered && if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
r->data_begin_offset > r->target_file_size) { r->data_begin_offset > r->target_file_size) {
EnterUnbuffered(); EnterUnbuffered();
} }

View File

@ -226,12 +226,15 @@ int SstFileDumper::ShowAllCompressionSizes(
size_t block_size, size_t block_size,
const std::vector<std::pair<CompressionType, const char*>>& const std::vector<std::pair<CompressionType, const char*>>&
compression_types, compression_types,
int32_t compress_level_from, int32_t compress_level_to) { int32_t compress_level_from, int32_t compress_level_to,
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes) {
fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size);
for (auto& i : compression_types) { for (auto& i : compression_types) {
if (CompressionTypeSupported(i.first)) { if (CompressionTypeSupported(i.first)) {
fprintf(stdout, "Compression: %-24s\n", i.second); fprintf(stdout, "Compression: %-24s\n", i.second);
CompressionOptions compress_opt; CompressionOptions compress_opt;
compress_opt.max_dict_bytes = max_dict_bytes;
compress_opt.zstd_max_train_bytes = zstd_max_train_bytes;
for (int32_t j = compress_level_from; j <= compress_level_to; j++) { for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
fprintf(stdout, "Compression level: %d", j); fprintf(stdout, "Compression level: %d", j);
compress_opt.level = j; compress_opt.level = j;

View File

@ -38,9 +38,9 @@ class SstFileDumper {
int ShowAllCompressionSizes( int ShowAllCompressionSizes(
size_t block_size, size_t block_size,
const std::vector<std::pair<CompressionType, const char*>>& const std::vector<std::pair<CompressionType, const char*>>&
compression_types, compression_types,
int32_t compress_level_from, int32_t compress_level_from, int32_t compress_level_to,
int32_t compress_level_to); uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes);
int ShowCompressionSize( int ShowCompressionSize(
size_t block_size, size_t block_size,

View File

@ -97,6 +97,12 @@ void print_help(bool to_stderr) {
--compression_level_to=<compression_level> --compression_level_to=<compression_level>
Compression level to stop compressing when executing recompress. One compression type Compression level to stop compressing when executing recompress. One compression type
and compression_level_from must also be specified and compression_level_from must also be specified
--compression_max_dict_bytes=<uint32_t>
Maximum size of dictionary used to prime the compression library
--compression_zstd_max_train_bytes=<uint32_t>
Maximum size of training data passed to zstd's dictionary trainer
)"); )");
} }
@ -156,6 +162,10 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
uint64_t total_filter_block_size = 0; uint64_t total_filter_block_size = 0;
int32_t compress_level_from = CompressionOptions::kDefaultCompressionLevel; int32_t compress_level_from = CompressionOptions::kDefaultCompressionLevel;
int32_t compress_level_to = CompressionOptions::kDefaultCompressionLevel; int32_t compress_level_to = CompressionOptions::kDefaultCompressionLevel;
uint32_t compression_max_dict_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes;
uint32_t compression_zstd_max_train_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes;
int64_t tmp_val; int64_t tmp_val;
@ -244,6 +254,27 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
"compression_level_to must be numeric", &tmp_val)) { "compression_level_to must be numeric", &tmp_val)) {
has_compression_level_to = true; has_compression_level_to = true;
compress_level_to = static_cast<int>(tmp_val); compress_level_to = static_cast<int>(tmp_val);
} else if (ParseIntArg(argv[i], "--compression_max_dict_bytes=",
"compression_max_dict_bytes must be numeric",
&tmp_val)) {
if (tmp_val < 0 || tmp_val > port::kMaxUint32) {
fprintf(stderr, "compression_max_dict_bytes must be a uint32_t: '%s'\n",
argv[i]);
print_help(/*to_stderr*/ true);
return 1;
}
compression_max_dict_bytes = static_cast<uint32_t>(tmp_val);
} else if (ParseIntArg(argv[i], "--compression_zstd_max_train_bytes=",
"compression_zstd_max_train_bytes must be numeric",
&tmp_val)) {
if (tmp_val < 0 || tmp_val > port::kMaxUint32) {
fprintf(stderr,
"compression_zstd_max_train_bytes must be a uint32_t: '%s'\n",
argv[i]);
print_help(/*to_stderr*/ true);
return 1;
}
compression_zstd_max_train_bytes = static_cast<uint32_t>(tmp_val);
} else if (strcmp(argv[i], "--help") == 0) { } else if (strcmp(argv[i], "--help") == 0) {
print_help(/*to_stderr*/ false); print_help(/*to_stderr*/ false);
return 0; return 0;
@ -371,7 +402,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
dumper.ShowAllCompressionSizes( dumper.ShowAllCompressionSizes(
set_block_size ? block_size : 16384, set_block_size ? block_size : 16384,
compression_types.empty() ? kCompressions : compression_types, compression_types.empty() ? kCompressions : compression_types,
compress_level_from, compress_level_to); compress_level_from, compress_level_to, compression_max_dict_bytes,
compression_zstd_max_train_bytes);
return 0; return 0;
} }