This commit is contained in:
Andrew Kryczka 2021-02-17 10:42:56 -08:00
parent 553aafaf8e
commit 58ec8cf304
22 changed files with 289 additions and 73 deletions

View File

@ -1,4 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
* Add an option, `CompressionOptions::max_dict_buffer_bytes`, to limit the in-memory buffering for selecting samples for generating/training a dictionary. The limit is currently loosely adhered to.
## 6.17.2 (02/05/2021)
### Bug Fixes
* Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details.

13
db/c.cc
View File

@ -2774,6 +2774,14 @@ void rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
opt->rep.bottommost_compression_opts.enabled = enabled;
}
void rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes,
unsigned char enabled) {
opt->rep.bottommost_compression_opts.max_dict_buffer_bytes =
max_dict_buffer_bytes;
opt->rep.bottommost_compression_opts.enabled = enabled;
}
void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits,
int level, int strategy,
int max_dict_bytes) {
@ -2788,6 +2796,11 @@ void rocksdb_options_set_compression_options_zstd_max_train_bytes(
opt->rep.compression_opts.zstd_max_train_bytes = zstd_max_train_bytes;
}
void rocksdb_options_set_compression_options_max_dict_buffer_bytes(
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes) {
opt->rep.compression_opts.max_dict_buffer_bytes = max_dict_buffer_bytes;
}
void rocksdb_options_set_prefix_extractor(
rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) {
opt->rep.prefix_extractor.reset(prefix_extractor);

View File

@ -1410,67 +1410,88 @@ INSTANTIATE_TEST_CASE_P(
TEST_P(PresetCompressionDictTest, Flush) {
// Verifies that dictionary is generated and written during flush only when
// `ColumnFamilyOptions::compression` enables dictionary.
// `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
// size of the dictionary is within expectations according to the limit on
// buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
Random rnd(301);
for (size_t i = 0; i <= kKeysPerFile; ++i) {
ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// If there's a compression dictionary, it should have been loaded when the
// flush finished, incurring a cache miss.
uint64_t expected_compression_dict_misses;
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// compression dictionary exists since dictionaries would be preloaded when
// the flush finishes.
if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses;
// Flush is never considered bottommost. This should change in the future
// since flushed files may have nothing underneath them, like the one in
// this test case.
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
} else {
expected_compression_dict_misses = prev_compression_dict_misses + 1;
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
2 * kBlockLen);
}
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when `ColumnFamilyOptions::compression` enables
// dictionary.
// dictionary. Also verifies the size of the dictionary is within expectations
// according to the limit on buffering set by
// `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
@ -1492,8 +1513,8 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
ASSERT_EQ("2,0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
// This L0->L1 compaction merges the two L0 files into L1. The produced L1
// file is not bottommost due to the existing L2 file covering the same key-
// range.
@ -1501,38 +1522,52 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the
// compaction finished, incurring a cache miss.
uint64_t expected_compression_dict_misses;
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// compression dictionary exists since dictionaries would be preloaded when
// the compaction finishes.
if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses;
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
} else {
expected_compression_dict_misses = prev_compression_dict_misses + 1;
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
}
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when either `ColumnFamilyOptions::compression` or
// `ColumnFamilyOptions::bottommost_compression` enables dictionary.
// `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
// verifies the size of the dictionary is within expectations according to the
// limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
@ -1548,17 +1583,22 @@ TEST_P(PresetCompressionDictTest, CompactBottommost) {
ASSERT_EQ("2", FilesPerLevel(0));
#endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the
// compaction finished, incurring a cache miss.
ASSERT_EQ(prev_compression_dict_misses + 1,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
}
class CompactionCompressionListener : public EventListener {

View File

@ -206,6 +206,7 @@ DECLARE_string(bottommost_compression_type);
DECLARE_int32(compression_max_dict_bytes);
DECLARE_int32(compression_zstd_max_train_bytes);
DECLARE_int32(compression_parallel_threads);
DECLARE_uint64(compression_max_dict_buffer_bytes);
DECLARE_string(checksum_type);
DECLARE_string(hdfs);
DECLARE_string(env_uri);

View File

@ -626,6 +626,10 @@ DEFINE_int32(compression_zstd_max_train_bytes, 0,
DEFINE_int32(compression_parallel_threads, 1,
"Number of threads for parallel compression.");
DEFINE_uint64(compression_max_dict_buffer_bytes, 0,
"Buffering limit for SST file data to sample for dictionary "
"compression.");
DEFINE_string(bottommost_compression_type, "disable",
"Algorithm to use to compress bottommost level of the database. "
"\"disable\" means disabling the feature");

View File

@ -2052,6 +2052,8 @@ void StressTest::Open() {
FLAGS_compression_zstd_max_train_bytes;
options_.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads;
options_.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
options_.create_if_missing = true;
options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
options_.inplace_update_support = FLAGS_in_place_update;

View File

@ -143,6 +143,28 @@ struct CompressionOptions {
// Default: false.
bool enabled;
// Limit on data buffering, which is used to gather samples to build a
// dictionary. Zero means no limit. When dictionary is disabled
// (`max_dict_bytes == 0`), enabling this limit (`max_dict_buffer_bytes != 0`)
// has no effect.
//
// In compaction, the buffering is limited to the target file size (see
// `target_file_size_base` and `target_file_size_multiplier`) even if this
// setting permits more buffering. Since we cannot determine where the file
// should be cut until data blocks are compressed with dictionary, buffering
// more than the target file size could lead to selecting samples that belong
// to a later output SST.
//
// Limiting too strictly may harm dictionary effectiveness since it forces
// RocksDB to pick samples from the initial portion of the output SST, which
// may not be representative of the whole file. Configuring this limit below
// `zstd_max_train_bytes` (when enabled) can restrict how many samples we can
// pass to the dictionary trainer. Configuring it below `max_dict_bytes` can
// restrict the size of the final dictionary.
//
// Default: 0 (unlimited)
uint64_t max_dict_buffer_bytes;
CompressionOptions()
: window_bits(-14),
level(kDefaultCompressionLevel),
@ -150,17 +172,19 @@ struct CompressionOptions {
max_dict_bytes(0),
zstd_max_train_bytes(0),
parallel_threads(1),
enabled(false) {}
enabled(false),
max_dict_buffer_bytes(0) {}
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
int _zstd_max_train_bytes, int _parallel_threads,
bool _enabled)
bool _enabled, uint64_t _max_dict_buffer_bytes)
: window_bits(wbits),
level(_lev),
strategy(_strategy),
max_dict_bytes(_max_dict_bytes),
zstd_max_train_bytes(_zstd_max_train_bytes),
parallel_threads(_parallel_threads),
enabled(_enabled) {}
enabled(_enabled),
max_dict_buffer_bytes(_max_dict_buffer_bytes) {}
};
enum UpdateStatus { // Return status For inplace update callback

View File

@ -998,11 +998,17 @@ extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_compression_options_zstd_max_train_bytes(rocksdb_options_t*,
int);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_compression_options_max_dict_buffer_bytes(
rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options(rocksdb_options_t*, int, int,
int, int, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
rocksdb_options_t*, int, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
rocksdb_options_t*, uint64_t, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor(
rocksdb_options_t*, rocksdb_slicetransform_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels(

View File

@ -132,6 +132,27 @@ jint Java_org_rocksdb_CompressionOptions_zstdMaxTrainBytes(
return static_cast<jint>(opt->zstd_max_train_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setMaxDictBufferBytes
* Signature: (JJ)V
*/
void Java_org_rocksdb_CompressionOptions_setMaxDictBufferBytes(
JNIEnv*, jobject, jlong jhandle, jlong jmax_dict_buffer_bytes) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->max_dict_buffer_bytes = static_cast<uint64_t>(jmax_dict_buffer_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: maxDictBufferBytes
* Signature: (J)J
*/
jlong Java_org_rocksdb_CompressionOptions_maxDictBufferBytes(JNIEnv*, jobject,
jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jlong>(opt->max_dict_buffer_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setEnabled

View File

@ -105,7 +105,7 @@ static Status ParseCompressionOptions(const std::string& value,
}
// Since parallel_threads comes before enabled but was added optionally
// later, we need to check if this is the final token (meaning it is the
// enabled bit), or if there is another token (meaning this one is
// enabled bit), or if there are more tokens (meaning this one is
// parallel_threads)
end = value.find(':', start);
if (end != std::string::npos) {
@ -113,7 +113,6 @@ static Status ParseCompressionOptions(const std::string& value,
ParseInt(value.substr(start, value.size() - start));
} else {
// parallel_threads is not serialized with this format, but enabled is
compression_opts.parallel_threads = CompressionOptions().parallel_threads;
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
}
@ -128,6 +127,18 @@ static Status ParseCompressionOptions(const std::string& value,
}
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
end = value.find(':', start);
}
// max_dict_buffer_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_buffer_bytes =
ParseUint64(value.substr(start, value.size() - start));
}
return Status::OK();
}
@ -161,6 +172,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"enabled",
{offsetof(struct CompressionOptions, enabled), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
{"max_dict_buffer_bytes",
{offsetof(struct CompressionOptions, max_dict_buffer_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
};
static std::unordered_map<std::string, OptionTypeInfo>

View File

@ -201,6 +201,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.enabled: %s",
bottommost_compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(
log,
" Options.bottommost_compression_opts.max_dict_buffer_bytes: "
"%" PRIu64,
bottommost_compression_opts.max_dict_buffer_bytes);
ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d",
compression_opts.window_bits);
ROCKS_LOG_HEADER(log, " Options.compression_opts.level: %d",
@ -222,6 +227,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log,
" Options.compression_opts.enabled: %s",
compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(log,
" Options.compression_opts.max_dict_buffer_bytes: "
"%" PRIu64,
compression_opts.max_dict_buffer_bytes);
ROCKS_LOG_HEADER(log, " Options.level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger);
ROCKS_LOG_HEADER(log, " Options.level0_slowdown_writes_trigger: %d",

View File

@ -409,10 +409,11 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
FillWithSpecialChar(options_ptr, sizeof(ColumnFamilyOptions),
kColumnFamilyOptionsExcluded);
// It based on the behavior of compiler that padding bytes are not changed
// when copying the struct. It's prone to failure when compiler behavior
// changes. We verify there is unset bytes to detect the case.
*options = ColumnFamilyOptions();
// Invoke a user-defined constructor in the hope that it does not overwrite
// padding bytes. Note that previously we relied on the implicitly-defined
// copy-assignment operator (i.e., `*options = ColumnFamilyOptions();`) here,
// which did in fact modify padding bytes.
options = new (options_ptr) ColumnFamilyOptions();
// Deprecatd option which is not initialized. Need to set it to avoid
// Valgrind error
@ -470,8 +471,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"max_bytes_for_level_multiplier=60;"
"memtable_factory=SkipListFactory;"
"compression=kNoCompression;"
"compression_opts=5:6:7:8:9:true;"
"bottommost_compression_opts=4:5:6:7:8:true;"
"compression_opts=5:6:7:8:9:10:true:11;"
"bottommost_compression_opts=4:5:6:7:8:9:true:10;"
"bottommost_compression=kDisableCompressionOption;"
"level0_stop_writes_trigger=33;"
"num_levels=99;"

View File

@ -725,7 +725,7 @@ TEST_F(OptionsTest, CompressionOptionsFromString) {
ASSERT_OK(GetColumnFamilyOptionsFromString(
ignore, ColumnFamilyOptions(), "compression_opts=5:6:7:8:9:x:false",
&base_cf_opt));
ASSERT_NOK(GetColumnFamilyOptionsFromString(
ASSERT_OK(GetColumnFamilyOptionsFromString(
config_options, ColumnFamilyOptions(),
"compression_opts=1:2:3:4:5:6:true:8", &base_cf_opt));
ASSERT_OK(GetColumnFamilyOptionsFromString(

View File

@ -11,24 +11,25 @@
#include <assert.h>
#include <stdio.h>
#include <atomic>
#include <list>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include "db/dbformat.h"
#include "index_builder.h"
#include "memory/memory_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h"
#include "table/block_based/block_based_table_factory.h"
@ -40,8 +41,6 @@
#include "table/block_based/partitioned_filter_block.h"
#include "table/format.h"
#include "table/table_builder.h"
#include "memory/memory_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -306,6 +305,10 @@ struct BlockBasedTableBuilder::Rep {
kClosed,
};
State state;
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_and_keys_buffers`) does not exceed
// `buffer_limit`.
uint64_t buffer_limit;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
@ -321,7 +324,6 @@ struct BlockBasedTableBuilder::Rep {
const std::string& column_family_name;
uint64_t creation_time = 0;
uint64_t oldest_key_time = 0;
const uint64_t target_file_size;
uint64_t file_creation_time = 0;
// DB IDs
@ -407,7 +409,7 @@ struct BlockBasedTableBuilder::Rep {
const CompressionOptions& _compression_opts, const bool skip_filters,
const int _level_at_creation, const std::string& _column_family_name,
const uint64_t _creation_time, const uint64_t _oldest_key_time,
const uint64_t _target_file_size, const uint64_t _file_creation_time,
const uint64_t target_file_size, const uint64_t _file_creation_time,
const std::string& _db_id, const std::string& _db_session_id)
: ioptions(_ioptions),
moptions(_moptions),
@ -448,13 +450,20 @@ struct BlockBasedTableBuilder::Rep {
column_family_name(_column_family_name),
creation_time(_creation_time),
oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size),
file_creation_time(_file_creation_time),
db_id(_db_id),
db_session_id(_db_session_id),
db_host_id(ioptions.db_host_id),
status_ok(true),
io_status_ok(true) {
if (target_file_size == 0) {
buffer_limit = compression_opts.max_dict_buffer_bytes;
} else if (compression_opts.max_dict_buffer_bytes == 0) {
buffer_limit = target_file_size;
} else {
buffer_limit =
std::min(target_file_size, compression_opts.max_dict_buffer_bytes);
}
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type));
}
@ -896,8 +905,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
r->first_key_in_next_block = &key;
Flush();
if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
r->data_begin_offset > r->target_file_size) {
if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
r->data_begin_offset > r->buffer_limit) {
EnterUnbuffered();
}
@ -997,23 +1006,28 @@ void BlockBasedTableBuilder::Flush() {
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle,
bool is_data_block) {
WriteBlock(block->Finish(), handle, is_data_block);
block->Reset();
block->Finish();
std::string raw_block_contents;
block->SwapAndReset(raw_block_contents);
if (rep_->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(!rep_->data_block_and_keys_buffers.empty());
rep_->data_block_and_keys_buffers.back().first =
std::move(raw_block_contents);
rep_->data_begin_offset +=
rep_->data_block_and_keys_buffers.back().first.size();
return;
}
WriteBlock(raw_block_contents, handle, is_data_block);
}
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle,
bool is_data_block) {
Rep* r = rep_;
assert(r->state == Rep::State::kUnbuffered);
Slice block_contents;
CompressionType type;
if (r->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(!r->data_block_and_keys_buffers.empty());
r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
return;
}
Status compress_status;
CompressAndVerifyBlock(raw_block_contents, is_data_block,
*(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
@ -1629,14 +1643,37 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
? r->compression_opts.zstd_max_train_bytes
: r->compression_opts.max_dict_bytes;
// If buffer size is reasonable, we pre-generate a permutation to enforce
// uniqueness. This prevents wasting samples on duplicates, which is
// particularly likely when not many blocks were buffered.
std::vector<uint16_t> data_block_order;
size_t data_block_order_idx = 0;
if (r->data_block_and_keys_buffers.size() <= ((1 << 16) - 1)) {
data_block_order.resize(r->data_block_and_keys_buffers.size());
std::iota(data_block_order.begin(), data_block_order.end(),
static_cast<uint16_t>(0));
// We could be smarter and interleave the shuffling and sample appending
// logic. Then we could terminate as soon as `kSampleBytes` is reached,
// saving some shuffling computation.
RandomShuffle(data_block_order.begin(), data_block_order.end());
}
Random64 generator{r->creation_time};
std::string compression_dict_samples;
std::vector<size_t> compression_dict_sample_lens;
if (!r->data_block_and_keys_buffers.empty()) {
while (compression_dict_samples.size() < kSampleBytes) {
size_t rand_idx =
static_cast<size_t>(
generator.Uniform(r->data_block_and_keys_buffers.size()));
while ((data_block_order.empty() ||
data_block_order_idx < data_block_order.size()) &&
compression_dict_samples.size() < kSampleBytes) {
size_t rand_idx;
if (data_block_order.empty()) {
rand_idx = static_cast<size_t>(
generator.Uniform(r->data_block_and_keys_buffers.size()));
} else {
rand_idx = data_block_order[data_block_order_idx];
++data_block_order_idx;
}
size_t copy_len =
std::min(kSampleBytes - compression_dict_samples.size(),
r->data_block_and_keys_buffers[rand_idx].first.size());

View File

@ -117,8 +117,9 @@ class BlockBasedTableBuilder : public TableBuilder {
// REQUIRES: `rep_->state == kBuffered`
void EnterUnbuffered();
// Call block's Finish() method
// and then write the compressed block contents to file.
// Call block's Finish() method and then
// - in buffered mode, buffer the uncompressed block contents.
// - in unbuffered mode, write the compressed block contents to file.
void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
// Compress and write block content to the file.

View File

@ -235,7 +235,8 @@ Status SstFileDumper::ShowAllCompressionSizes(
const std::vector<std::pair<CompressionType, const char*>>&
compression_types,
int32_t compress_level_from, int32_t compress_level_to,
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes) {
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
uint64_t max_dict_buffer_bytes) {
fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size);
for (auto& i : compression_types) {
if (CompressionTypeSupported(i.first)) {
@ -243,6 +244,7 @@ Status SstFileDumper::ShowAllCompressionSizes(
CompressionOptions compress_opt;
compress_opt.max_dict_bytes = max_dict_bytes;
compress_opt.zstd_max_train_bytes = zstd_max_train_bytes;
compress_opt.max_dict_buffer_bytes = max_dict_buffer_bytes;
for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
fprintf(stdout, "Compression level: %d", j);
compress_opt.level = j;

View File

@ -40,7 +40,8 @@ class SstFileDumper {
const std::vector<std::pair<CompressionType, const char*>>&
compression_types,
int32_t compress_level_from, int32_t compress_level_to,
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes);
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
uint64_t max_dict_buffer_bytes);
Status ShowCompressionSize(size_t block_size, CompressionType compress_type,
const CompressionOptions& compress_opt);

View File

@ -948,6 +948,10 @@ DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
DEFINE_int32(compression_parallel_threads, 1,
"Number of threads for parallel compression.");
DEFINE_uint64(compression_max_dict_buffer_bytes,
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes,
"Maximum bytes to buffer to collect samples for dictionary.");
static bool ValidateTableCacheNumshardbits(const char* flagname,
int32_t value) {
if (0 >= value || value > 20) {
@ -4053,6 +4057,8 @@ class Benchmark {
FLAGS_compression_zstd_max_train_bytes;
options.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads;
options.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
// If this is a block based table, set some related options
auto table_options =
options.table_factory->GetOptions<BlockBasedTableOptions>();

View File

@ -50,6 +50,7 @@ default_params = {
# Disabled compression_parallel_threads as the feature is not stable
# lambda: random.choice([1] * 9 + [4])
"compression_parallel_threads": 1,
"compression_max_dict_buffer_bytes": lambda: 4096 * random.randint(0, 32),
"clear_column_family_one_in": 0,
"compact_files_one_in": 1000000,
"compact_range_one_in": 1000000,
@ -267,8 +268,10 @@ best_efforts_recovery_params = {
def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()])
if dest_params.get("compression_type") != "zstd" or \
dest_params.get("compression_max_dict_bytes") == 0:
if dest_params.get("compression_max_dict_bytes") == 0:
dest_params["compression_zstd_max_train_bytes"] = 0
dest_params["compression_max_dict_buffer_bytes"] = 0
if dest_params.get("compression_type") != "zstd":
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
dest_params["memtablerep"] = "skip_list"

View File

@ -103,6 +103,9 @@ void print_help(bool to_stderr) {
--compression_zstd_max_train_bytes=<uint32_t>
Maximum size of training data passed to zstd's dictionary trainer
--compression_max_dict_buffer_bytes=<int64_t>
Limit on buffer size from which we collect samples for dictionary generation.
)");
}
@ -166,6 +169,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes;
uint32_t compression_zstd_max_train_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes;
uint64_t compression_max_dict_buffer_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes;
int64_t tmp_val;
@ -276,6 +281,17 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
return 1;
}
compression_zstd_max_train_bytes = static_cast<uint32_t>(tmp_val);
} else if (ParseIntArg(argv[i], "--compression_max_dict_buffer_bytes=",
"compression_max_dict_buffer_bytes must be numeric",
&tmp_val)) {
if (tmp_val < 0) {
fprintf(stderr,
"compression_max_dict_buffer_bytes must be positive: '%s'\n",
argv[i]);
print_help(/*to_stderr*/ true);
return 1;
}
compression_max_dict_buffer_bytes = static_cast<uint64_t>(tmp_val);
} else if (strcmp(argv[i], "--help") == 0) {
print_help(/*to_stderr*/ false);
return 0;
@ -404,7 +420,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
set_block_size ? block_size : 16384,
compression_types.empty() ? kCompressions : compression_types,
compress_level_from, compress_level_to, compression_max_dict_bytes,
compression_zstd_max_train_bytes);
compression_zstd_max_train_bytes, compression_max_dict_buffer_bytes);
if (!st.ok()) {
fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str());
exit(1);

View File

@ -627,6 +627,9 @@ inline std::string CompressionOptionsToString(
result.append("enabled=")
.append(ToString(compression_options.enabled))
.append("; ");
result.append("max_dict_buffer_bytes=")
.append(ToString(compression_options.max_dict_buffer_bytes))
.append("; ");
return result;
}

View File

@ -279,9 +279,16 @@ bool StartsWith(const std::string& string, const std::string& pattern) {
#ifndef ROCKSDB_LITE
bool ParseBoolean(const std::string& type, const std::string& value) {
if (value == "true" || value == "1") {
const static std::string kTrue = "true", kFalse = "false";
if (value.compare(0 /* pos */, kTrue.size(), kTrue) == 0) {
return true;
} else if (value == "false" || value == "0") {
} else if (value.compare(0 /* pos */, kFalse.size(), kFalse) == 0) {
return false;
}
int num = ParseInt(value);
if (num == 1) {
return true;
} else if (num == 0) {
return false;
}
throw std::invalid_argument(type);