cc23b46da1
Summary:
An untrained dictionary is currently simply the concatenation of several samples. The ZSTD API, ZDICT_finalizeDictionary(), can improve such a dictionary's effectiveness at low cost. This PR changes how dictionary is created by calling the ZSTD ZDICT_finalizeDictionary() API instead of creating raw content dictionary (when max_dict_buffer_bytes > 0), and pass in all buffered uncompressed data blocks as samples.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9857
Test Plan:
#### db_bench test for cpu/memory of compression+decompression and space saving on synthetic data:
Set up: change the parameter [here](fb9a167a55/tools/db_bench_tool.cc (L1766)
) to 16384 to make synthetic data more compressible.
```
# linked local ZSTD with version 1.5.2
# DEBUG_LEVEL=0 ROCKSDB_NO_FBCODE=1 ROCKSDB_DISABLE_ZSTD=1 EXTRA_CXXFLAGS="-DZSTD_STATIC_LINKING_ONLY -DZSTD -I/data/users/changyubi/install/include/" EXTRA_LDFLAGS="-L/data/users/changyubi/install/lib/ -l:libzstd.a" make -j32 db_bench
dict_bytes=16384
train_bytes=1048576
echo "========== No Dictionary =========="
TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=filluniquerandom,compact -num=10000000 -compression_type=zstd -compression_max_dict_bytes=0 -block_size=4096 -max_background_jobs=24 -memtablerep=vector -allow_concurrent_memtable_write=false -disable_wal=true -max_write_buffer_number=8 >/dev/null 2>&1
TEST_TMPDIR=/dev/shm /usr/bin/time ./db_bench -use_existing_db=true -benchmarks=compact -compression_type=zstd -compression_max_dict_bytes=0 -block_size=4096 2>&1 | grep elapsed
du -hc /dev/shm/dbbench/*sst | grep total
echo "========== Raw Content Dictionary =========="
TEST_TMPDIR=/dev/shm ./db_bench_main -benchmarks=filluniquerandom,compact -num=10000000 -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -block_size=4096 -max_background_jobs=24 -memtablerep=vector -allow_concurrent_memtable_write=false -disable_wal=true -max_write_buffer_number=8 >/dev/null 2>&1
TEST_TMPDIR=/dev/shm /usr/bin/time ./db_bench_main -use_existing_db=true -benchmarks=compact -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -block_size=4096 2>&1 | grep elapsed
du -hc /dev/shm/dbbench/*sst | grep total
echo "========== FinalizeDictionary =========="
TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=filluniquerandom,compact -num=10000000 -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes -compression_use_zstd_dict_trainer=false -block_size=4096 -max_background_jobs=24 -memtablerep=vector -allow_concurrent_memtable_write=false -disable_wal=true -max_write_buffer_number=8 >/dev/null 2>&1
TEST_TMPDIR=/dev/shm /usr/bin/time ./db_bench -use_existing_db=true -benchmarks=compact -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes -compression_use_zstd_dict_trainer=false -block_size=4096 2>&1 | grep elapsed
du -hc /dev/shm/dbbench/*sst | grep total
echo "========== TrainDictionary =========="
TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=filluniquerandom,compact -num=10000000 -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes -block_size=4096 -max_background_jobs=24 -memtablerep=vector -allow_concurrent_memtable_write=false -disable_wal=true -max_write_buffer_number=8 >/dev/null 2>&1
TEST_TMPDIR=/dev/shm /usr/bin/time ./db_bench -use_existing_db=true -benchmarks=compact -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes -block_size=4096 2>&1 | grep elapsed
du -hc /dev/shm/dbbench/*sst | grep total
# Result: TrainDictionary is much better on space saving, but FinalizeDictionary seems to use less memory.
# before compression data size: 1.2GB
dict_bytes=16384
max_dict_buffer_bytes = 1048576
space cpu/memory
No Dictionary 468M 14.93user 1.00system 0:15.92elapsed 100%CPU (0avgtext+0avgdata 23904maxresident)k
Raw Dictionary 251M 15.81user 0.80system 0:16.56elapsed 100%CPU (0avgtext+0avgdata 156808maxresident)k
FinalizeDictionary 236M 11.93user 0.64system 0:12.56elapsed 100%CPU (0avgtext+0avgdata 89548maxresident)k
TrainDictionary 84M 7.29user 0.45system 0:07.75elapsed 100%CPU (0avgtext+0avgdata 97288maxresident)k
```
#### Benchmark on 10 sample SST files for spacing saving and CPU time on compression:
FinalizeDictionary is comparable to TrainDictionary in terms of space saving, and takes less time in compression.
```
dict_bytes=16384
train_bytes=1048576
for sst_file in `ls ../temp/myrock-sst/`
do
echo "********** $sst_file **********"
echo "========== No Dictionary =========="
./sst_dump --file="../temp/myrock-sst/$sst_file" --command=recompress --compression_level_from=6 --compression_level_to=6 --compression_types=kZSTD
echo "========== Raw Content Dictionary =========="
./sst_dump --file="../temp/myrock-sst/$sst_file" --command=recompress --compression_level_from=6 --compression_level_to=6 --compression_types=kZSTD --compression_max_dict_bytes=$dict_bytes
echo "========== FinalizeDictionary =========="
./sst_dump --file="../temp/myrock-sst/$sst_file" --command=recompress --compression_level_from=6 --compression_level_to=6 --compression_types=kZSTD --compression_max_dict_bytes=$dict_bytes --compression_zstd_max_train_bytes=$train_bytes --compression_use_zstd_finalize_dict
echo "========== TrainDictionary =========="
./sst_dump --file="../temp/myrock-sst/$sst_file" --command=recompress --compression_level_from=6 --compression_level_to=6 --compression_types=kZSTD --compression_max_dict_bytes=$dict_bytes --compression_zstd_max_train_bytes=$train_bytes
done
010240.sst (Size/Time) 011029.sst 013184.sst 021552.sst 185054.sst 185137.sst 191666.sst 7560381.sst 7604174.sst 7635312.sst
No Dictionary 28165569 / 2614419 32899411 / 2976832 32977848 / 3055542 31966329 / 2004590 33614351 / 1755877 33429029 / 1717042 33611933 / 1776936 33634045 / 2771417 33789721 / 2205414 33592194 / 388254
Raw Content Dictionary 28019950 / 2697961 33748665 / 3572422 33896373 / 3534701 26418431 / 2259658 28560825 / 1839168 28455030 / 1846039 28494319 / 1861349 32391599 / 3095649 33772142 / 2407843 33592230 / 474523
FinalizeDictionary 27896012 / 2650029 33763886 / 3719427 33904283 / 3552793 26008225 / 2198033 28111872 / 1869530 28014374 / 1789771 28047706 / 1848300 32296254 / 3204027 33698698 / 2381468 33592344 / 517433
TrainDictionary 28046089 / 2740037 33706480 / 3679019 33885741 / 3629351 25087123 / 2204558 27194353 / 1970207 27234229 / 1896811 27166710 / 1903119 32011041 / 3322315 32730692 / 2406146 33608631 / 570593
```
#### Decompression/Read test:
With FinalizeDictionary/TrainDictionary, some data structure used for decompression are in stored in dictionary, so they are expected to be faster in terms of decompression/reads.
```
dict_bytes=16384
train_bytes=1048576
echo "No Dictionary"
TEST_TMPDIR=/dev/shm/ ./db_bench -benchmarks=filluniquerandom,compact -compression_type=zstd -compression_max_dict_bytes=0 > /dev/null 2>&1
TEST_TMPDIR=/dev/shm/ ./db_bench -use_existing_db=true -benchmarks=readrandom -cache_size=0 -compression_type=zstd -compression_max_dict_bytes=0 2>&1 | grep MB/s
echo "Raw Dictionary"
TEST_TMPDIR=/dev/shm/ ./db_bench -benchmarks=filluniquerandom,compact -compression_type=zstd -compression_max_dict_bytes=$dict_bytes > /dev/null 2>&1
TEST_TMPDIR=/dev/shm/ ./db_bench -use_existing_db=true -benchmarks=readrandom -cache_size=0 -compression_type=zstd -compression_max_dict_bytes=$dict_bytes 2>&1 | grep MB/s
echo "FinalizeDict"
TEST_TMPDIR=/dev/shm/ ./db_bench -benchmarks=filluniquerandom,compact -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes -compression_use_zstd_dict_trainer=false > /dev/null 2>&1
TEST_TMPDIR=/dev/shm/ ./db_bench -use_existing_db=true -benchmarks=readrandom -cache_size=0 -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes -compression_use_zstd_dict_trainer=false 2>&1 | grep MB/s
echo "Train Dictionary"
TEST_TMPDIR=/dev/shm/ ./db_bench -benchmarks=filluniquerandom,compact -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes > /dev/null 2>&1
TEST_TMPDIR=/dev/shm/ ./db_bench -use_existing_db=true -benchmarks=readrandom -cache_size=0 -compression_type=zstd -compression_max_dict_bytes=$dict_bytes -compression_zstd_max_train_bytes=$train_bytes 2>&1 | grep MB/s
No Dictionary
readrandom : 12.183 micros/op 82082 ops/sec 12.183 seconds 1000000 operations; 9.1 MB/s (1000000 of 1000000 found)
Raw Dictionary
readrandom : 12.314 micros/op 81205 ops/sec 12.314 seconds 1000000 operations; 9.0 MB/s (1000000 of 1000000 found)
FinalizeDict
readrandom : 9.787 micros/op 102180 ops/sec 9.787 seconds 1000000 operations; 11.3 MB/s (1000000 of 1000000 found)
Train Dictionary
readrandom : 9.698 micros/op 103108 ops/sec 9.699 seconds 1000000 operations; 11.4 MB/s (1000000 of 1000000 found)
```
Reviewed By: ajkr
Differential Revision: D35720026
Pulled By: cbi42
fbshipit-source-id: 24d230fdff0fd28a1bb650658798f00dfcfb2a1f
2126 lines
80 KiB
C++
2126 lines
80 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "table/block_based/block_based_table_builder.h"
|
|
|
|
#include <assert.h>
|
|
#include <stdio.h>
|
|
|
|
#include <atomic>
|
|
#include <list>
|
|
#include <map>
|
|
#include <memory>
|
|
#include <numeric>
|
|
#include <string>
|
|
#include <unordered_map>
|
|
#include <utility>
|
|
|
|
#include "cache/cache_entry_roles.h"
|
|
#include "cache/cache_key.h"
|
|
#include "cache/cache_reservation_manager.h"
|
|
#include "db/dbformat.h"
|
|
#include "index_builder.h"
|
|
#include "logging/logging.h"
|
|
#include "memory/memory_allocator.h"
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/comparator.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/filter_policy.h"
|
|
#include "rocksdb/flush_block_policy.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "rocksdb/table.h"
|
|
#include "rocksdb/types.h"
|
|
#include "table/block_based/block.h"
|
|
#include "table/block_based/block_based_filter_block.h"
|
|
#include "table/block_based/block_based_table_factory.h"
|
|
#include "table/block_based/block_based_table_reader.h"
|
|
#include "table/block_based/block_builder.h"
|
|
#include "table/block_based/block_like_traits.h"
|
|
#include "table/block_based/filter_block.h"
|
|
#include "table/block_based/filter_policy_internal.h"
|
|
#include "table/block_based/full_filter_block.h"
|
|
#include "table/block_based/partitioned_filter_block.h"
|
|
#include "table/format.h"
|
|
#include "table/meta_blocks.h"
|
|
#include "table/table_builder.h"
|
|
#include "util/coding.h"
|
|
#include "util/compression.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/string_util.h"
|
|
#include "util/work_queue.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
extern const std::string kHashIndexPrefixesBlock;
|
|
extern const std::string kHashIndexPrefixesMetadataBlock;
|
|
|
|
|
|
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
|
|
namespace {
|
|
|
|
constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
|
|
|
|
// Create a filter block builder based on its type.
|
|
FilterBlockBuilder* CreateFilterBlockBuilder(
|
|
const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
|
|
const FilterBuildingContext& context,
|
|
const bool use_delta_encoding_for_index_values,
|
|
PartitionedIndexBuilder* const p_index_builder) {
|
|
const BlockBasedTableOptions& table_opt = context.table_options;
|
|
assert(table_opt.filter_policy); // precondition
|
|
|
|
FilterBitsBuilder* filter_bits_builder =
|
|
BloomFilterPolicy::GetBuilderFromContext(context);
|
|
if (filter_bits_builder == nullptr) {
|
|
return nullptr;
|
|
} else {
|
|
// Check for backdoor deprecated block-based bloom config
|
|
size_t starting_est = filter_bits_builder->EstimateEntriesAdded();
|
|
constexpr auto kSecretStart =
|
|
DeprecatedBlockBasedBloomFilterPolicy::kSecretBitsPerKeyStart;
|
|
if (starting_est >= kSecretStart && starting_est < kSecretStart + 100) {
|
|
int bits_per_key = static_cast<int>(starting_est - kSecretStart);
|
|
delete filter_bits_builder;
|
|
return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
|
|
table_opt, bits_per_key);
|
|
}
|
|
// END check for backdoor deprecated block-based bloom config
|
|
if (table_opt.partition_filters) {
|
|
assert(p_index_builder != nullptr);
|
|
// Since after partition cut request from filter builder it takes time
|
|
// until index builder actully cuts the partition, until the end of a
|
|
// data block potentially with many keys, we take the lower bound as
|
|
// partition size.
|
|
assert(table_opt.block_size_deviation <= 100);
|
|
auto partition_size =
|
|
static_cast<uint32_t>(((table_opt.metadata_block_size *
|
|
(100 - table_opt.block_size_deviation)) +
|
|
99) /
|
|
100);
|
|
partition_size = std::max(partition_size, static_cast<uint32_t>(1));
|
|
return new PartitionedFilterBlockBuilder(
|
|
mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
|
|
filter_bits_builder, table_opt.index_block_restart_interval,
|
|
use_delta_encoding_for_index_values, p_index_builder, partition_size);
|
|
} else {
|
|
return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
|
|
table_opt.whole_key_filtering,
|
|
filter_bits_builder);
|
|
}
|
|
}
|
|
}
|
|
|
|
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
|
|
// Check to see if compressed less than 12.5%
|
|
return compressed_size < raw_size - (raw_size / 8u);
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// format_version is the block format as defined in include/rocksdb/table.h
|
|
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
|
|
CompressionType* type, uint32_t format_version,
|
|
bool do_sample, std::string* compressed_output,
|
|
std::string* sampled_output_fast,
|
|
std::string* sampled_output_slow) {
|
|
assert(type);
|
|
assert(compressed_output);
|
|
assert(compressed_output->empty());
|
|
|
|
// If requested, we sample one in every N block with a
|
|
// fast and slow compression algorithm and report the stats.
|
|
// The users can use these stats to decide if it is worthwhile
|
|
// enabling compression and they also get a hint about which
|
|
// compression algorithm wil be beneficial.
|
|
if (do_sample && info.SampleForCompression() &&
|
|
Random::GetTLSInstance()->OneIn(
|
|
static_cast<int>(info.SampleForCompression()))) {
|
|
// Sampling with a fast compression algorithm
|
|
if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
|
|
CompressionType c =
|
|
LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
|
|
CompressionContext context(c);
|
|
CompressionOptions options;
|
|
CompressionInfo info_tmp(options, context,
|
|
CompressionDict::GetEmptyDict(), c,
|
|
info.SampleForCompression());
|
|
|
|
CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
|
|
sampled_output_fast);
|
|
}
|
|
|
|
// Sampling with a slow but high-compression algorithm
|
|
if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
|
|
CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
|
|
CompressionContext context(c);
|
|
CompressionOptions options;
|
|
CompressionInfo info_tmp(options, context,
|
|
CompressionDict::GetEmptyDict(), c,
|
|
info.SampleForCompression());
|
|
|
|
CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
|
|
sampled_output_slow);
|
|
}
|
|
}
|
|
|
|
if (info.type() == kNoCompression) {
|
|
*type = kNoCompression;
|
|
return raw;
|
|
}
|
|
|
|
// Actually compress the data; if the compression method is not supported,
|
|
// or the compression fails etc., just fall back to uncompressed
|
|
if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
|
|
compressed_output)) {
|
|
*type = kNoCompression;
|
|
return raw;
|
|
}
|
|
|
|
// Check the compression ratio; if it's not good enough, just fall back to
|
|
// uncompressed
|
|
if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
|
|
*type = kNoCompression;
|
|
return raw;
|
|
}
|
|
|
|
*type = info.type();
|
|
return *compressed_output;
|
|
}
|
|
|
|
// kBlockBasedTableMagicNumber was picked by running
|
|
// echo rocksdb.table.block_based | sha1sum
|
|
// and taking the leading 64 bits.
|
|
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
|
|
// .cc files
|
|
// for that reason we declare it extern in the header but to get the space
|
|
// allocated
|
|
// it must be not extern in one place.
|
|
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
|
|
// We also support reading and writing legacy block based table format (for
|
|
// backwards compatibility)
|
|
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
|
|
|
|
// A collector that collects properties of interest to block-based table.
|
|
// For now this class looks heavy-weight since we only write one additional
|
|
// property.
|
|
// But in the foreseeable future, we will add more and more properties that are
|
|
// specific to block-based table.
|
|
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
|
|
: public IntTblPropCollector {
|
|
public:
|
|
explicit BlockBasedTablePropertiesCollector(
|
|
BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
|
|
bool prefix_filtering)
|
|
: index_type_(index_type),
|
|
whole_key_filtering_(whole_key_filtering),
|
|
prefix_filtering_(prefix_filtering) {}
|
|
|
|
Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
|
|
uint64_t /*file_size*/) override {
|
|
// Intentionally left blank. Have no interest in collecting stats for
|
|
// individual key/value pairs.
|
|
return Status::OK();
|
|
}
|
|
|
|
virtual void BlockAdd(uint64_t /* block_raw_bytes */,
|
|
uint64_t /* block_compressed_bytes_fast */,
|
|
uint64_t /* block_compressed_bytes_slow */) override {
|
|
// Intentionally left blank. No interest in collecting stats for
|
|
// blocks.
|
|
return;
|
|
}
|
|
|
|
Status Finish(UserCollectedProperties* properties) override {
|
|
std::string val;
|
|
PutFixed32(&val, static_cast<uint32_t>(index_type_));
|
|
properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
|
|
properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
|
|
whole_key_filtering_ ? kPropTrue : kPropFalse});
|
|
properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
|
|
prefix_filtering_ ? kPropTrue : kPropFalse});
|
|
return Status::OK();
|
|
}
|
|
|
|
// The name of the properties collector can be used for debugging purpose.
|
|
const char* Name() const override {
|
|
return "BlockBasedTablePropertiesCollector";
|
|
}
|
|
|
|
UserCollectedProperties GetReadableProperties() const override {
|
|
// Intentionally left blank.
|
|
return UserCollectedProperties();
|
|
}
|
|
|
|
private:
|
|
BlockBasedTableOptions::IndexType index_type_;
|
|
bool whole_key_filtering_;
|
|
bool prefix_filtering_;
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::Rep {
|
|
const ImmutableOptions ioptions;
|
|
const MutableCFOptions moptions;
|
|
const BlockBasedTableOptions table_options;
|
|
const InternalKeyComparator& internal_comparator;
|
|
WritableFileWriter* file;
|
|
std::atomic<uint64_t> offset;
|
|
size_t alignment;
|
|
BlockBuilder data_block;
|
|
// Buffers uncompressed data blocks to replay later. Needed when
|
|
// compression dictionary is enabled so we can finalize the dictionary before
|
|
// compressing any data blocks.
|
|
std::vector<std::string> data_block_buffers;
|
|
BlockBuilder range_del_block;
|
|
|
|
InternalKeySliceTransform internal_prefix_transform;
|
|
std::unique_ptr<IndexBuilder> index_builder;
|
|
PartitionedIndexBuilder* p_index_builder_ = nullptr;
|
|
|
|
std::string last_key;
|
|
const Slice* first_key_in_next_block = nullptr;
|
|
CompressionType compression_type;
|
|
uint64_t sample_for_compression;
|
|
std::atomic<uint64_t> compressible_input_data_bytes;
|
|
std::atomic<uint64_t> uncompressible_input_data_bytes;
|
|
std::atomic<uint64_t> sampled_input_data_bytes;
|
|
std::atomic<uint64_t> sampled_output_slow_data_bytes;
|
|
std::atomic<uint64_t> sampled_output_fast_data_bytes;
|
|
CompressionOptions compression_opts;
|
|
std::unique_ptr<CompressionDict> compression_dict;
|
|
std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
|
|
std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
|
|
std::unique_ptr<UncompressionDict> verify_dict;
|
|
|
|
size_t data_begin_offset = 0;
|
|
|
|
TableProperties props;
|
|
|
|
// States of the builder.
|
|
//
|
|
// - `kBuffered`: This is the initial state where zero or more data blocks are
|
|
// accumulated uncompressed in-memory. From this state, call
|
|
// `EnterUnbuffered()` to finalize the compression dictionary if enabled,
|
|
// compress/write out any buffered blocks, and proceed to the `kUnbuffered`
|
|
// state.
|
|
//
|
|
// - `kUnbuffered`: This is the state when compression dictionary is finalized
|
|
// either because it wasn't enabled in the first place or it's been created
|
|
// from sampling previously buffered data. In this state, blocks are simply
|
|
// compressed/written out as they fill up. From this state, call `Finish()`
|
|
// to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
|
|
// the partially created file.
|
|
//
|
|
// - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
|
|
// called, so the table builder is no longer usable. We must be in this
|
|
// state by the time the destructor runs.
|
|
enum class State {
|
|
kBuffered,
|
|
kUnbuffered,
|
|
kClosed,
|
|
};
|
|
State state;
|
|
// `kBuffered` state is allowed only as long as the buffering of uncompressed
|
|
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
|
|
uint64_t buffer_limit;
|
|
std::shared_ptr<CacheReservationManager>
|
|
compression_dict_buffer_cache_res_mgr;
|
|
const bool use_delta_encoding_for_index_values;
|
|
std::unique_ptr<FilterBlockBuilder> filter_builder;
|
|
OffsetableCacheKey base_cache_key;
|
|
const TableFileCreationReason reason;
|
|
|
|
BlockHandle pending_handle; // Handle to add to index block
|
|
|
|
std::string compressed_output;
|
|
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
|
|
|
|
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
|
|
|
|
std::unique_ptr<ParallelCompressionRep> pc_rep;
|
|
|
|
uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
|
|
void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
|
|
|
|
bool IsParallelCompressionEnabled() const {
|
|
return compression_opts.parallel_threads > 1;
|
|
}
|
|
|
|
Status GetStatus() {
|
|
// We need to make modifications of status visible when status_ok is set
|
|
// to false, and this is ensured by status_mutex, so no special memory
|
|
// order for status_ok is required.
|
|
if (status_ok.load(std::memory_order_relaxed)) {
|
|
return Status::OK();
|
|
} else {
|
|
return CopyStatus();
|
|
}
|
|
}
|
|
|
|
Status CopyStatus() {
|
|
std::lock_guard<std::mutex> lock(status_mutex);
|
|
return status;
|
|
}
|
|
|
|
IOStatus GetIOStatus() {
|
|
// We need to make modifications of io_status visible when status_ok is set
|
|
// to false, and this is ensured by io_status_mutex, so no special memory
|
|
// order for io_status_ok is required.
|
|
if (io_status_ok.load(std::memory_order_relaxed)) {
|
|
return IOStatus::OK();
|
|
} else {
|
|
return CopyIOStatus();
|
|
}
|
|
}
|
|
|
|
IOStatus CopyIOStatus() {
|
|
std::lock_guard<std::mutex> lock(io_status_mutex);
|
|
return io_status;
|
|
}
|
|
|
|
// Never erase an existing status that is not OK.
|
|
void SetStatus(Status s) {
|
|
if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
|
|
// Locking is an overkill for non compression_opts.parallel_threads
|
|
// case but since it's unlikely that s is not OK, we take this cost
|
|
// to be simplicity.
|
|
std::lock_guard<std::mutex> lock(status_mutex);
|
|
status = s;
|
|
status_ok.store(false, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
// Never erase an existing I/O status that is not OK.
|
|
// Calling this will also SetStatus(ios)
|
|
void SetIOStatus(IOStatus ios) {
|
|
if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
|
|
// Locking is an overkill for non compression_opts.parallel_threads
|
|
// case but since it's unlikely that s is not OK, we take this cost
|
|
// to be simplicity.
|
|
std::lock_guard<std::mutex> lock(io_status_mutex);
|
|
io_status = ios;
|
|
io_status_ok.store(false, std::memory_order_relaxed);
|
|
}
|
|
SetStatus(ios);
|
|
}
|
|
|
|
Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
|
|
WritableFileWriter* f)
|
|
: ioptions(tbo.ioptions),
|
|
moptions(tbo.moptions),
|
|
table_options(table_opt),
|
|
internal_comparator(tbo.internal_comparator),
|
|
file(f),
|
|
offset(0),
|
|
alignment(table_options.block_align
|
|
? std::min(static_cast<size_t>(table_options.block_size),
|
|
kDefaultPageSize)
|
|
: 0),
|
|
data_block(table_options.block_restart_interval,
|
|
table_options.use_delta_encoding,
|
|
false /* use_value_delta_encoding */,
|
|
tbo.internal_comparator.user_comparator()
|
|
->CanKeysWithDifferentByteContentsBeEqual()
|
|
? BlockBasedTableOptions::kDataBlockBinarySearch
|
|
: table_options.data_block_index_type,
|
|
table_options.data_block_hash_table_util_ratio),
|
|
range_del_block(1 /* block_restart_interval */),
|
|
internal_prefix_transform(tbo.moptions.prefix_extractor.get()),
|
|
compression_type(tbo.compression_type),
|
|
sample_for_compression(tbo.moptions.sample_for_compression),
|
|
compressible_input_data_bytes(0),
|
|
uncompressible_input_data_bytes(0),
|
|
sampled_input_data_bytes(0),
|
|
sampled_output_slow_data_bytes(0),
|
|
sampled_output_fast_data_bytes(0),
|
|
compression_opts(tbo.compression_opts),
|
|
compression_dict(),
|
|
compression_ctxs(tbo.compression_opts.parallel_threads),
|
|
verify_ctxs(tbo.compression_opts.parallel_threads),
|
|
verify_dict(),
|
|
state((tbo.compression_opts.max_dict_bytes > 0) ? State::kBuffered
|
|
: State::kUnbuffered),
|
|
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
|
|
!table_opt.block_align),
|
|
reason(tbo.reason),
|
|
flush_block_policy(
|
|
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
|
|
table_options, data_block)),
|
|
status_ok(true),
|
|
io_status_ok(true) {
|
|
if (tbo.target_file_size == 0) {
|
|
buffer_limit = compression_opts.max_dict_buffer_bytes;
|
|
} else if (compression_opts.max_dict_buffer_bytes == 0) {
|
|
buffer_limit = tbo.target_file_size;
|
|
} else {
|
|
buffer_limit = std::min(tbo.target_file_size,
|
|
compression_opts.max_dict_buffer_bytes);
|
|
}
|
|
|
|
const auto compress_dict_build_buffer_charged =
|
|
table_options.cache_usage_options.options_overrides
|
|
.at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
|
|
.charged;
|
|
if (table_options.block_cache &&
|
|
(compress_dict_build_buffer_charged ==
|
|
CacheEntryRoleOptions::Decision::kEnabled ||
|
|
compress_dict_build_buffer_charged ==
|
|
CacheEntryRoleOptions::Decision::kFallback)) {
|
|
compression_dict_buffer_cache_res_mgr =
|
|
std::make_shared<CacheReservationManagerImpl<
|
|
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
|
|
table_options.block_cache);
|
|
} else {
|
|
compression_dict_buffer_cache_res_mgr = nullptr;
|
|
}
|
|
|
|
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
|
|
compression_ctxs[i].reset(new CompressionContext(compression_type));
|
|
}
|
|
if (table_options.index_type ==
|
|
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
|
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
|
|
&internal_comparator, use_delta_encoding_for_index_values,
|
|
table_options);
|
|
index_builder.reset(p_index_builder_);
|
|
} else {
|
|
index_builder.reset(IndexBuilder::CreateIndexBuilder(
|
|
table_options.index_type, &internal_comparator,
|
|
&this->internal_prefix_transform, use_delta_encoding_for_index_values,
|
|
table_options));
|
|
}
|
|
if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
|
|
// Apply optimize_filters_for_hits setting here when applicable by
|
|
// skipping filter generation
|
|
filter_builder.reset();
|
|
} else if (tbo.skip_filters) {
|
|
// For SstFileWriter skip_filters
|
|
filter_builder.reset();
|
|
} else if (!table_options.filter_policy) {
|
|
// Null filter_policy -> no filter
|
|
filter_builder.reset();
|
|
} else {
|
|
FilterBuildingContext filter_context(table_options);
|
|
|
|
filter_context.info_log = ioptions.logger;
|
|
filter_context.column_family_name = tbo.column_family_name;
|
|
filter_context.reason = reason;
|
|
|
|
// Only populate other fields if known to be in LSM rather than
|
|
// generating external SST file
|
|
if (reason != TableFileCreationReason::kMisc) {
|
|
filter_context.compaction_style = ioptions.compaction_style;
|
|
filter_context.num_levels = ioptions.num_levels;
|
|
filter_context.level_at_creation = tbo.level_at_creation;
|
|
filter_context.is_bottommost = tbo.is_bottommost;
|
|
assert(filter_context.level_at_creation < filter_context.num_levels);
|
|
}
|
|
|
|
filter_builder.reset(CreateFilterBlockBuilder(
|
|
ioptions, moptions, filter_context,
|
|
use_delta_encoding_for_index_values, p_index_builder_));
|
|
}
|
|
|
|
assert(tbo.int_tbl_prop_collector_factories);
|
|
for (auto& factory : *tbo.int_tbl_prop_collector_factories) {
|
|
assert(factory);
|
|
|
|
table_properties_collectors.emplace_back(
|
|
factory->CreateIntTblPropCollector(tbo.column_family_id,
|
|
tbo.level_at_creation));
|
|
}
|
|
table_properties_collectors.emplace_back(
|
|
new BlockBasedTablePropertiesCollector(
|
|
table_options.index_type, table_options.whole_key_filtering,
|
|
moptions.prefix_extractor != nullptr));
|
|
const Comparator* ucmp = tbo.internal_comparator.user_comparator();
|
|
assert(ucmp);
|
|
if (ucmp->timestamp_size() > 0) {
|
|
table_properties_collectors.emplace_back(
|
|
new TimestampTablePropertiesCollector(ucmp));
|
|
}
|
|
if (table_options.verify_compression) {
|
|
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
|
|
verify_ctxs[i].reset(new UncompressionContext(compression_type));
|
|
}
|
|
}
|
|
|
|
// These are only needed for populating table properties
|
|
props.column_family_id = tbo.column_family_id;
|
|
props.column_family_name = tbo.column_family_name;
|
|
props.creation_time = tbo.creation_time;
|
|
props.oldest_key_time = tbo.oldest_key_time;
|
|
props.file_creation_time = tbo.file_creation_time;
|
|
props.orig_file_number = tbo.cur_file_num;
|
|
props.db_id = tbo.db_id;
|
|
props.db_session_id = tbo.db_session_id;
|
|
props.db_host_id = ioptions.db_host_id;
|
|
if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
|
|
ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
|
|
}
|
|
}
|
|
|
|
Rep(const Rep&) = delete;
|
|
Rep& operator=(const Rep&) = delete;
|
|
|
|
private:
|
|
// Synchronize status & io_status accesses across threads from main thread,
|
|
// compression thread and write thread in parallel compression.
|
|
std::mutex status_mutex;
|
|
std::atomic<bool> status_ok;
|
|
Status status;
|
|
std::mutex io_status_mutex;
|
|
std::atomic<bool> io_status_ok;
|
|
IOStatus io_status;
|
|
};
|
|
|
|
struct BlockBasedTableBuilder::ParallelCompressionRep {
|
|
// Keys is a wrapper of vector of strings avoiding
|
|
// releasing string memories during vector clear()
|
|
// in order to save memory allocation overhead
|
|
class Keys {
|
|
public:
|
|
Keys() : keys_(kKeysInitSize), size_(0) {}
|
|
void PushBack(const Slice& key) {
|
|
if (size_ == keys_.size()) {
|
|
keys_.emplace_back(key.data(), key.size());
|
|
} else {
|
|
keys_[size_].assign(key.data(), key.size());
|
|
}
|
|
size_++;
|
|
}
|
|
void SwapAssign(std::vector<std::string>& keys) {
|
|
size_ = keys.size();
|
|
std::swap(keys_, keys);
|
|
}
|
|
void Clear() { size_ = 0; }
|
|
size_t Size() { return size_; }
|
|
std::string& Back() { return keys_[size_ - 1]; }
|
|
std::string& operator[](size_t idx) {
|
|
assert(idx < size_);
|
|
return keys_[idx];
|
|
}
|
|
|
|
private:
|
|
const size_t kKeysInitSize = 32;
|
|
std::vector<std::string> keys_;
|
|
size_t size_;
|
|
};
|
|
std::unique_ptr<Keys> curr_block_keys;
|
|
|
|
class BlockRepSlot;
|
|
|
|
// BlockRep instances are fetched from and recycled to
|
|
// block_rep_pool during parallel compression.
|
|
struct BlockRep {
|
|
Slice contents;
|
|
Slice compressed_contents;
|
|
std::unique_ptr<std::string> data;
|
|
std::unique_ptr<std::string> compressed_data;
|
|
CompressionType compression_type;
|
|
std::unique_ptr<std::string> first_key_in_next_block;
|
|
std::unique_ptr<Keys> keys;
|
|
std::unique_ptr<BlockRepSlot> slot;
|
|
Status status;
|
|
};
|
|
// Use a vector of BlockRep as a buffer for a determined number
|
|
// of BlockRep structures. All data referenced by pointers in
|
|
// BlockRep will be freed when this vector is destructed.
|
|
using BlockRepBuffer = std::vector<BlockRep>;
|
|
BlockRepBuffer block_rep_buf;
|
|
// Use a thread-safe queue for concurrent access from block
|
|
// building thread and writer thread.
|
|
using BlockRepPool = WorkQueue<BlockRep*>;
|
|
BlockRepPool block_rep_pool;
|
|
|
|
// Use BlockRepSlot to keep block order in write thread.
|
|
// slot_ will pass references to BlockRep
|
|
class BlockRepSlot {
|
|
public:
|
|
BlockRepSlot() : slot_(1) {}
|
|
template <typename T>
|
|
void Fill(T&& rep) {
|
|
slot_.push(std::forward<T>(rep));
|
|
};
|
|
void Take(BlockRep*& rep) { slot_.pop(rep); }
|
|
|
|
private:
|
|
// slot_ will pass references to BlockRep in block_rep_buf,
|
|
// and those references are always valid before the destruction of
|
|
// block_rep_buf.
|
|
WorkQueue<BlockRep*> slot_;
|
|
};
|
|
|
|
// Compression queue will pass references to BlockRep in block_rep_buf,
|
|
// and those references are always valid before the destruction of
|
|
// block_rep_buf.
|
|
using CompressQueue = WorkQueue<BlockRep*>;
|
|
CompressQueue compress_queue;
|
|
std::vector<port::Thread> compress_thread_pool;
|
|
|
|
// Write queue will pass references to BlockRep::slot in block_rep_buf,
|
|
// and those references are always valid before the corresponding
|
|
// BlockRep::slot is destructed, which is before the destruction of
|
|
// block_rep_buf.
|
|
using WriteQueue = WorkQueue<BlockRepSlot*>;
|
|
WriteQueue write_queue;
|
|
std::unique_ptr<port::Thread> write_thread;
|
|
|
|
// Estimate output file size when parallel compression is enabled. This is
|
|
// necessary because compression & flush are no longer synchronized,
|
|
// and BlockBasedTableBuilder::FileSize() is no longer accurate.
|
|
// memory_order_relaxed suffices because accurate statistics is not required.
|
|
class FileSizeEstimator {
|
|
public:
|
|
explicit FileSizeEstimator()
|
|
: raw_bytes_compressed(0),
|
|
raw_bytes_curr_block(0),
|
|
raw_bytes_curr_block_set(false),
|
|
raw_bytes_inflight(0),
|
|
blocks_inflight(0),
|
|
curr_compression_ratio(0),
|
|
estimated_file_size(0) {}
|
|
|
|
// Estimate file size when a block is about to be emitted to
|
|
// compression thread
|
|
void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
|
|
uint64_t new_raw_bytes_inflight =
|
|
raw_bytes_inflight.fetch_add(raw_block_size,
|
|
std::memory_order_relaxed) +
|
|
raw_block_size;
|
|
|
|
uint64_t new_blocks_inflight =
|
|
blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
|
|
|
|
estimated_file_size.store(
|
|
curr_file_size +
|
|
static_cast<uint64_t>(
|
|
static_cast<double>(new_raw_bytes_inflight) *
|
|
curr_compression_ratio.load(std::memory_order_relaxed)) +
|
|
new_blocks_inflight * kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
}
|
|
|
|
// Estimate file size when a block is already reaped from
|
|
// compression thread
|
|
void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
|
|
assert(raw_bytes_curr_block_set);
|
|
|
|
uint64_t new_raw_bytes_compressed =
|
|
raw_bytes_compressed + raw_bytes_curr_block;
|
|
assert(new_raw_bytes_compressed > 0);
|
|
|
|
curr_compression_ratio.store(
|
|
(curr_compression_ratio.load(std::memory_order_relaxed) *
|
|
raw_bytes_compressed +
|
|
compressed_block_size) /
|
|
static_cast<double>(new_raw_bytes_compressed),
|
|
std::memory_order_relaxed);
|
|
raw_bytes_compressed = new_raw_bytes_compressed;
|
|
|
|
uint64_t new_raw_bytes_inflight =
|
|
raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
|
|
std::memory_order_relaxed) -
|
|
raw_bytes_curr_block;
|
|
|
|
uint64_t new_blocks_inflight =
|
|
blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
|
|
|
|
estimated_file_size.store(
|
|
curr_file_size +
|
|
static_cast<uint64_t>(
|
|
static_cast<double>(new_raw_bytes_inflight) *
|
|
curr_compression_ratio.load(std::memory_order_relaxed)) +
|
|
new_blocks_inflight * kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
|
|
raw_bytes_curr_block_set = false;
|
|
}
|
|
|
|
void SetEstimatedFileSize(uint64_t size) {
|
|
estimated_file_size.store(size, std::memory_order_relaxed);
|
|
}
|
|
|
|
uint64_t GetEstimatedFileSize() {
|
|
return estimated_file_size.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
void SetCurrBlockRawSize(uint64_t size) {
|
|
raw_bytes_curr_block = size;
|
|
raw_bytes_curr_block_set = true;
|
|
}
|
|
|
|
private:
|
|
// Raw bytes compressed so far.
|
|
uint64_t raw_bytes_compressed;
|
|
// Size of current block being appended.
|
|
uint64_t raw_bytes_curr_block;
|
|
// Whether raw_bytes_curr_block has been set for next
|
|
// ReapBlock call.
|
|
bool raw_bytes_curr_block_set;
|
|
// Raw bytes under compression and not appended yet.
|
|
std::atomic<uint64_t> raw_bytes_inflight;
|
|
// Number of blocks under compression and not appended yet.
|
|
std::atomic<uint64_t> blocks_inflight;
|
|
// Current compression ratio, maintained by BGWorkWriteRawBlock.
|
|
std::atomic<double> curr_compression_ratio;
|
|
// Estimated SST file size.
|
|
std::atomic<uint64_t> estimated_file_size;
|
|
};
|
|
FileSizeEstimator file_size_estimator;
|
|
|
|
// Facilities used for waiting first block completion. Need to Wait for
|
|
// the completion of first block compression and flush to get a non-zero
|
|
// compression ratio.
|
|
std::atomic<bool> first_block_processed;
|
|
std::condition_variable first_block_cond;
|
|
std::mutex first_block_mutex;
|
|
|
|
explicit ParallelCompressionRep(uint32_t parallel_threads)
|
|
: curr_block_keys(new Keys()),
|
|
block_rep_buf(parallel_threads),
|
|
block_rep_pool(parallel_threads),
|
|
compress_queue(parallel_threads),
|
|
write_queue(parallel_threads),
|
|
first_block_processed(false) {
|
|
for (uint32_t i = 0; i < parallel_threads; i++) {
|
|
block_rep_buf[i].contents = Slice();
|
|
block_rep_buf[i].compressed_contents = Slice();
|
|
block_rep_buf[i].data.reset(new std::string());
|
|
block_rep_buf[i].compressed_data.reset(new std::string());
|
|
block_rep_buf[i].compression_type = CompressionType();
|
|
block_rep_buf[i].first_key_in_next_block.reset(new std::string());
|
|
block_rep_buf[i].keys.reset(new Keys());
|
|
block_rep_buf[i].slot.reset(new BlockRepSlot());
|
|
block_rep_buf[i].status = Status::OK();
|
|
block_rep_pool.push(&block_rep_buf[i]);
|
|
}
|
|
}
|
|
|
|
~ParallelCompressionRep() { block_rep_pool.finish(); }
|
|
|
|
// Make a block prepared to be emitted to compression thread
|
|
// Used in non-buffered mode
|
|
BlockRep* PrepareBlock(CompressionType compression_type,
|
|
const Slice* first_key_in_next_block,
|
|
BlockBuilder* data_block) {
|
|
BlockRep* block_rep =
|
|
PrepareBlockInternal(compression_type, first_key_in_next_block);
|
|
assert(block_rep != nullptr);
|
|
data_block->SwapAndReset(*(block_rep->data));
|
|
block_rep->contents = *(block_rep->data);
|
|
std::swap(block_rep->keys, curr_block_keys);
|
|
curr_block_keys->Clear();
|
|
return block_rep;
|
|
}
|
|
|
|
// Used in EnterUnbuffered
|
|
BlockRep* PrepareBlock(CompressionType compression_type,
|
|
const Slice* first_key_in_next_block,
|
|
std::string* data_block,
|
|
std::vector<std::string>* keys) {
|
|
BlockRep* block_rep =
|
|
PrepareBlockInternal(compression_type, first_key_in_next_block);
|
|
assert(block_rep != nullptr);
|
|
std::swap(*(block_rep->data), *data_block);
|
|
block_rep->contents = *(block_rep->data);
|
|
block_rep->keys->SwapAssign(*keys);
|
|
return block_rep;
|
|
}
|
|
|
|
// Emit a block to compression thread
|
|
void EmitBlock(BlockRep* block_rep) {
|
|
assert(block_rep != nullptr);
|
|
assert(block_rep->status.ok());
|
|
if (!write_queue.push(block_rep->slot.get())) {
|
|
return;
|
|
}
|
|
if (!compress_queue.push(block_rep)) {
|
|
return;
|
|
}
|
|
|
|
if (!first_block_processed.load(std::memory_order_relaxed)) {
|
|
std::unique_lock<std::mutex> lock(first_block_mutex);
|
|
first_block_cond.wait(lock, [this] {
|
|
return first_block_processed.load(std::memory_order_relaxed);
|
|
});
|
|
}
|
|
}
|
|
|
|
// Reap a block from compression thread
|
|
void ReapBlock(BlockRep* block_rep) {
|
|
assert(block_rep != nullptr);
|
|
block_rep->compressed_data->clear();
|
|
block_rep_pool.push(block_rep);
|
|
|
|
if (!first_block_processed.load(std::memory_order_relaxed)) {
|
|
std::lock_guard<std::mutex> lock(first_block_mutex);
|
|
first_block_processed.store(true, std::memory_order_relaxed);
|
|
first_block_cond.notify_one();
|
|
}
|
|
}
|
|
|
|
private:
|
|
BlockRep* PrepareBlockInternal(CompressionType compression_type,
|
|
const Slice* first_key_in_next_block) {
|
|
BlockRep* block_rep = nullptr;
|
|
block_rep_pool.pop(block_rep);
|
|
assert(block_rep != nullptr);
|
|
|
|
assert(block_rep->data);
|
|
|
|
block_rep->compression_type = compression_type;
|
|
|
|
if (first_key_in_next_block == nullptr) {
|
|
block_rep->first_key_in_next_block.reset(nullptr);
|
|
} else {
|
|
block_rep->first_key_in_next_block->assign(
|
|
first_key_in_next_block->data(), first_key_in_next_block->size());
|
|
}
|
|
|
|
return block_rep;
|
|
}
|
|
};
|
|
|
|
BlockBasedTableBuilder::BlockBasedTableBuilder(
|
|
const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
|
|
WritableFileWriter* file) {
|
|
BlockBasedTableOptions sanitized_table_options(table_options);
|
|
if (sanitized_table_options.format_version == 0 &&
|
|
sanitized_table_options.checksum != kCRC32c) {
|
|
ROCKS_LOG_WARN(
|
|
tbo.ioptions.logger,
|
|
"Silently converting format_version to 1 because checksum is "
|
|
"non-default");
|
|
// silently convert format_version to 1 to keep consistent with current
|
|
// behavior
|
|
sanitized_table_options.format_version = 1;
|
|
}
|
|
|
|
rep_ = new Rep(sanitized_table_options, tbo, file);
|
|
|
|
if (rep_->filter_builder != nullptr) {
|
|
rep_->filter_builder->StartBlock(0);
|
|
}
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
|
|
const_cast<TableProperties*>(&rep_->props));
|
|
|
|
// Extremely large files use atypical cache key encoding, and we don't
|
|
// know ahead of time how big the file will be. But assuming it's less
|
|
// than 4TB, we will correctly predict the cache keys.
|
|
BlockBasedTable::SetupBaseCacheKey(
|
|
&rep_->props, tbo.db_session_id, tbo.cur_file_num,
|
|
BlockBasedTable::kMaxFileSizeStandardEncoding, &rep_->base_cache_key);
|
|
|
|
if (rep_->IsParallelCompressionEnabled()) {
|
|
StartParallelCompression();
|
|
}
|
|
}
|
|
|
|
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
|
|
// Catch errors where caller forgot to call Finish()
|
|
assert(rep_->state == Rep::State::kClosed);
|
|
delete rep_;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
|
|
Rep* r = rep_;
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (!ok()) return;
|
|
ValueType value_type = ExtractValueType(key);
|
|
if (IsValueType(value_type)) {
|
|
#ifndef NDEBUG
|
|
if (r->props.num_entries > r->props.num_range_deletions) {
|
|
assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
|
|
}
|
|
#endif // !NDEBUG
|
|
|
|
auto should_flush = r->flush_block_policy->Update(key, value);
|
|
if (should_flush) {
|
|
assert(!r->data_block.empty());
|
|
r->first_key_in_next_block = &key;
|
|
Flush();
|
|
if (r->state == Rep::State::kBuffered) {
|
|
bool exceeds_buffer_limit =
|
|
(r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
|
|
bool exceeds_global_block_cache_limit = false;
|
|
|
|
// Increase cache charging for the last buffered data block
|
|
// only if the block is not going to be unbuffered immediately
|
|
// and there exists a cache reservation manager
|
|
if (!exceeds_buffer_limit &&
|
|
r->compression_dict_buffer_cache_res_mgr != nullptr) {
|
|
Status s =
|
|
r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
|
|
r->data_begin_offset);
|
|
exceeds_global_block_cache_limit = s.IsIncomplete();
|
|
}
|
|
|
|
if (exceeds_buffer_limit || exceeds_global_block_cache_limit) {
|
|
EnterUnbuffered();
|
|
}
|
|
}
|
|
|
|
// Add item to index block.
|
|
// We do not emit the index entry for a block until we have seen the
|
|
// first key for the next data block. This allows us to use shorter
|
|
// keys in the index block. For example, consider a block boundary
|
|
// between the keys "the quick brown fox" and "the who". We can use
|
|
// "the r" as the key for the index block entry since it is >= all
|
|
// entries in the first block and < all entries in subsequent
|
|
// blocks.
|
|
if (ok() && r->state == Rep::State::kUnbuffered) {
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
r->pc_rep->curr_block_keys->Clear();
|
|
} else {
|
|
r->index_builder->AddIndexEntry(&r->last_key, &key,
|
|
r->pending_handle);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Note: PartitionedFilterBlockBuilder requires key being added to filter
|
|
// builder after being added to index builder.
|
|
if (r->state == Rep::State::kUnbuffered) {
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
r->pc_rep->curr_block_keys->PushBack(key);
|
|
} else {
|
|
if (r->filter_builder != nullptr) {
|
|
size_t ts_sz =
|
|
r->internal_comparator.user_comparator()->timestamp_size();
|
|
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
|
|
}
|
|
}
|
|
}
|
|
|
|
r->data_block.AddWithLastKey(key, value, r->last_key);
|
|
r->last_key.assign(key.data(), key.size());
|
|
if (r->state == Rep::State::kBuffered) {
|
|
// Buffered keys will be replayed from data_block_buffers during
|
|
// `Finish()` once compression dictionary has been finalized.
|
|
} else {
|
|
if (!r->IsParallelCompressionEnabled()) {
|
|
r->index_builder->OnKeyAdded(key);
|
|
}
|
|
}
|
|
// TODO offset passed in is not accurate for parallel compression case
|
|
NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
|
|
r->table_properties_collectors,
|
|
r->ioptions.logger);
|
|
|
|
} else if (value_type == kTypeRangeDeletion) {
|
|
r->range_del_block.Add(key, value);
|
|
// TODO offset passed in is not accurate for parallel compression case
|
|
NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
|
|
r->table_properties_collectors,
|
|
r->ioptions.logger);
|
|
} else {
|
|
assert(false);
|
|
}
|
|
|
|
r->props.num_entries++;
|
|
r->props.raw_key_size += key.size();
|
|
r->props.raw_value_size += value.size();
|
|
if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
|
|
r->props.num_deletions++;
|
|
} else if (value_type == kTypeRangeDeletion) {
|
|
r->props.num_deletions++;
|
|
r->props.num_range_deletions++;
|
|
} else if (value_type == kTypeMerge) {
|
|
r->props.num_merge_operands++;
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Flush() {
|
|
Rep* r = rep_;
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (!ok()) return;
|
|
if (r->data_block.empty()) return;
|
|
if (r->IsParallelCompressionEnabled() &&
|
|
r->state == Rep::State::kUnbuffered) {
|
|
r->data_block.Finish();
|
|
ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
|
|
r->compression_type, r->first_key_in_next_block, &(r->data_block));
|
|
assert(block_rep != nullptr);
|
|
r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
|
|
r->get_offset());
|
|
r->pc_rep->EmitBlock(block_rep);
|
|
} else {
|
|
WriteBlock(&r->data_block, &r->pending_handle, BlockType::kData);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
|
|
BlockHandle* handle,
|
|
BlockType block_type) {
|
|
block->Finish();
|
|
std::string raw_block_contents;
|
|
raw_block_contents.reserve(rep_->table_options.block_size);
|
|
block->SwapAndReset(raw_block_contents);
|
|
if (rep_->state == Rep::State::kBuffered) {
|
|
assert(block_type == BlockType::kData);
|
|
rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
|
|
rep_->data_begin_offset += rep_->data_block_buffers.back().size();
|
|
return;
|
|
}
|
|
WriteBlock(raw_block_contents, handle, block_type);
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
|
|
BlockHandle* handle,
|
|
BlockType block_type) {
|
|
Rep* r = rep_;
|
|
assert(r->state == Rep::State::kUnbuffered);
|
|
Slice block_contents;
|
|
CompressionType type;
|
|
Status compress_status;
|
|
bool is_data_block = block_type == BlockType::kData;
|
|
CompressAndVerifyBlock(raw_block_contents, is_data_block,
|
|
*(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
|
|
&(r->compressed_output), &(block_contents), &type,
|
|
&compress_status);
|
|
r->SetStatus(compress_status);
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
|
|
WriteRawBlock(block_contents, type, handle, block_type, &raw_block_contents);
|
|
r->compressed_output.clear();
|
|
if (is_data_block) {
|
|
if (r->filter_builder != nullptr) {
|
|
r->filter_builder->StartBlock(r->get_offset());
|
|
}
|
|
r->props.data_size = r->get_offset();
|
|
++r->props.num_data_blocks;
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::BGWorkCompression(
|
|
const CompressionContext& compression_ctx,
|
|
UncompressionContext* verify_ctx) {
|
|
ParallelCompressionRep::BlockRep* block_rep = nullptr;
|
|
while (rep_->pc_rep->compress_queue.pop(block_rep)) {
|
|
assert(block_rep != nullptr);
|
|
CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
|
|
compression_ctx, verify_ctx,
|
|
block_rep->compressed_data.get(),
|
|
&block_rep->compressed_contents,
|
|
&(block_rep->compression_type), &block_rep->status);
|
|
block_rep->slot->Fill(block_rep);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::CompressAndVerifyBlock(
|
|
const Slice& raw_block_contents, bool is_data_block,
|
|
const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
|
|
std::string* compressed_output, Slice* block_contents,
|
|
CompressionType* type, Status* out_status) {
|
|
// File format contains a sequence of blocks where each block has:
|
|
// block_data: uint8[n]
|
|
// type: uint8
|
|
// crc: uint32
|
|
Rep* r = rep_;
|
|
bool is_status_ok = ok();
|
|
if (!r->IsParallelCompressionEnabled()) {
|
|
assert(is_status_ok);
|
|
}
|
|
|
|
*type = r->compression_type;
|
|
uint64_t sample_for_compression = r->sample_for_compression;
|
|
bool abort_compression = false;
|
|
|
|
StopWatchNano timer(
|
|
r->ioptions.clock,
|
|
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
|
|
|
|
if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
|
|
if (is_data_block) {
|
|
r->compressible_input_data_bytes.fetch_add(raw_block_contents.size(),
|
|
std::memory_order_relaxed);
|
|
}
|
|
const CompressionDict* compression_dict;
|
|
if (!is_data_block || r->compression_dict == nullptr) {
|
|
compression_dict = &CompressionDict::GetEmptyDict();
|
|
} else {
|
|
compression_dict = r->compression_dict.get();
|
|
}
|
|
assert(compression_dict != nullptr);
|
|
CompressionInfo compression_info(r->compression_opts, compression_ctx,
|
|
*compression_dict, *type,
|
|
sample_for_compression);
|
|
|
|
std::string sampled_output_fast;
|
|
std::string sampled_output_slow;
|
|
*block_contents = CompressBlock(
|
|
raw_block_contents, compression_info, type,
|
|
r->table_options.format_version, is_data_block /* do_sample */,
|
|
compressed_output, &sampled_output_fast, &sampled_output_slow);
|
|
|
|
if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
|
|
// Currently compression sampling is only enabled for data block.
|
|
assert(is_data_block);
|
|
r->sampled_input_data_bytes.fetch_add(raw_block_contents.size(),
|
|
std::memory_order_relaxed);
|
|
r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
|
|
std::memory_order_relaxed);
|
|
r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
|
|
std::memory_order_relaxed);
|
|
}
|
|
// notify collectors on block add
|
|
NotifyCollectTableCollectorsOnBlockAdd(
|
|
r->table_properties_collectors, raw_block_contents.size(),
|
|
sampled_output_fast.size(), sampled_output_slow.size());
|
|
|
|
// Some of the compression algorithms are known to be unreliable. If
|
|
// the verify_compression flag is set then try to de-compress the
|
|
// compressed data and compare to the input.
|
|
if (*type != kNoCompression && r->table_options.verify_compression) {
|
|
// Retrieve the uncompressed contents into a new buffer
|
|
const UncompressionDict* verify_dict;
|
|
if (!is_data_block || r->verify_dict == nullptr) {
|
|
verify_dict = &UncompressionDict::GetEmptyDict();
|
|
} else {
|
|
verify_dict = r->verify_dict.get();
|
|
}
|
|
assert(verify_dict != nullptr);
|
|
BlockContents contents;
|
|
UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
|
|
r->compression_type);
|
|
Status stat = UncompressBlockContentsForCompressionType(
|
|
uncompression_info, block_contents->data(), block_contents->size(),
|
|
&contents, r->table_options.format_version, r->ioptions);
|
|
|
|
if (stat.ok()) {
|
|
bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
|
|
if (!compressed_ok) {
|
|
// The result of the compression was invalid. abort.
|
|
abort_compression = true;
|
|
ROCKS_LOG_ERROR(r->ioptions.logger,
|
|
"Decompressed block did not match raw block");
|
|
*out_status =
|
|
Status::Corruption("Decompressed block did not match raw block");
|
|
}
|
|
} else {
|
|
// Decompression reported an error. abort.
|
|
*out_status = Status::Corruption(std::string("Could not decompress: ") +
|
|
stat.getState());
|
|
abort_compression = true;
|
|
}
|
|
}
|
|
} else {
|
|
// Block is too big to be compressed.
|
|
if (is_data_block) {
|
|
r->uncompressible_input_data_bytes.fetch_add(raw_block_contents.size(),
|
|
std::memory_order_relaxed);
|
|
}
|
|
abort_compression = true;
|
|
}
|
|
if (is_data_block) {
|
|
r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
|
|
std::memory_order_relaxed);
|
|
}
|
|
|
|
// Abort compression if the block is too big, or did not pass
|
|
// verification.
|
|
if (abort_compression) {
|
|
RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
|
|
*type = kNoCompression;
|
|
*block_contents = raw_block_contents;
|
|
} else if (*type != kNoCompression) {
|
|
if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
|
|
RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
|
|
timer.ElapsedNanos());
|
|
}
|
|
RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
|
|
raw_block_contents.size());
|
|
RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
|
|
} else if (*type != r->compression_type) {
|
|
RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
|
|
CompressionType type,
|
|
BlockHandle* handle,
|
|
BlockType block_type,
|
|
const Slice* raw_block_contents,
|
|
bool is_top_level_filter_block) {
|
|
Rep* r = rep_;
|
|
bool is_data_block = block_type == BlockType::kData;
|
|
StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
|
|
handle->set_offset(r->get_offset());
|
|
handle->set_size(block_contents.size());
|
|
assert(status().ok());
|
|
assert(io_status().ok());
|
|
|
|
{
|
|
IOStatus io_s = r->file->Append(block_contents);
|
|
if (!io_s.ok()) {
|
|
r->SetIOStatus(io_s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
std::array<char, kBlockTrailerSize> trailer;
|
|
trailer[0] = type;
|
|
uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
|
|
r->table_options.checksum, block_contents.data(), block_contents.size(),
|
|
/*last_byte*/ type);
|
|
|
|
if (block_type == BlockType::kFilter) {
|
|
Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);
|
|
if (!s.ok()) {
|
|
r->SetStatus(s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
EncodeFixed32(trailer.data() + 1, checksum);
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
|
|
trailer.data());
|
|
{
|
|
IOStatus io_s = r->file->Append(Slice(trailer.data(), trailer.size()));
|
|
if (!io_s.ok()) {
|
|
r->SetIOStatus(io_s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
{
|
|
Status s = Status::OK();
|
|
bool warm_cache;
|
|
switch (r->table_options.prepopulate_block_cache) {
|
|
case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
|
|
warm_cache = (r->reason == TableFileCreationReason::kFlush);
|
|
break;
|
|
case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
|
|
warm_cache = false;
|
|
break;
|
|
default:
|
|
// missing case
|
|
assert(false);
|
|
warm_cache = false;
|
|
}
|
|
if (warm_cache) {
|
|
if (type == kNoCompression) {
|
|
s = InsertBlockInCacheHelper(block_contents, handle, block_type,
|
|
is_top_level_filter_block);
|
|
} else if (raw_block_contents != nullptr) {
|
|
s = InsertBlockInCacheHelper(*raw_block_contents, handle, block_type,
|
|
is_top_level_filter_block);
|
|
}
|
|
if (!s.ok()) {
|
|
r->SetStatus(s);
|
|
return;
|
|
}
|
|
}
|
|
s = InsertBlockInCompressedCache(block_contents, type, handle);
|
|
if (!s.ok()) {
|
|
r->SetStatus(s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
|
|
if (r->table_options.block_align && is_data_block) {
|
|
size_t pad_bytes =
|
|
(r->alignment -
|
|
((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
|
|
(r->alignment - 1);
|
|
IOStatus io_s = r->file->Pad(pad_bytes);
|
|
if (io_s.ok()) {
|
|
r->set_offset(r->get_offset() + pad_bytes);
|
|
} else {
|
|
r->SetIOStatus(io_s);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
if (is_data_block) {
|
|
r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
|
|
r->get_offset());
|
|
} else {
|
|
r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
|
|
}
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
|
|
Rep* r = rep_;
|
|
ParallelCompressionRep::BlockRepSlot* slot = nullptr;
|
|
ParallelCompressionRep::BlockRep* block_rep = nullptr;
|
|
while (r->pc_rep->write_queue.pop(slot)) {
|
|
assert(slot != nullptr);
|
|
slot->Take(block_rep);
|
|
assert(block_rep != nullptr);
|
|
if (!block_rep->status.ok()) {
|
|
r->SetStatus(block_rep->status);
|
|
// Reap block so that blocked Flush() can finish
|
|
// if there is one, and Flush() will notice !ok() next time.
|
|
block_rep->status = Status::OK();
|
|
r->pc_rep->ReapBlock(block_rep);
|
|
continue;
|
|
}
|
|
|
|
for (size_t i = 0; i < block_rep->keys->Size(); i++) {
|
|
auto& key = (*block_rep->keys)[i];
|
|
if (r->filter_builder != nullptr) {
|
|
size_t ts_sz =
|
|
r->internal_comparator.user_comparator()->timestamp_size();
|
|
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
|
|
}
|
|
r->index_builder->OnKeyAdded(key);
|
|
}
|
|
|
|
r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
|
|
WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
|
|
&r->pending_handle, BlockType::kData, &block_rep->contents);
|
|
if (!ok()) {
|
|
break;
|
|
}
|
|
|
|
if (r->filter_builder != nullptr) {
|
|
r->filter_builder->StartBlock(r->get_offset());
|
|
}
|
|
r->props.data_size = r->get_offset();
|
|
++r->props.num_data_blocks;
|
|
|
|
if (block_rep->first_key_in_next_block == nullptr) {
|
|
r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
|
|
r->pending_handle);
|
|
} else {
|
|
Slice first_key_in_next_block =
|
|
Slice(*block_rep->first_key_in_next_block);
|
|
r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
|
|
&first_key_in_next_block,
|
|
r->pending_handle);
|
|
}
|
|
|
|
r->pc_rep->ReapBlock(block_rep);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::StartParallelCompression() {
|
|
rep_->pc_rep.reset(
|
|
new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
|
|
rep_->pc_rep->compress_thread_pool.reserve(
|
|
rep_->compression_opts.parallel_threads);
|
|
for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
|
|
rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
|
|
BGWorkCompression(*(rep_->compression_ctxs[i]),
|
|
rep_->verify_ctxs[i].get());
|
|
});
|
|
}
|
|
rep_->pc_rep->write_thread.reset(
|
|
new port::Thread([this] { BGWorkWriteRawBlock(); }));
|
|
}
|
|
|
|
void BlockBasedTableBuilder::StopParallelCompression() {
|
|
rep_->pc_rep->compress_queue.finish();
|
|
for (auto& thread : rep_->pc_rep->compress_thread_pool) {
|
|
thread.join();
|
|
}
|
|
rep_->pc_rep->write_queue.finish();
|
|
rep_->pc_rep->write_thread->join();
|
|
}
|
|
|
|
Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
|
|
|
|
IOStatus BlockBasedTableBuilder::io_status() const {
|
|
return rep_->GetIOStatus();
|
|
}
|
|
|
|
namespace {
|
|
// Delete the entry resided in the cache.
|
|
template <class Entry>
|
|
void DeleteEntryCached(const Slice& /*key*/, void* value) {
|
|
auto entry = reinterpret_cast<Entry*>(value);
|
|
delete entry;
|
|
}
|
|
} // namespace
|
|
|
|
//
|
|
// Make a copy of the block contents and insert into compressed block cache
|
|
//
|
|
Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
|
|
const Slice& block_contents, const CompressionType type,
|
|
const BlockHandle* handle) {
|
|
Rep* r = rep_;
|
|
Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
|
|
Status s;
|
|
if (type != kNoCompression && block_cache_compressed != nullptr) {
|
|
size_t size = block_contents.size();
|
|
|
|
auto ubuf =
|
|
AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
|
|
memcpy(ubuf.get(), block_contents.data(), size);
|
|
ubuf[size] = type;
|
|
|
|
BlockContents* block_contents_to_cache =
|
|
new BlockContents(std::move(ubuf), size);
|
|
#ifndef NDEBUG
|
|
block_contents_to_cache->is_raw_block = true;
|
|
#endif // NDEBUG
|
|
|
|
CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
|
|
|
|
s = block_cache_compressed->Insert(
|
|
key.AsSlice(), block_contents_to_cache,
|
|
block_contents_to_cache->ApproximateMemoryUsage(),
|
|
&DeleteEntryCached<BlockContents>);
|
|
if (s.ok()) {
|
|
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD);
|
|
} else {
|
|
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
|
|
}
|
|
// Invalidate OS cache.
|
|
r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
|
|
.PermitUncheckedError();
|
|
}
|
|
return s;
|
|
}
|
|
|
|
Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
|
|
const Slice& block_contents, const BlockHandle* handle,
|
|
BlockType block_type, bool is_top_level_filter_block) {
|
|
Status s;
|
|
if (block_type == BlockType::kData || block_type == BlockType::kIndex) {
|
|
s = InsertBlockInCache<Block>(block_contents, handle, block_type);
|
|
} else if (block_type == BlockType::kFilter) {
|
|
if (rep_->filter_builder->IsBlockBased()) {
|
|
// for block-based filter which is deprecated.
|
|
s = InsertBlockInCache<BlockContents>(block_contents, handle, block_type);
|
|
} else if (is_top_level_filter_block) {
|
|
// for top level filter block in partitioned filter.
|
|
s = InsertBlockInCache<Block>(block_contents, handle, block_type);
|
|
} else {
|
|
// for second level partitioned filters and full filters.
|
|
s = InsertBlockInCache<ParsedFullFilterBlock>(block_contents, handle,
|
|
block_type);
|
|
}
|
|
} else if (block_type == BlockType::kCompressionDictionary) {
|
|
s = InsertBlockInCache<UncompressionDict>(block_contents, handle,
|
|
block_type);
|
|
}
|
|
return s;
|
|
}
|
|
|
|
template <typename TBlocklike>
|
|
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
|
|
const BlockHandle* handle,
|
|
BlockType block_type) {
|
|
// Uncompressed regular block cache
|
|
Cache* block_cache = rep_->table_options.block_cache.get();
|
|
Status s;
|
|
if (block_cache != nullptr) {
|
|
size_t size = block_contents.size();
|
|
auto buf = AllocateBlock(size, block_cache->memory_allocator());
|
|
memcpy(buf.get(), block_contents.data(), size);
|
|
BlockContents results(std::move(buf), size);
|
|
|
|
CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
|
|
|
|
const size_t read_amp_bytes_per_bit =
|
|
rep_->table_options.read_amp_bytes_per_bit;
|
|
|
|
// TODO akanksha:: Dedup below code by calling
|
|
// BlockBasedTable::PutDataBlockToCache.
|
|
std::unique_ptr<TBlocklike> block_holder(
|
|
BlocklikeTraits<TBlocklike>::Create(
|
|
std::move(results), read_amp_bytes_per_bit,
|
|
rep_->ioptions.statistics.get(),
|
|
false /*rep_->blocks_definitely_zstd_compressed*/,
|
|
rep_->table_options.filter_policy.get()));
|
|
|
|
assert(block_holder->own_bytes());
|
|
size_t charge = block_holder->ApproximateMemoryUsage();
|
|
s = block_cache->Insert(
|
|
key.AsSlice(), block_holder.get(),
|
|
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
|
|
nullptr, Cache::Priority::LOW);
|
|
|
|
if (s.ok()) {
|
|
// Release ownership of block_holder.
|
|
block_holder.release();
|
|
BlockBasedTable::UpdateCacheInsertionMetrics(
|
|
block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
|
|
rep_->ioptions.stats);
|
|
} else {
|
|
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
|
|
}
|
|
}
|
|
return s;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteFilterBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
BlockHandle filter_block_handle;
|
|
bool empty_filter_block =
|
|
(rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty());
|
|
if (ok() && !empty_filter_block) {
|
|
rep_->props.num_filter_entries +=
|
|
rep_->filter_builder->EstimateEntriesAdded();
|
|
Status s = Status::Incomplete();
|
|
while (ok() && s.IsIncomplete()) {
|
|
// filter_data is used to store the transferred filter data payload from
|
|
// FilterBlockBuilder and deallocate the payload by going out of scope.
|
|
// Otherwise, the payload will unnecessarily remain until
|
|
// BlockBasedTableBuilder is deallocated.
|
|
//
|
|
// See FilterBlockBuilder::Finish() for more on the difference in
|
|
// transferred filter data payload among different FilterBlockBuilder
|
|
// subtypes.
|
|
std::unique_ptr<const char[]> filter_data;
|
|
Slice filter_content =
|
|
rep_->filter_builder->Finish(filter_block_handle, &s, &filter_data);
|
|
|
|
assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
|
|
if (s.IsCorruption()) {
|
|
rep_->SetStatus(s);
|
|
break;
|
|
}
|
|
|
|
rep_->props.filter_size += filter_content.size();
|
|
|
|
// TODO: Refactor code so that BlockType can determine both the C++ type
|
|
// of a block cache entry (TBlocklike) and the CacheEntryRole while
|
|
// inserting blocks in cache.
|
|
bool top_level_filter_block = false;
|
|
if (s.ok() && rep_->table_options.partition_filters &&
|
|
!rep_->filter_builder->IsBlockBased()) {
|
|
top_level_filter_block = true;
|
|
}
|
|
WriteRawBlock(filter_content, kNoCompression, &filter_block_handle,
|
|
BlockType::kFilter, nullptr /*raw_contents*/,
|
|
top_level_filter_block);
|
|
}
|
|
rep_->filter_builder->ResetFilterBitsBuilder();
|
|
}
|
|
if (ok() && !empty_filter_block) {
|
|
// Add mapping from "<filter_block_prefix>.Name" to location
|
|
// of filter data.
|
|
std::string key;
|
|
if (rep_->filter_builder->IsBlockBased()) {
|
|
key = BlockBasedTable::kFilterBlockPrefix;
|
|
} else {
|
|
key = rep_->table_options.partition_filters
|
|
? BlockBasedTable::kPartitionedFilterBlockPrefix
|
|
: BlockBasedTable::kFullFilterBlockPrefix;
|
|
}
|
|
key.append(rep_->table_options.filter_policy->CompatibilityName());
|
|
meta_index_builder->Add(key, filter_block_handle);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteIndexBlock(
|
|
MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
|
|
if (!ok()) {
|
|
return;
|
|
}
|
|
IndexBuilder::IndexBlocks index_blocks;
|
|
auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
|
|
if (index_builder_status.IsIncomplete()) {
|
|
// We we have more than one index partition then meta_blocks are not
|
|
// supported for the index. Currently meta_blocks are used only by
|
|
// HashIndexBuilder which is not multi-partition.
|
|
assert(index_blocks.meta_blocks.empty());
|
|
} else if (ok() && !index_builder_status.ok()) {
|
|
rep_->SetStatus(index_builder_status);
|
|
}
|
|
if (ok()) {
|
|
for (const auto& item : index_blocks.meta_blocks) {
|
|
BlockHandle block_handle;
|
|
WriteBlock(item.second, &block_handle, BlockType::kIndex);
|
|
if (!ok()) {
|
|
break;
|
|
}
|
|
meta_index_builder->Add(item.first, block_handle);
|
|
}
|
|
}
|
|
if (ok()) {
|
|
if (rep_->table_options.enable_index_compression) {
|
|
WriteBlock(index_blocks.index_block_contents, index_block_handle,
|
|
BlockType::kIndex);
|
|
} else {
|
|
WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
|
|
index_block_handle, BlockType::kIndex);
|
|
}
|
|
}
|
|
// If there are more index partitions, finish them and write them out
|
|
if (index_builder_status.IsIncomplete()) {
|
|
bool index_building_finished = false;
|
|
while (ok() && !index_building_finished) {
|
|
Status s =
|
|
rep_->index_builder->Finish(&index_blocks, *index_block_handle);
|
|
if (s.ok()) {
|
|
index_building_finished = true;
|
|
} else if (s.IsIncomplete()) {
|
|
// More partitioned index after this one
|
|
assert(!index_building_finished);
|
|
} else {
|
|
// Error
|
|
rep_->SetStatus(s);
|
|
return;
|
|
}
|
|
|
|
if (rep_->table_options.enable_index_compression) {
|
|
WriteBlock(index_blocks.index_block_contents, index_block_handle,
|
|
BlockType::kIndex);
|
|
} else {
|
|
WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
|
|
index_block_handle, BlockType::kIndex);
|
|
}
|
|
// The last index_block_handle will be for the partition index block
|
|
}
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WritePropertiesBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
BlockHandle properties_block_handle;
|
|
if (ok()) {
|
|
PropertyBlockBuilder property_block_builder;
|
|
rep_->props.filter_policy_name =
|
|
rep_->table_options.filter_policy != nullptr
|
|
? rep_->table_options.filter_policy->Name()
|
|
: "";
|
|
rep_->props.index_size =
|
|
rep_->index_builder->IndexSize() + kBlockTrailerSize;
|
|
rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
|
|
? rep_->ioptions.user_comparator->Name()
|
|
: "nullptr";
|
|
rep_->props.merge_operator_name =
|
|
rep_->ioptions.merge_operator != nullptr
|
|
? rep_->ioptions.merge_operator->Name()
|
|
: "nullptr";
|
|
rep_->props.compression_name =
|
|
CompressionTypeToString(rep_->compression_type);
|
|
rep_->props.compression_options =
|
|
CompressionOptionsToString(rep_->compression_opts);
|
|
rep_->props.prefix_extractor_name =
|
|
rep_->moptions.prefix_extractor != nullptr
|
|
? rep_->moptions.prefix_extractor->AsString()
|
|
: "nullptr";
|
|
std::string property_collectors_names = "[";
|
|
for (size_t i = 0;
|
|
i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
|
|
if (i != 0) {
|
|
property_collectors_names += ",";
|
|
}
|
|
property_collectors_names +=
|
|
rep_->ioptions.table_properties_collector_factories[i]->Name();
|
|
}
|
|
property_collectors_names += "]";
|
|
rep_->props.property_collectors_names = property_collectors_names;
|
|
if (rep_->table_options.index_type ==
|
|
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
|
assert(rep_->p_index_builder_ != nullptr);
|
|
rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
|
|
rep_->props.top_level_index_size =
|
|
rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
|
|
}
|
|
rep_->props.index_key_is_user_key =
|
|
!rep_->index_builder->seperator_is_key_plus_seq();
|
|
rep_->props.index_value_is_delta_encoded =
|
|
rep_->use_delta_encoding_for_index_values;
|
|
if (rep_->sampled_input_data_bytes > 0) {
|
|
rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
|
|
static_cast<double>(rep_->sampled_output_slow_data_bytes) /
|
|
rep_->sampled_input_data_bytes *
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes + 0.5);
|
|
rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
|
|
static_cast<double>(rep_->sampled_output_fast_data_bytes) /
|
|
rep_->sampled_input_data_bytes *
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes + 0.5);
|
|
} else if (rep_->sample_for_compression > 0) {
|
|
// We tried to sample but none were found. Assume worst-case (compression
|
|
// ratio 1.0) so data is complete and aggregatable.
|
|
rep_->props.slow_compression_estimated_data_size =
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes;
|
|
rep_->props.fast_compression_estimated_data_size =
|
|
rep_->compressible_input_data_bytes +
|
|
rep_->uncompressible_input_data_bytes;
|
|
}
|
|
|
|
// Add basic properties
|
|
property_block_builder.AddTableProperty(rep_->props);
|
|
|
|
// Add use collected properties
|
|
NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
|
|
rep_->ioptions.logger,
|
|
&property_block_builder);
|
|
|
|
Slice block_data = property_block_builder.Finish();
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
|
|
WriteRawBlock(block_data, kNoCompression, &properties_block_handle,
|
|
BlockType::kProperties);
|
|
}
|
|
if (ok()) {
|
|
#ifndef NDEBUG
|
|
{
|
|
uint64_t props_block_offset = properties_block_handle.offset();
|
|
uint64_t props_block_size = properties_block_handle.size();
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
|
|
&props_block_offset);
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
|
|
&props_block_size);
|
|
}
|
|
#endif // !NDEBUG
|
|
|
|
const std::string* properties_block_meta = &kPropertiesBlockName;
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WritePropertiesBlock:Meta",
|
|
&properties_block_meta);
|
|
meta_index_builder->Add(*properties_block_meta, properties_block_handle);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteCompressionDictBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
if (rep_->compression_dict != nullptr &&
|
|
rep_->compression_dict->GetRawDict().size()) {
|
|
BlockHandle compression_dict_block_handle;
|
|
if (ok()) {
|
|
WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
|
|
&compression_dict_block_handle,
|
|
BlockType::kCompressionDictionary);
|
|
#ifndef NDEBUG
|
|
Slice compression_dict = rep_->compression_dict->GetRawDict();
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
|
|
&compression_dict);
|
|
#endif // NDEBUG
|
|
}
|
|
if (ok()) {
|
|
meta_index_builder->Add(kCompressionDictBlockName,
|
|
compression_dict_block_handle);
|
|
}
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteRangeDelBlock(
|
|
MetaIndexBuilder* meta_index_builder) {
|
|
if (ok() && !rep_->range_del_block.empty()) {
|
|
BlockHandle range_del_block_handle;
|
|
WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
|
|
&range_del_block_handle, BlockType::kRangeDeletion);
|
|
meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
|
|
BlockHandle& index_block_handle) {
|
|
Rep* r = rep_;
|
|
// this is guaranteed by BlockBasedTableBuilder's constructor
|
|
assert(r->table_options.checksum == kCRC32c ||
|
|
r->table_options.format_version != 0);
|
|
assert(ok());
|
|
|
|
FooterBuilder footer;
|
|
footer.Build(kBlockBasedTableMagicNumber, r->table_options.format_version,
|
|
r->get_offset(), r->table_options.checksum,
|
|
metaindex_block_handle, index_block_handle);
|
|
IOStatus ios = r->file->Append(footer.GetSlice());
|
|
if (ios.ok()) {
|
|
r->set_offset(r->get_offset() + footer.GetSlice().size());
|
|
} else {
|
|
r->SetIOStatus(ios);
|
|
}
|
|
}
|
|
|
|
void BlockBasedTableBuilder::EnterUnbuffered() {
|
|
Rep* r = rep_;
|
|
assert(r->state == Rep::State::kBuffered);
|
|
r->state = Rep::State::kUnbuffered;
|
|
const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
|
|
? r->compression_opts.zstd_max_train_bytes
|
|
: r->compression_opts.max_dict_bytes;
|
|
const size_t kNumBlocksBuffered = r->data_block_buffers.size();
|
|
if (kNumBlocksBuffered == 0) {
|
|
// The below code is neither safe nor necessary for handling zero data
|
|
// blocks.
|
|
return;
|
|
}
|
|
|
|
// Abstract algebra teaches us that a finite cyclic group (such as the
|
|
// additive group of integers modulo N) can be generated by a number that is
|
|
// coprime with N. Since N is variable (number of buffered data blocks), we
|
|
// must then pick a prime number in order to guarantee coprimeness with any N.
|
|
//
|
|
// One downside of this approach is the spread will be poor when
|
|
// `kPrimeGeneratorRemainder` is close to zero or close to
|
|
// `kNumBlocksBuffered`.
|
|
//
|
|
// Picked a random number between one and one trillion and then chose the
|
|
// next prime number greater than or equal to it.
|
|
const uint64_t kPrimeGenerator = 545055921143ull;
|
|
// Can avoid repeated division by just adding the remainder repeatedly.
|
|
const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
|
|
kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
|
|
const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
|
|
|
|
std::string compression_dict_samples;
|
|
std::vector<size_t> compression_dict_sample_lens;
|
|
size_t buffer_idx = kInitSampleIdx;
|
|
for (size_t i = 0;
|
|
i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
|
|
++i) {
|
|
size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
|
|
r->data_block_buffers[buffer_idx].size());
|
|
compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
|
|
copy_len);
|
|
compression_dict_sample_lens.emplace_back(copy_len);
|
|
|
|
buffer_idx += kPrimeGeneratorRemainder;
|
|
if (buffer_idx >= kNumBlocksBuffered) {
|
|
buffer_idx -= kNumBlocksBuffered;
|
|
}
|
|
}
|
|
|
|
// final data block flushed, now we can generate dictionary from the samples.
|
|
// OK if compression_dict_samples is empty, we'll just get empty dictionary.
|
|
std::string dict;
|
|
if (r->compression_opts.zstd_max_train_bytes > 0) {
|
|
if (r->compression_opts.use_zstd_dict_trainer) {
|
|
dict = ZSTD_TrainDictionary(compression_dict_samples,
|
|
compression_dict_sample_lens,
|
|
r->compression_opts.max_dict_bytes);
|
|
} else {
|
|
dict = ZSTD_FinalizeDictionary(
|
|
compression_dict_samples, compression_dict_sample_lens,
|
|
r->compression_opts.max_dict_bytes, r->compression_opts.level);
|
|
}
|
|
} else {
|
|
dict = std::move(compression_dict_samples);
|
|
}
|
|
r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
|
|
r->compression_opts.level));
|
|
r->verify_dict.reset(new UncompressionDict(
|
|
dict, r->compression_type == kZSTD ||
|
|
r->compression_type == kZSTDNotFinalCompression));
|
|
|
|
auto get_iterator_for_block = [&r](size_t i) {
|
|
auto& data_block = r->data_block_buffers[i];
|
|
assert(!data_block.empty());
|
|
|
|
Block reader{BlockContents{data_block}};
|
|
DataBlockIter* iter = reader.NewDataIterator(
|
|
r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);
|
|
|
|
iter->SeekToFirst();
|
|
assert(iter->Valid());
|
|
return std::unique_ptr<DataBlockIter>(iter);
|
|
};
|
|
|
|
std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
|
|
|
|
for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
|
|
if (iter == nullptr) {
|
|
iter = get_iterator_for_block(i);
|
|
assert(iter != nullptr);
|
|
};
|
|
|
|
if (i + 1 < r->data_block_buffers.size()) {
|
|
next_block_iter = get_iterator_for_block(i + 1);
|
|
}
|
|
|
|
auto& data_block = r->data_block_buffers[i];
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
Slice first_key_in_next_block;
|
|
const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
|
|
if (i + 1 < r->data_block_buffers.size()) {
|
|
assert(next_block_iter != nullptr);
|
|
first_key_in_next_block = next_block_iter->key();
|
|
} else {
|
|
first_key_in_next_block_ptr = r->first_key_in_next_block;
|
|
}
|
|
|
|
std::vector<std::string> keys;
|
|
for (; iter->Valid(); iter->Next()) {
|
|
keys.emplace_back(iter->key().ToString());
|
|
}
|
|
|
|
ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
|
|
r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
|
|
|
|
assert(block_rep != nullptr);
|
|
r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
|
|
r->get_offset());
|
|
r->pc_rep->EmitBlock(block_rep);
|
|
} else {
|
|
for (; iter->Valid(); iter->Next()) {
|
|
Slice key = iter->key();
|
|
if (r->filter_builder != nullptr) {
|
|
size_t ts_sz =
|
|
r->internal_comparator.user_comparator()->timestamp_size();
|
|
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
|
|
}
|
|
r->index_builder->OnKeyAdded(key);
|
|
}
|
|
WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
|
|
if (ok() && i + 1 < r->data_block_buffers.size()) {
|
|
assert(next_block_iter != nullptr);
|
|
Slice first_key_in_next_block = next_block_iter->key();
|
|
|
|
Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
|
|
|
|
iter->SeekToLast();
|
|
std::string last_key = iter->key().ToString();
|
|
r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
|
|
r->pending_handle);
|
|
}
|
|
}
|
|
std::swap(iter, next_block_iter);
|
|
}
|
|
r->data_block_buffers.clear();
|
|
r->data_begin_offset = 0;
|
|
// Release all reserved cache for data block buffers
|
|
if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
|
|
Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
|
|
r->data_begin_offset);
|
|
s.PermitUncheckedError();
|
|
}
|
|
}
|
|
|
|
Status BlockBasedTableBuilder::Finish() {
|
|
Rep* r = rep_;
|
|
assert(r->state != Rep::State::kClosed);
|
|
bool empty_data_block = r->data_block.empty();
|
|
r->first_key_in_next_block = nullptr;
|
|
Flush();
|
|
if (r->state == Rep::State::kBuffered) {
|
|
EnterUnbuffered();
|
|
}
|
|
if (r->IsParallelCompressionEnabled()) {
|
|
StopParallelCompression();
|
|
#ifndef NDEBUG
|
|
for (const auto& br : r->pc_rep->block_rep_buf) {
|
|
assert(br.status.ok());
|
|
}
|
|
#endif // !NDEBUG
|
|
} else {
|
|
// To make sure properties block is able to keep the accurate size of index
|
|
// block, we will finish writing all index entries first.
|
|
if (ok() && !empty_data_block) {
|
|
r->index_builder->AddIndexEntry(
|
|
&r->last_key, nullptr /* no next data block */, r->pending_handle);
|
|
}
|
|
}
|
|
|
|
// Write meta blocks, metaindex block and footer in the following order.
|
|
// 1. [meta block: filter]
|
|
// 2. [meta block: index]
|
|
// 3. [meta block: compression dictionary]
|
|
// 4. [meta block: range deletion tombstone]
|
|
// 5. [meta block: properties]
|
|
// 6. [metaindex block]
|
|
// 7. Footer
|
|
BlockHandle metaindex_block_handle, index_block_handle;
|
|
MetaIndexBuilder meta_index_builder;
|
|
WriteFilterBlock(&meta_index_builder);
|
|
WriteIndexBlock(&meta_index_builder, &index_block_handle);
|
|
WriteCompressionDictBlock(&meta_index_builder);
|
|
WriteRangeDelBlock(&meta_index_builder);
|
|
WritePropertiesBlock(&meta_index_builder);
|
|
if (ok()) {
|
|
// flush the meta index block
|
|
WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
|
|
&metaindex_block_handle, BlockType::kMetaIndex);
|
|
}
|
|
if (ok()) {
|
|
WriteFooter(metaindex_block_handle, index_block_handle);
|
|
}
|
|
r->state = Rep::State::kClosed;
|
|
r->SetStatus(r->CopyIOStatus());
|
|
Status ret_status = r->CopyStatus();
|
|
assert(!ret_status.ok() || io_status().ok());
|
|
return ret_status;
|
|
}
|
|
|
|
void BlockBasedTableBuilder::Abandon() {
|
|
assert(rep_->state != Rep::State::kClosed);
|
|
if (rep_->IsParallelCompressionEnabled()) {
|
|
StopParallelCompression();
|
|
}
|
|
rep_->state = Rep::State::kClosed;
|
|
rep_->CopyStatus().PermitUncheckedError();
|
|
rep_->CopyIOStatus().PermitUncheckedError();
|
|
}
|
|
|
|
uint64_t BlockBasedTableBuilder::NumEntries() const {
|
|
return rep_->props.num_entries;
|
|
}
|
|
|
|
bool BlockBasedTableBuilder::IsEmpty() const {
|
|
return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
|
|
}
|
|
|
|
uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
|
|
|
|
uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
|
|
if (rep_->IsParallelCompressionEnabled()) {
|
|
// Use compression ratio so far and inflight raw bytes to estimate
|
|
// final SST size.
|
|
return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
|
|
} else {
|
|
return FileSize();
|
|
}
|
|
}
|
|
|
|
bool BlockBasedTableBuilder::NeedCompact() const {
|
|
for (const auto& collector : rep_->table_properties_collectors) {
|
|
if (collector->NeedCompact()) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
|
|
TableProperties ret = rep_->props;
|
|
for (const auto& collector : rep_->table_properties_collectors) {
|
|
for (const auto& prop : collector->GetReadableProperties()) {
|
|
ret.readable_properties.insert(prop);
|
|
}
|
|
collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
std::string BlockBasedTableBuilder::GetFileChecksum() const {
|
|
if (rep_->file != nullptr) {
|
|
return rep_->file->GetFileChecksum();
|
|
} else {
|
|
return kUnknownFileChecksum;
|
|
}
|
|
}
|
|
|
|
const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
|
|
if (rep_->file != nullptr) {
|
|
return rep_->file->GetFileChecksumFuncName();
|
|
} else {
|
|
return kUnknownFileChecksumFuncName;
|
|
}
|
|
}
|
|
|
|
const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
|
|
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
|
|
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
|
|
"partitionedfilter.";
|
|
} // namespace ROCKSDB_NAMESPACE
|