Reduce scope of compression dictionary to single SST (#4952)

Summary:
Our previous approach was to train one compression dictionary per compaction, using the first output SST to train a dictionary, and then applying it on subsequent SSTs in the same compaction. While this was great for minimizing CPU/memory/I/O overhead, it did not achieve good compression ratios in practice. In our most promising potential use case, moderate reductions in a dictionary's scope make a major difference on compression ratio.

So, this PR changes compression dictionary to be scoped per-SST. It accepts the tradeoff during table building to use more memory and CPU. Important changes include:

- The `BlockBasedTableBuilder` has a new state when dictionary compression is in-use: `kBuffered`. In that state it accumulates uncompressed data in-memory whenever `Add` is called.
- After accumulating target file size bytes or calling `BlockBasedTableBuilder::Finish`, a `BlockBasedTableBuilder` moves to the `kUnbuffered` state. The transition (`EnterUnbuffered()`) involves sampling the buffered data, training a dictionary, and compressing/writing out all buffered data. In the `kUnbuffered` state, a `BlockBasedTableBuilder` behaves the same as before -- blocks are compressed/written out as soon as they fill up.
- Samples are now whole uncompressed data blocks, except the final sample may be a partial data block so we don't breach the user's configured `max_dict_bytes` or `zstd_max_train_bytes`. The dictionary trainer is supposed to work better when we pass it real units of compression. Previously we were passing 64-byte KV samples which was not realistic.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4952

Differential Revision: D13967980

Pulled By: ajkr

fbshipit-source-id: 82bea6f7537e1529c7a1a4cdee84585f5949300f
This commit is contained in:
Andrew Kryczka 2019-02-11 19:42:25 -08:00 committed by Facebook Github Bot
parent 79496d71ed
commit 62f70f6d14
17 changed files with 332 additions and 230 deletions

View File

@ -10,6 +10,7 @@
* Add a place holder in manifest which indicate a record from future that can be safely ignored.
* Add support for trace sampling.
* Enable properties block checksum verification for block-based tables.
* For all users of dictionary compression, we now generate a separate dictionary for compressing each bottom-level SST file. Previously we reused a single dictionary for a whole compaction to bottom level. The new approach achieves better compression ratios; however, it uses more memory and CPU for buffering/sampling data blocks and training dictionaries.
### Public API Change
* Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped.

View File

@ -48,17 +48,18 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict, const bool skip_filters,
const uint64_t creation_time, const uint64_t oldest_key_time) {
const bool skip_filters, const uint64_t creation_time,
const uint64_t oldest_key_time, const bool is_bottommost_level,
const uint64_t target_file_size) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, compression_dict, skip_filters,
column_family_name, level, creation_time,
oldest_key_time),
compression_opts, skip_filters, column_family_name,
level, creation_time, oldest_key_time,
is_bottommost_level, target_file_size),
column_family_id, file);
}
@ -128,8 +129,7 @@ Status BuildTable(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,
column_family_name, file_writer.get(), compression, compression_opts,
level, nullptr /* compression_dict */, false /* skip_filters */,
creation_time, oldest_key_time);
level, false /* skip_filters */, creation_time, oldest_key_time);
}
MergeHelper merge(env, internal_comparator.user_comparator(),

View File

@ -40,8 +40,6 @@ class InternalStats;
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown. It must outlive the
// TableBuilder returned by this function.
// @param compression_dict Data for presetting the compression library's
// dictionary, or nullptr.
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& options, const MutableCFOptions& moptions,
const InternalKeyComparator& internal_comparator,
@ -50,9 +48,9 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict = nullptr,
const bool skip_filters = false, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0);
const uint64_t oldest_key_time = 0, const bool is_bottommost_level = false,
const uint64_t target_file_size = 0);
// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of

View File

@ -157,7 +157,6 @@ struct CompactionJob::SubcompactionState {
uint64_t overlapped_bytes = 0;
// A flag determine whether the key has been seen in ShouldStopBefore()
bool seen_key = false;
std::string compression_dict;
SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0)
@ -173,8 +172,7 @@ struct CompactionJob::SubcompactionState {
approx_size(size),
grandparent_index(0),
overlapped_bytes(0),
seen_key(false),
compression_dict() {
seen_key(false) {
assert(compaction != nullptr);
}
@ -197,7 +195,6 @@ struct CompactionJob::SubcompactionState {
grandparent_index = std::move(o.grandparent_index);
overlapped_bytes = std::move(o.overlapped_bytes);
seen_key = std::move(o.seen_key);
compression_dict = std::move(o.compression_dict);
return *this;
}
@ -865,42 +862,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
const MutableCFOptions* mutable_cf_options =
sub_compact->compaction->mutable_cf_options();
// To build compression dictionary, we sample the first output file, assuming
// it'll reach the maximum length. We optionally pass these samples through
// zstd's dictionary trainer, or just use them directly. Then, the dictionary
// is used for compressing subsequent output files in the same subcompaction.
const bool kUseZstdTrainer =
sub_compact->compaction->output_compression_opts().zstd_max_train_bytes >
0;
const size_t kSampleBytes =
kUseZstdTrainer
? sub_compact->compaction->output_compression_opts()
.zstd_max_train_bytes
: sub_compact->compaction->output_compression_opts().max_dict_bytes;
const int kSampleLenShift = 6; // 2^6 = 64-byte samples
std::set<size_t> sample_begin_offsets;
if (bottommost_level_ && kSampleBytes > 0) {
const size_t kMaxSamples = kSampleBytes >> kSampleLenShift;
const size_t kOutFileLen =
static_cast<size_t>(MaxFileSizeForLevel(*mutable_cf_options,
compact_->compaction->output_level(),
cfd->ioptions()->compaction_style,
compact_->compaction->GetInputBaseLevel(),
cfd->ioptions()->level_compaction_dynamic_level_bytes));
if (kOutFileLen != port::kMaxSizet) {
const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift;
Random64 generator{versions_->NewFileNumber()};
for (size_t i = 0; i < kMaxSamples; ++i) {
sample_begin_offsets.insert(
static_cast<size_t>(generator.Uniform(kOutFileNumSamples))
<< kSampleLenShift);
}
}
}
MergeHelper merge(
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
compaction_filter, db_options_.info_log.get(),
@ -938,12 +899,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->current_output_file_size);
}
const auto& c_iter_stats = c_iter->iter_stats();
auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
// data_begin_offset and dict_sample_data are only valid while generating
// dictionary from the first output file.
size_t data_begin_offset = 0;
std::string dict_sample_data;
dict_sample_data.reserve(kSampleBytes);
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
@ -979,55 +934,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
key, c_iter->ikey().sequence);
sub_compact->num_output_records++;
if (sub_compact->outputs.size() == 1) { // first output file
// Check if this key/value overlaps any sample intervals; if so, appends
// overlapping portions to the dictionary.
for (const auto& data_elmt : {key, value}) {
size_t data_end_offset = data_begin_offset + data_elmt.size();
while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
*sample_begin_offset_iter < data_end_offset) {
size_t sample_end_offset =
*sample_begin_offset_iter + (1 << kSampleLenShift);
// Invariant: Because we advance sample iterator while processing the
// data_elmt containing the sample's last byte, the current sample
// cannot end before the current data_elmt.
assert(data_begin_offset < sample_end_offset);
size_t data_elmt_copy_offset, data_elmt_copy_len;
if (*sample_begin_offset_iter <= data_begin_offset) {
// The sample starts before data_elmt starts, so take bytes starting
// at the beginning of data_elmt.
data_elmt_copy_offset = 0;
} else {
// data_elmt starts before the sample starts, so take bytes starting
// at the below offset into data_elmt.
data_elmt_copy_offset =
*sample_begin_offset_iter - data_begin_offset;
}
if (sample_end_offset <= data_end_offset) {
// The sample ends before data_elmt ends, so take as many bytes as
// needed.
data_elmt_copy_len =
sample_end_offset - (data_begin_offset + data_elmt_copy_offset);
} else {
// data_elmt ends before the sample ends, so take all remaining
// bytes in data_elmt.
data_elmt_copy_len =
data_end_offset - (data_begin_offset + data_elmt_copy_offset);
}
dict_sample_data.append(&data_elmt.data()[data_elmt_copy_offset],
data_elmt_copy_len);
if (sample_end_offset > data_end_offset) {
// Didn't finish sample. Try to finish it with the next data_elmt.
break;
}
// Next sample may require bytes from same data_elmt.
sample_begin_offset_iter++;
}
data_begin_offset = data_end_offset;
}
}
// Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
@ -1069,18 +975,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&range_del_out_stats, next_key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (sub_compact->outputs.size() == 1) {
// Use samples from first output file to create dictionary for
// compression of subsequent files.
if (kUseZstdTrainer) {
sub_compact->compression_dict = ZSTD_TrainDictionary(
dict_sample_data, kSampleLenShift,
sub_compact->compaction->output_compression_opts()
.max_dict_bytes);
} else {
sub_compact->compression_dict = std::move(dict_sample_data);
}
}
}
}
@ -1606,8 +1500,9 @@ Status CompactionJob::OpenCompactionOutputFile(
cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(),
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), &sub_compact->compression_dict,
skip_filters, output_file_creation_time));
sub_compact->compaction->output_level(), skip_filters,
output_file_creation_time, 0 /* oldest_key_time */, bottommost_level_,
sub_compact->compaction->max_output_file_size()));
LogFlush(db_options_.info_log);
return s;
}

View File

@ -633,7 +633,7 @@ TEST_F(DBBlockCacheTest, CompressedCache) {
TEST_F(DBBlockCacheTest, CacheCompressionDict) {
const int kNumFiles = 4;
const int kNumEntriesPerFile = 32;
const int kNumEntriesPerFile = 128;
const int kNumBytesPerEntry = 1024;
// Try all the available libraries that support dictionary compression

View File

@ -1039,17 +1039,17 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
}
TEST_F(DBTest2, PresetCompressionDict) {
// Verifies that compression ratio improves when dictionary is enabled, and
// improves even further when the dictionary is trained by ZSTD.
const size_t kBlockSizeBytes = 4 << 10;
const size_t kL0FileBytes = 128 << 10;
const size_t kApproxPerBlockOverheadBytes = 50;
const int kNumL0Files = 5;
const int kZstdTrainFactor = 16;
Options options;
options.env = CurrentOptions().env; // Make sure to use any custom env that the test is configured with.
options.allow_concurrent_memtable_write = false;
options.arena_block_size = kBlockSizeBytes;
options.compaction_style = kCompactionStyleUniversal;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
@ -1091,16 +1091,15 @@ TEST_F(DBTest2, PresetCompressionDict) {
options.compression_opts.zstd_max_train_bytes = 0;
break;
case 1:
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = 0;
break;
case 2:
if (compression_type != kZSTD) {
continue;
}
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes =
kZstdTrainFactor * kBlockSizeBytes;
options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
break;
default:
assert(false);
@ -1110,20 +1109,24 @@ TEST_F(DBTest2, PresetCompressionDict) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
std::string seq_data =
RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
std::string seq_datas[10];
for (int j = 0; j < 10; ++j) {
seq_datas[j] =
RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
}
ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
for (int j = 0; j < kNumL0Files; ++j) {
for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
ASSERT_OK(Put(1, Key(static_cast<int>(
j * (kL0FileBytes / kBlockSizeBytes) + k)),
seq_data));
auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
seq_datas[(key_num / 10) % 10]));
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
}
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
true /* disallow_trivial_move */);
ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
@ -1138,7 +1141,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
j++) {
ASSERT_EQ(seq_data, Get(1, Key(static_cast<int>(j))));
ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
}
if (i) {
ASSERT_GT(prev_out_bytes, out_bytes);
@ -1149,6 +1152,70 @@ TEST_F(DBTest2, PresetCompressionDict) {
}
}
TEST_F(DBTest2, PresetCompressionDictLocality) {
if (!ZSTD_Supported()) {
return;
}
// Verifies that compression dictionary is generated from local data. The
// verification simply checks all output SSTs have different compression
// dictionaries. We do not verify effectiveness as that'd likely be flaky in
// the future.
const int kNumEntriesPerFile = 1 << 10; // 1KB
const int kNumBytesPerEntry = 1 << 10; // 1KB
const int kNumFiles = 4;
Options options = CurrentOptions();
options.compression = kZSTD;
options.compression_opts.max_dict_bytes = 1 << 14; // 16KB
options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
options.statistics = rocksdb::CreateDBStatistics();
options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
Reopen(options);
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumEntriesPerFile; ++j) {
ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
RandomString(&rnd, kNumBytesPerEntry)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
}
// Store all the dictionaries generated during a full compaction.
std::vector<std::string> compression_dicts;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
[&](void* arg) {
compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
CompactRangeOptions compact_range_opts;
compact_range_opts.bottommost_level_compaction =
BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
// Dictionary compression should not be so good as to compress four totally
// random files into one. If it does then there's probably something wrong
// with the test.
ASSERT_GT(NumTableFilesAtLevel(1), 1);
// Furthermore, there should be one compression dictionary generated per file.
// And they should all be different from each other.
ASSERT_EQ(NumTableFilesAtLevel(1),
static_cast<int>(compression_dicts.size()));
for (size_t i = 1; i < compression_dicts.size(); ++i) {
std::string& a = compression_dicts[i - 1];
std::string& b = compression_dicts[i];
size_t alen = a.size();
size_t blen = b.size();
ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
}
}
class CompactionCompressionListener : public EventListener {
public:
explicit CompactionCompressionListener(Options* db_options)

View File

@ -254,6 +254,13 @@ struct BlockBasedTableBuilder::Rep {
Status status;
size_t alignment;
BlockBuilder data_block;
// Buffers uncompressed data blocks and keys to replay later. Needed when
// compression dictionary is enabled so we can finalize the dictionary before
// compressing any data blocks.
// TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
// blocks as it's redundant, but it's easier to implement for now.
std::vector<std::pair<std::string, std::vector<std::string>>>
data_block_and_keys_buffers;
BlockBuilder range_del_block;
InternalKeySliceTransform internal_prefix_transform;
@ -263,13 +270,40 @@ struct BlockBasedTableBuilder::Rep {
std::string last_key;
CompressionType compression_type;
CompressionOptions compression_opts;
CompressionDict compression_dict;
std::unique_ptr<CompressionDict> compression_dict;
CompressionContext compression_ctx;
std::unique_ptr<UncompressionContext> verify_ctx;
UncompressionDict verify_dict;
std::unique_ptr<UncompressionDict> verify_dict;
size_t data_begin_offset = 0;
TableProperties props;
bool closed = false; // Either Finish() or Abandon() has been called.
// States of the builder.
//
// - `kBuffered`: This is the initial state where zero or more data blocks are
// accumulated uncompressed in-memory. From this state, call
// `EnterUnbuffered()` to finalize the compression dictionary if enabled,
// compress/write out any buffered blocks, and proceed to the `kUnbuffered`
// state.
//
// - `kUnbuffered`: This is the state when compression dictionary is finalized
// either because it wasn't enabled in the first place or it's been created
// from sampling previously buffered data. In this state, blocks are simply
// compressed/written out as they fill up. From this state, call `Finish()`
// to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
// the partially created file.
//
// - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
// called, so the table builder is no longer usable. We must be in this
// state by the time the destructor runs.
enum class State {
kBuffered,
kUnbuffered,
kClosed,
};
State state;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
@ -283,6 +317,8 @@ struct BlockBasedTableBuilder::Rep {
const std::string& column_family_name;
uint64_t creation_time = 0;
uint64_t oldest_key_time = 0;
const bool is_bottommost_level;
const uint64_t target_file_size;
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
@ -293,10 +329,10 @@ struct BlockBasedTableBuilder::Rep {
int_tbl_prop_collector_factories,
uint32_t _column_family_id, WritableFileWriter* f,
const CompressionType _compression_type,
const CompressionOptions& _compression_opts,
const std::string* _compression_dict, const bool skip_filters,
const CompressionOptions& _compression_opts, const bool skip_filters,
const std::string& _column_family_name, const uint64_t _creation_time,
const uint64_t _oldest_key_time)
const uint64_t _oldest_key_time, const bool _is_bottommost_level,
const uint64_t _target_file_size)
: ioptions(_ioptions),
moptions(_moptions),
table_options(table_opt),
@ -317,14 +353,12 @@ struct BlockBasedTableBuilder::Rep {
internal_prefix_transform(_moptions.prefix_extractor.get()),
compression_type(_compression_type),
compression_opts(_compression_opts),
compression_dict(
_compression_dict == nullptr ? Slice() : Slice(*_compression_dict),
_compression_type, _compression_opts.level),
compression_dict(),
compression_ctx(_compression_type),
verify_dict(
_compression_dict == nullptr ? std::string() : *_compression_dict,
_compression_type == kZSTD ||
_compression_type == kZSTDNotFinalCompression),
verify_dict(),
state((_is_bottommost_level && _compression_opts.max_dict_bytes > 0)
? State::kBuffered
: State::kUnbuffered),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
compressed_cache_key_prefix_size(0),
@ -334,7 +368,9 @@ struct BlockBasedTableBuilder::Rep {
column_family_id(_column_family_id),
column_family_name(_column_family_name),
creation_time(_creation_time),
oldest_key_time(_oldest_key_time) {
oldest_key_time(_oldest_key_time),
is_bottommost_level(_is_bottommost_level),
target_file_size(_target_file_size) {
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
@ -383,10 +419,10 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const std::string* compression_dict, const bool skip_filters,
const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time,
const uint64_t oldest_key_time) {
const uint64_t oldest_key_time, const bool is_bottommost_level,
const uint64_t target_file_size) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
@ -399,11 +435,11 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
sanitized_table_options.format_version = 1;
}
rep_ =
new Rep(ioptions, moptions, sanitized_table_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id, file,
compression_type, compression_opts, compression_dict,
skip_filters, column_family_name, creation_time, oldest_key_time);
rep_ = new Rep(ioptions, moptions, sanitized_table_options,
internal_comparator, int_tbl_prop_collector_factories,
column_family_id, file, compression_type, compression_opts,
skip_filters, column_family_name, creation_time,
oldest_key_time, is_bottommost_level, target_file_size);
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
@ -417,13 +453,14 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
}
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
assert(rep_->closed); // Catch errors where caller forgot to call Finish()
// Catch errors where caller forgot to call Finish()
assert(rep_->state == Rep::State::kClosed);
delete rep_;
}
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
assert(rep_->state != Rep::State::kClosed);
if (!ok()) return;
ValueType value_type = ExtractValueType(key);
if (IsValueType(value_type)) {
@ -438,6 +475,11 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
assert(!r->data_block.empty());
Flush();
if (r->state == Rep::State::kBuffered &&
r->data_begin_offset > r->target_file_size) {
EnterUnbuffered();
}
// Add item to index block.
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
@ -446,20 +488,29 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
if (ok()) {
if (ok() && r->state == Rep::State::kUnbuffered) {
r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
}
}
// Note: PartitionedFilterBlockBuilder requires key being added to filter
// builder after being added to index builder.
if (r->filter_builder != nullptr) {
if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
r->filter_builder->Add(ExtractUserKey(key));
}
r->last_key.assign(key.data(), key.size());
r->data_block.Add(key, value);
r->index_builder->OnKeyAdded(key);
if (r->state == Rep::State::kBuffered) {
// Buffer keys to be replayed during `Finish()` once compression
// dictionary has been finalized.
if (r->data_block_and_keys_buffers.empty() || should_flush) {
r->data_block_and_keys_buffers.emplace_back();
}
r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
} else {
r->index_builder->OnKeyAdded(key);
}
NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
r->table_properties_collectors,
r->ioptions.info_log);
@ -488,15 +539,10 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
void BlockBasedTableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
assert(rep_->state != Rep::State::kClosed);
if (!ok()) return;
if (r->data_block.empty()) return;
WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
if (r->filter_builder != nullptr) {
r->filter_builder->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
}
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
@ -523,11 +569,24 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
StopWatchNano timer(r->ioptions.env,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
if (r->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(!r->data_block_and_keys_buffers.empty());
r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
return;
}
if (raw_block_contents.size() < kCompressionSizeLimit) {
CompressionInfo compression_info(
r->compression_opts, r->compression_ctx,
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(),
r->compression_type);
const CompressionDict* compression_dict;
if (!is_data_block || r->compression_dict == nullptr) {
compression_dict = &CompressionDict::GetEmptyDict();
} else {
compression_dict = r->compression_dict.get();
}
assert(compression_dict != nullptr);
CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
*compression_dict, r->compression_type);
block_contents =
CompressBlock(raw_block_contents, compression_info, &type,
r->table_options.format_version, &r->compressed_output);
@ -537,11 +596,16 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
// compressed data and compare to the input.
if (type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer
const UncompressionDict* verify_dict;
if (!is_data_block || r->verify_dict == nullptr) {
verify_dict = &UncompressionDict::GetEmptyDict();
} else {
verify_dict = r->verify_dict.get();
}
assert(verify_dict != nullptr);
BlockContents contents;
UncompressionInfo uncompression_info(
*r->verify_ctx,
is_data_block ? r->verify_dict : UncompressionDict::GetEmptyDict(),
r->compression_type);
UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
r->compression_type);
Status stat = UncompressBlockContentsForCompressionType(
uncompression_info, block_contents.data(), block_contents.size(),
&contents, r->table_options.format_version, r->ioptions);
@ -587,6 +651,13 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
WriteRawBlock(block_contents, type, handle, is_data_block);
r->compressed_output.clear();
if (is_data_block) {
if (r->filter_builder != nullptr) {
r->filter_builder->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
}
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
@ -873,11 +944,18 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
void BlockBasedTableBuilder::WriteCompressionDictBlock(
MetaIndexBuilder* meta_index_builder) {
if (rep_->compression_dict.GetRawDict().size()) {
if (rep_->compression_dict != nullptr &&
rep_->compression_dict->GetRawDict().size()) {
BlockHandle compression_dict_block_handle;
if (ok()) {
WriteRawBlock(rep_->compression_dict.GetRawDict(), kNoCompression,
WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
&compression_dict_block_handle);
#ifndef NDEBUG
Slice compression_dict = rep_->compression_dict->GetRawDict();
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
&compression_dict);
#endif // NDEBUG
}
if (ok()) {
meta_index_builder->Add(kCompressionDictBlock,
@ -925,13 +1003,77 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
}
}
void BlockBasedTableBuilder::EnterUnbuffered() {
Rep* r = rep_;
assert(r->state == Rep::State::kBuffered);
r->state = Rep::State::kUnbuffered;
const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
? r->compression_opts.zstd_max_train_bytes
: r->compression_opts.max_dict_bytes;
Random64 generator{r->creation_time};
std::string compression_dict_samples;
std::vector<size_t> compression_dict_sample_lens;
if (!r->data_block_and_keys_buffers.empty()) {
while (compression_dict_samples.size() < kSampleBytes) {
size_t rand_idx =
generator.Uniform(r->data_block_and_keys_buffers.size());
size_t copy_len =
std::min(kSampleBytes - compression_dict_samples.size(),
r->data_block_and_keys_buffers[rand_idx].first.size());
compression_dict_samples.append(
r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
compression_dict_sample_lens.emplace_back(copy_len);
}
}
// final data block flushed, now we can generate dictionary from the samples.
// OK if compression_dict_samples is empty, we'll just get empty dictionary.
std::string dict;
if (r->compression_opts.zstd_max_train_bytes > 0) {
dict = ZSTD_TrainDictionary(compression_dict_samples,
compression_dict_sample_lens,
r->compression_opts.max_dict_bytes);
} else {
dict = std::move(compression_dict_samples);
}
r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
r->compression_opts.level));
r->verify_dict.reset(new UncompressionDict(
dict, r->compression_type == kZSTD ||
r->compression_type == kZSTDNotFinalCompression));
for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
const auto& data_block = r->data_block_and_keys_buffers[i].first;
auto& keys = r->data_block_and_keys_buffers[i].second;
assert(!data_block.empty());
assert(!keys.empty());
for (const auto& key : keys) {
if (r->filter_builder != nullptr) {
r->filter_builder->Add(ExtractUserKey(key));
}
r->index_builder->OnKeyAdded(key);
}
WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
Slice first_key_in_next_block =
r->data_block_and_keys_buffers[i + 1].second.front();
Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
r->pending_handle);
}
}
r->data_block_and_keys_buffers.clear();
}
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
assert(r->state != Rep::State::kClosed);
bool empty_data_block = r->data_block.empty();
Flush();
assert(!r->closed);
r->closed = true;
if (r->state == Rep::State::kBuffered) {
EnterUnbuffered();
}
// To make sure properties block is able to keep the accurate size of index
// block, we will finish writing all index entries first.
if (ok() && !empty_data_block) {
@ -962,23 +1104,20 @@ Status BlockBasedTableBuilder::Finish() {
if (ok()) {
WriteFooter(metaindex_block_handle, index_block_handle);
}
r->state = Rep::State::kClosed;
return r->status;
}
void BlockBasedTableBuilder::Abandon() {
Rep* r = rep_;
assert(!r->closed);
r->closed = true;
assert(rep_->state != Rep::State::kClosed);
rep_->state = Rep::State::kClosed;
}
uint64_t BlockBasedTableBuilder::NumEntries() const {
return rep_->props.num_entries;
}
uint64_t BlockBasedTableBuilder::FileSize() const {
return rep_->offset;
}
uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
bool BlockBasedTableBuilder::NeedCompact() const {
for (const auto& collector : rep_->table_properties_collectors) {

View File

@ -37,8 +37,6 @@ class BlockBasedTableBuilder : public TableBuilder {
// Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish().
// @param compression_dict Data for presetting the compression library's
// dictionary, or nullptr.
BlockBasedTableBuilder(
const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
const BlockBasedTableOptions& table_options,
@ -47,10 +45,11 @@ class BlockBasedTableBuilder : public TableBuilder {
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const std::string* compression_dict, const bool skip_filters,
const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0);
const uint64_t oldest_key_time = 0,
const bool is_bottommost_level = false,
const uint64_t target_file_size = 0);
// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();
@ -94,6 +93,11 @@ class BlockBasedTableBuilder : public TableBuilder {
private:
bool ok() const { return status().ok(); }
// Transition state from buffered to unbuffered. See `Rep::State` API comment
// for details of the states.
// REQUIRES: `rep_->state == kBuffered`
void EnterUnbuffered();
// Call block's Finish() method
// and then write the compressed block contents to file.
void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);

View File

@ -215,11 +215,12 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
table_builder_options.int_tbl_prop_collector_factories, column_family_id,
file, table_builder_options.compression_type,
table_builder_options.compression_opts,
table_builder_options.compression_dict,
table_builder_options.skip_filters,
table_builder_options.column_family_name,
table_builder_options.creation_time,
table_builder_options.oldest_key_time);
table_builder_options.oldest_key_time,
table_builder_options.is_bottommost_level,
table_builder_options.target_file_size);
return table_builder;
}

View File

@ -559,7 +559,6 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
TableBuilderOptions(ioptions, moptions, internal_comparator,
&int_tbl_prop_collector_factories,
options.compression, CompressionOptions(),
nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, level_),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

View File

@ -235,8 +235,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
TableBuilderOptions table_builder_options(
r->ioptions, r->mutable_cf_options, r->internal_comparator,
&int_tbl_prop_collector_factories, compression_type, compression_opts,
nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
unknown_level);
r->skip_filters, r->column_family_name, unknown_level);
r->file_writer.reset(new WritableFileWriter(
std::move(sst_file), file_path, r->env_options, r->ioptions.env,
nullptr /* stats */, r->ioptions.listeners));

View File

@ -74,22 +74,23 @@ struct TableBuilderOptions {
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
_int_tbl_prop_collector_factories,
CompressionType _compression_type,
const CompressionOptions& _compression_opts,
const std::string* _compression_dict, bool _skip_filters,
const CompressionOptions& _compression_opts, bool _skip_filters,
const std::string& _column_family_name, int _level,
const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0)
const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0,
bool _is_bottommost_level = false, const uint64_t _target_file_size = 0)
: ioptions(_ioptions),
moptions(_moptions),
internal_comparator(_internal_comparator),
int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories),
compression_type(_compression_type),
compression_opts(_compression_opts),
compression_dict(_compression_dict),
skip_filters(_skip_filters),
column_family_name(_column_family_name),
level(_level),
creation_time(_creation_time),
oldest_key_time(_oldest_key_time) {}
oldest_key_time(_oldest_key_time),
is_bottommost_level(_is_bottommost_level),
target_file_size(_target_file_size) {}
const ImmutableCFOptions& ioptions;
const MutableCFOptions& moptions;
const InternalKeyComparator& internal_comparator;
@ -97,13 +98,13 @@ struct TableBuilderOptions {
int_tbl_prop_collector_factories;
CompressionType compression_type;
const CompressionOptions& compression_opts;
// Data for presetting the compression library's dictionary, or nullptr.
const std::string* compression_dict;
bool skip_filters; // only used by BlockBasedTableBuilder
const std::string& column_family_name;
int level; // what level this table/file is on, -1 for "not set, don't know"
const uint64_t creation_time;
const int64_t oldest_key_time;
const bool is_bottommost_level;
const uint64_t target_file_size;
};
// TableBuilder provides the interface used to build a Table

View File

@ -101,8 +101,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
TableBuilderOptions(
ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression, CompressionOptions(),
nullptr /* compression_dict */, false /* skip_filters */,
kDefaultColumnFamilyName, unknown_level),
false /* skip_filters */, kDefaultColumnFamilyName, unknown_level),
0 /* column_family_id */, file_writer.get());
} else {
s = DB::Open(opts, dbname, &db);

View File

@ -329,11 +329,11 @@ class TableConstructor: public Constructor {
int_tbl_prop_collector_factories;
std::string column_family_name;
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, internal_comparator,
&int_tbl_prop_collector_factories, options.compression,
CompressionOptions(), nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, level_),
TableBuilderOptions(ioptions, moptions, internal_comparator,
&int_tbl_prop_collector_factories,
options.compression, CompressionOptions(),
false /* skip_filters */, column_family_name,
level_),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer_.get()));
@ -2640,10 +2640,10 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
std::string column_family_name;
int unknown_level = -1;
std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, CompressionOptions(), nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, unknown_level),
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), false /* skip_filters */,
column_family_name, unknown_level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
@ -3269,8 +3269,8 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, -1),
CompressionOptions(), false /* skip_filters */,
column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
@ -3449,8 +3449,8 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, -1),
CompressionOptions(), false /* skip_filters */,
column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
@ -3542,8 +3542,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, -1),
CompressionOptions(), false /* skip_filters */,
column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

View File

@ -60,8 +60,7 @@ void createSST(const Options& opts, const std::string& file_name) {
TableBuilderOptions(
imoptions, moptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression, CompressionOptions(),
nullptr /* compression_dict */, false /* skip_filters */,
column_family_name, unknown_level),
false /* skip_filters */, column_family_name, unknown_level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

View File

@ -214,10 +214,10 @@ int SstFileDumper::ShowAllCompressionSizes(
CompressionOptions compress_opt;
std::string column_family_name;
int unknown_level = -1;
TableBuilderOptions tb_opts(
imoptions, moptions, ikc, &block_based_table_factories, i.first,
compress_opt, nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, unknown_level);
TableBuilderOptions tb_opts(imoptions, moptions, ikc,
&block_based_table_factories, i.first,
compress_opt, false /* skip_filters */,
column_family_name, unknown_level);
uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size);
fprintf(stdout, "Compression: %s", i.second);
fprintf(stdout, " Size: %" PRIu64 "\n", file_size);

View File

@ -158,13 +158,13 @@ struct CompressionDict {
#if ZSTD_VERSION_NUMBER >= 700
ZSTD_CDict* zstd_cdict_ = nullptr;
#endif // ZSTD_VERSION_NUMBER >= 700
Slice dict_;
std::string dict_;
public:
#if ZSTD_VERSION_NUMBER >= 700
CompressionDict(Slice dict, CompressionType type, int level) {
CompressionDict(std::string dict, CompressionType type, int level) {
#else // ZSTD_VERSION_NUMBER >= 700
CompressionDict(Slice dict, CompressionType /*type*/, int /*level*/) {
CompressionDict(std::string dict, CompressionType /*type*/, int /*level*/) {
#endif // ZSTD_VERSION_NUMBER >= 700
dict_ = std::move(dict);
#if ZSTD_VERSION_NUMBER >= 700