Builders for partition filter
Summary: This is the second split of this pull request: https://github.com/facebook/rocksdb/pull/1891 which includes only the builder part. The testing will be included in the third split, where the reader is also included. Closes https://github.com/facebook/rocksdb/pull/1952 Differential Revision: D4660272 Pulled By: maysamyabandeh fbshipit-source-id: 36b3cf0
This commit is contained in:
parent
97edc72d39
commit
54b434110e
@ -148,6 +148,15 @@ struct BlockBasedTableOptions {
|
||||
// i.e., the number of data blocks covered by each index partition
|
||||
uint64_t index_per_partition = 1024;
|
||||
|
||||
// Note: currently this option requires kTwoLevelIndexSearch to be set as
|
||||
// well.
|
||||
// TODO(myabandeh): remove the note above once the limitation is lifted
|
||||
// TODO(myabandeh): this feature is in experimental phase and shall not be
|
||||
// used in production; either remove the feature or remove this comment if
|
||||
// it is ready to be used in production.
|
||||
// Use partitioned full filters for each SST file
|
||||
bool partition_filters = false;
|
||||
|
||||
// Use delta encoding to compress keys in blocks.
|
||||
// ReadOptions::pin_data requires this option to be disabled.
|
||||
//
|
||||
|
@ -113,7 +113,10 @@ inline void BlockBasedFilterBlockBuilder::AddPrefix(const Slice& key) {
|
||||
}
|
||||
}
|
||||
|
||||
Slice BlockBasedFilterBlockBuilder::Finish() {
|
||||
Slice BlockBasedFilterBlockBuilder::Finish(const BlockHandle& tmp,
|
||||
Status* status) {
|
||||
// In this impl we ignore BlockHandle
|
||||
*status = Status::OK();
|
||||
if (!start_.empty()) {
|
||||
GenerateFilter();
|
||||
}
|
||||
|
@ -41,7 +41,8 @@ class BlockBasedFilterBlockBuilder : public FilterBlockBuilder {
|
||||
virtual bool IsBlockBased() override { return true; }
|
||||
virtual void StartBlock(uint64_t block_offset) override;
|
||||
virtual void Add(const Slice& key) override;
|
||||
virtual Slice Finish() override;
|
||||
virtual Slice Finish(const BlockHandle& tmp, Status* status) override;
|
||||
using FilterBlockBuilder::Finish;
|
||||
|
||||
private:
|
||||
void AddKey(const Slice& key);
|
||||
|
@ -38,9 +38,7 @@
|
||||
#include "table/filter_block.h"
|
||||
#include "table/format.h"
|
||||
#include "table/full_filter_block.h"
|
||||
#include "table/index_builder.h"
|
||||
#include "table/meta_blocks.h"
|
||||
#include "table/partitioned_filter_block.h"
|
||||
#include "table/table_builder.h"
|
||||
|
||||
#include "util/string_util.h"
|
||||
@ -50,6 +48,9 @@
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/xxhash.h"
|
||||
|
||||
#include "table/index_builder.h"
|
||||
#include "table/partitioned_filter_block.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
extern const std::string kHashIndexPrefixesBlock;
|
||||
@ -60,9 +61,10 @@ typedef BlockBasedTableOptions::IndexType IndexType;
|
||||
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
|
||||
namespace {
|
||||
|
||||
// Create a index builder based on its type.
|
||||
FilterBlockBuilder* CreateFilterBlockBuilder(const ImmutableCFOptions& opt,
|
||||
const BlockBasedTableOptions& table_opt) {
|
||||
// Create a filter block builder based on its type.
|
||||
FilterBlockBuilder* CreateFilterBlockBuilder(
|
||||
const ImmutableCFOptions& opt, const BlockBasedTableOptions& table_opt,
|
||||
PartitionedIndexBuilder* const p_index_builder) {
|
||||
if (table_opt.filter_policy == nullptr) return nullptr;
|
||||
|
||||
FilterBitsBuilder* filter_bits_builder =
|
||||
@ -70,9 +72,17 @@ FilterBlockBuilder* CreateFilterBlockBuilder(const ImmutableCFOptions& opt,
|
||||
if (filter_bits_builder == nullptr) {
|
||||
return new BlockBasedFilterBlockBuilder(opt.prefix_extractor, table_opt);
|
||||
} else {
|
||||
return new FullFilterBlockBuilder(opt.prefix_extractor,
|
||||
table_opt.whole_key_filtering,
|
||||
filter_bits_builder);
|
||||
if (table_opt.partition_filters) {
|
||||
assert(p_index_builder != nullptr);
|
||||
return new PartitionedFilterBlockBuilder(
|
||||
opt.prefix_extractor, table_opt.whole_key_filtering,
|
||||
filter_bits_builder, table_opt.index_block_restart_interval,
|
||||
p_index_builder);
|
||||
} else {
|
||||
return new FullFilterBlockBuilder(opt.prefix_extractor,
|
||||
table_opt.whole_key_filtering,
|
||||
filter_bits_builder);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -246,7 +256,7 @@ struct BlockBasedTableBuilder::Rep {
|
||||
TableProperties props;
|
||||
|
||||
bool closed = false; // Either Finish() or Abandon() has been called.
|
||||
std::unique_ptr<FilterBlockBuilder> filter_block;
|
||||
std::unique_ptr<FilterBlockBuilder> filter_builder;
|
||||
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
|
||||
size_t compressed_cache_key_prefix_size;
|
||||
|
||||
@ -277,21 +287,32 @@ struct BlockBasedTableBuilder::Rep {
|
||||
table_options.use_delta_encoding),
|
||||
range_del_block(1), // TODO(andrewkr): restart_interval unnecessary
|
||||
internal_prefix_transform(_ioptions.prefix_extractor),
|
||||
index_builder(IndexBuilder::CreateIndexBuilder(
|
||||
table_options.index_type, &internal_comparator,
|
||||
&this->internal_prefix_transform,
|
||||
table_options.index_block_restart_interval,
|
||||
table_options.index_per_partition)),
|
||||
compression_type(_compression_type),
|
||||
compression_opts(_compression_opts),
|
||||
compression_dict(_compression_dict),
|
||||
filter_block(skip_filters ? nullptr : CreateFilterBlockBuilder(
|
||||
_ioptions, table_options)),
|
||||
flush_block_policy(
|
||||
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
|
||||
table_options, data_block)),
|
||||
column_family_id(_column_family_id),
|
||||
column_family_name(_column_family_name) {
|
||||
PartitionedIndexBuilder* p_index_builder = nullptr;
|
||||
if (table_options.index_type ==
|
||||
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
||||
p_index_builder = PartitionedIndexBuilder::CreateIndexBuilder(
|
||||
&internal_comparator, table_options);
|
||||
index_builder.reset(p_index_builder);
|
||||
} else {
|
||||
index_builder.reset(IndexBuilder::CreateIndexBuilder(
|
||||
table_options.index_type, &internal_comparator,
|
||||
&this->internal_prefix_transform, table_options));
|
||||
}
|
||||
if (skip_filters) {
|
||||
filter_builder = nullptr;
|
||||
} else {
|
||||
filter_builder.reset(
|
||||
CreateFilterBlockBuilder(_ioptions, table_options, p_index_builder));
|
||||
}
|
||||
|
||||
for (auto& collector_factories : *int_tbl_prop_collector_factories) {
|
||||
table_properties_collectors.emplace_back(
|
||||
collector_factories->CreateIntTblPropCollector(column_family_id));
|
||||
@ -330,8 +351,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
|
||||
compression_type, compression_opts, compression_dict,
|
||||
skip_filters, column_family_name);
|
||||
|
||||
if (rep_->filter_block != nullptr) {
|
||||
rep_->filter_block->StartBlock(0);
|
||||
if (rep_->filter_builder != nullptr) {
|
||||
rep_->filter_builder->StartBlock(0);
|
||||
}
|
||||
if (table_options.block_cache_compressed.get() != nullptr) {
|
||||
BlockBasedTable::GenerateCachePrefix(
|
||||
@ -374,8 +395,10 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
|
||||
}
|
||||
}
|
||||
|
||||
if (r->filter_block != nullptr) {
|
||||
r->filter_block->Add(ExtractUserKey(key));
|
||||
// Note: PartitionedFilterBlockBuilder requires key being added to filter
|
||||
// builder after being added to index builder.
|
||||
if (r->filter_builder != nullptr) {
|
||||
r->filter_builder->Add(ExtractUserKey(key));
|
||||
}
|
||||
|
||||
r->last_key.assign(key.data(), key.size());
|
||||
@ -409,8 +432,8 @@ void BlockBasedTableBuilder::Flush() {
|
||||
if (!ok()) return;
|
||||
if (r->data_block.empty()) return;
|
||||
WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
|
||||
if (r->filter_block != nullptr) {
|
||||
r->filter_block->StartBlock(r->offset);
|
||||
if (r->filter_builder != nullptr) {
|
||||
r->filter_builder->StartBlock(r->offset);
|
||||
}
|
||||
r->props.data_size = r->offset;
|
||||
++r->props.num_data_blocks;
|
||||
@ -600,15 +623,6 @@ Status BlockBasedTableBuilder::Finish() {
|
||||
assert(!r->closed);
|
||||
r->closed = true;
|
||||
|
||||
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle,
|
||||
compression_dict_block_handle, range_del_block_handle;
|
||||
// Write filter block
|
||||
if (ok() && r->filter_block != nullptr) {
|
||||
auto filter_contents = r->filter_block->Finish();
|
||||
r->props.filter_size = filter_contents.size();
|
||||
WriteRawBlock(filter_contents, kNoCompression, &filter_block_handle);
|
||||
}
|
||||
|
||||
// To make sure properties block is able to keep the accurate size of index
|
||||
// block, we will finish writing all index entries here and flush them
|
||||
// to storage after metaindex block is written.
|
||||
@ -617,6 +631,19 @@ Status BlockBasedTableBuilder::Finish() {
|
||||
&r->last_key, nullptr /* no next data block */, r->pending_handle);
|
||||
}
|
||||
|
||||
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle,
|
||||
compression_dict_block_handle, range_del_block_handle;
|
||||
// Write filter block
|
||||
if (ok() && r->filter_builder != nullptr) {
|
||||
Status s = Status::Incomplete();
|
||||
while (s.IsIncomplete()) {
|
||||
Slice filter_content = r->filter_builder->Finish(filter_block_handle, &s);
|
||||
assert(s.ok() || s.IsIncomplete());
|
||||
r->props.filter_size += filter_content.size();
|
||||
WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
|
||||
}
|
||||
}
|
||||
|
||||
IndexBuilder::IndexBlocks index_blocks;
|
||||
auto index_builder_status = r->index_builder->Finish(&index_blocks);
|
||||
if (index_builder_status.IsIncomplete()) {
|
||||
@ -643,14 +670,16 @@ Status BlockBasedTableBuilder::Finish() {
|
||||
}
|
||||
|
||||
if (ok()) {
|
||||
if (r->filter_block != nullptr) {
|
||||
if (r->filter_builder != nullptr) {
|
||||
// Add mapping from "<filter_block_prefix>.Name" to location
|
||||
// of filter data.
|
||||
std::string key;
|
||||
if (r->filter_block->IsBlockBased()) {
|
||||
if (r->filter_builder->IsBlockBased()) {
|
||||
key = BlockBasedTable::kFilterBlockPrefix;
|
||||
} else {
|
||||
key = BlockBasedTable::kFullFilterBlockPrefix;
|
||||
key = r->table_options.partition_filters
|
||||
? BlockBasedTable::kPartitionedFilterBlockPrefix
|
||||
: BlockBasedTable::kFullFilterBlockPrefix;
|
||||
}
|
||||
key.append(r->table_options.filter_policy->Name());
|
||||
meta_index_builder.Add(key, filter_block_handle);
|
||||
@ -810,4 +839,6 @@ TableProperties BlockBasedTableBuilder::GetTableProperties() const {
|
||||
|
||||
const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
|
||||
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
|
||||
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
|
||||
"partitionedfilter.";
|
||||
} // namespace rocksdb
|
||||
|
@ -63,6 +63,7 @@ class BlockBasedTable : public TableReader {
|
||||
public:
|
||||
static const std::string kFilterBlockPrefix;
|
||||
static const std::string kFullFilterBlockPrefix;
|
||||
static const std::string kPartitionedFilterBlockPrefix;
|
||||
// The longest prefix of the cache key used to identify blocks.
|
||||
// For Posix files the unique ID is three varints.
|
||||
static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length * 3 + 1;
|
||||
|
@ -51,7 +51,14 @@ class FilterBlockBuilder {
|
||||
virtual bool IsBlockBased() = 0; // If is blockbased filter
|
||||
virtual void StartBlock(uint64_t block_offset) = 0; // Start new block filter
|
||||
virtual void Add(const Slice& key) = 0; // Add a key to current filter
|
||||
virtual Slice Finish() = 0; // Generate Filter
|
||||
Slice Finish() { // Generate Filter
|
||||
const BlockHandle empty_handle;
|
||||
Status dont_care_status;
|
||||
auto ret = Finish(empty_handle, &dont_care_status);
|
||||
assert(dont_care_status.ok());
|
||||
return ret;
|
||||
}
|
||||
virtual Slice Finish(const BlockHandle& tmp, Status* status) = 0;
|
||||
|
||||
private:
|
||||
// No copying allowed
|
||||
|
@ -40,11 +40,12 @@ inline void FullFilterBlockBuilder::AddKey(const Slice& key) {
|
||||
// Add prefix to filter if needed
|
||||
inline void FullFilterBlockBuilder::AddPrefix(const Slice& key) {
|
||||
Slice prefix = prefix_extractor_->Transform(key);
|
||||
filter_bits_builder_->AddKey(prefix);
|
||||
num_added_++;
|
||||
AddKey(prefix);
|
||||
}
|
||||
|
||||
Slice FullFilterBlockBuilder::Finish() {
|
||||
Slice FullFilterBlockBuilder::Finish(const BlockHandle& tmp, Status* status) {
|
||||
// In this impl we ignore BlockHandle
|
||||
*status = Status::OK();
|
||||
if (num_added_ != 0) {
|
||||
num_added_ = 0;
|
||||
return filter_bits_builder_->Finish(&filter_data_);
|
||||
@ -73,7 +74,7 @@ FullFilterBlockReader::FullFilterBlockReader(
|
||||
}
|
||||
|
||||
bool FullFilterBlockReader::KeyMayMatch(const Slice& key,
|
||||
uint64_t block_offset) {
|
||||
uint64_t block_offset) {
|
||||
assert(block_offset == kNotValid);
|
||||
if (!whole_key_filtering_) {
|
||||
return true;
|
||||
|
@ -45,7 +45,12 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
|
||||
virtual bool IsBlockBased() override { return false; }
|
||||
virtual void StartBlock(uint64_t block_offset) override {}
|
||||
virtual void Add(const Slice& key) override;
|
||||
virtual Slice Finish() override;
|
||||
virtual Slice Finish(const BlockHandle& tmp, Status* status) override;
|
||||
using FilterBlockBuilder::Finish;
|
||||
|
||||
protected:
|
||||
virtual void AddKey(const Slice& key);
|
||||
std::unique_ptr<FilterBitsBuilder> filter_bits_builder_;
|
||||
|
||||
private:
|
||||
// important: all of these might point to invalid addresses
|
||||
@ -55,10 +60,8 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
|
||||
bool whole_key_filtering_;
|
||||
|
||||
uint32_t num_added_;
|
||||
std::unique_ptr<FilterBitsBuilder> filter_bits_builder_;
|
||||
std::unique_ptr<const char[]> filter_data_;
|
||||
|
||||
void AddKey(const Slice& key);
|
||||
void AddPrefix(const Slice& key);
|
||||
|
||||
// No copying allowed
|
||||
@ -96,16 +99,14 @@ class FullFilterBlockReader : public FilterBlockReader {
|
||||
|
||||
private:
|
||||
const SliceTransform* prefix_extractor_;
|
||||
|
||||
std::unique_ptr<FilterBitsReader> filter_bits_reader_;
|
||||
Slice contents_;
|
||||
std::unique_ptr<FilterBitsReader> filter_bits_reader_;
|
||||
BlockContents block_contents_;
|
||||
std::unique_ptr<const char[]> filter_data_;
|
||||
|
||||
bool MayMatch(const Slice& entry);
|
||||
|
||||
// No copying allowed
|
||||
FullFilterBlockReader(const FullFilterBlockReader&);
|
||||
bool MayMatch(const Slice& entry);
|
||||
void operator=(const FullFilterBlockReader&);
|
||||
};
|
||||
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include "table/index_builder.h"
|
||||
#include <assert.h>
|
||||
#include <inttypes.h>
|
||||
|
||||
#include <list>
|
||||
#include <string>
|
||||
|
||||
@ -24,21 +25,19 @@ namespace rocksdb {
|
||||
IndexBuilder* IndexBuilder::CreateIndexBuilder(
|
||||
BlockBasedTableOptions::IndexType index_type,
|
||||
const InternalKeyComparator* comparator,
|
||||
const SliceTransform* prefix_extractor, int index_block_restart_interval,
|
||||
uint64_t index_per_partition) {
|
||||
const InternalKeySliceTransform* int_key_slice_transform,
|
||||
const BlockBasedTableOptions& table_opt) {
|
||||
switch (index_type) {
|
||||
case BlockBasedTableOptions::kBinarySearch: {
|
||||
return new ShortenedIndexBuilder(comparator,
|
||||
index_block_restart_interval);
|
||||
table_opt.index_block_restart_interval);
|
||||
}
|
||||
case BlockBasedTableOptions::kHashSearch: {
|
||||
return new HashIndexBuilder(comparator, prefix_extractor,
|
||||
index_block_restart_interval);
|
||||
return new HashIndexBuilder(comparator, int_key_slice_transform,
|
||||
table_opt.index_block_restart_interval);
|
||||
}
|
||||
case BlockBasedTableOptions::kTwoLevelIndexSearch: {
|
||||
return new PartitionIndexBuilder(comparator, prefix_extractor,
|
||||
index_per_partition,
|
||||
index_block_restart_interval);
|
||||
return PartitionedIndexBuilder::CreateIndexBuilder(comparator, table_opt);
|
||||
}
|
||||
default: {
|
||||
assert(!"Do not recognize the index type ");
|
||||
@ -49,4 +48,81 @@ IndexBuilder* IndexBuilder::CreateIndexBuilder(
|
||||
assert(false);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Factory for the two-level (partitioned) index builder. The returned object
// is allocated with new and handed to the caller as a raw pointer.
PartitionedIndexBuilder* PartitionedIndexBuilder::CreateIndexBuilder(
    const InternalKeyComparator* comparator,
    const BlockBasedTableOptions& table_opt) {
  auto* const builder = new PartitionedIndexBuilder(comparator, table_opt);
  return builder;
}
|
||||
|
||||
// Sets up the top-level index block builder and creates the first partition's
// index builder (of type sub_type_). table_opt is kept by reference in
// table_opt_, so the options object is not copied here.
PartitionedIndexBuilder::PartitionedIndexBuilder(
    const InternalKeyComparator* comparator,
    const BlockBasedTableOptions& table_opt)
    : IndexBuilder(comparator),
      index_block_builder_(table_opt.index_block_restart_interval),
      // Initialized from the constructor argument rather than table_opt_:
      // both name the same options object, but table_opt_ is declared after
      // sub_index_builder_ and is therefore not bound yet at this point.
      sub_index_builder_(IndexBuilder::CreateIndexBuilder(
          sub_type_, comparator_, nullptr, table_opt)),
      table_opt_(table_opt) {}
|
||||
|
||||
// Releases the partition builder that is still active, if any. Once the last
// index entry has been added the pointer is null (ownership moved into
// entries_), in which case this delete is a no-op.
PartitionedIndexBuilder::~PartitionedIndexBuilder() {
  delete sub_index_builder_;
}
|
||||
|
||||
void PartitionedIndexBuilder::AddIndexEntry(
|
||||
std::string* last_key_in_current_block,
|
||||
const Slice* first_key_in_next_block, const BlockHandle& block_handle) {
|
||||
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
|
||||
first_key_in_next_block, block_handle);
|
||||
num_indexes++;
|
||||
if (UNLIKELY(first_key_in_next_block == nullptr)) { // no more keys
|
||||
entries_.push_back({std::string(*last_key_in_current_block),
|
||||
std::unique_ptr<IndexBuilder>(sub_index_builder_)});
|
||||
sub_index_builder_ = nullptr;
|
||||
cut_filter_block = true;
|
||||
} else if (num_indexes % table_opt_.index_per_partition == 0) {
|
||||
entries_.push_back({std::string(*last_key_in_current_block),
|
||||
std::unique_ptr<IndexBuilder>(sub_index_builder_)});
|
||||
sub_index_builder_ = IndexBuilder::CreateIndexBuilder(
|
||||
sub_type_, comparator_, nullptr, table_opt_);
|
||||
cut_filter_block = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Emits the index one block per call. While partitions remain, the front
// partition is finished into *index_blocks and Status::Incomplete() is
// returned (or the partition's own error status if it failed). On every call
// after the first, last_partition_block_handle is encoded and added to the
// top-level index under the key of the front entry, which is then dropped.
// When no partition is left, the top-level index block is returned and the
// status is OK.
Status PartitionedIndexBuilder::Finish(
    IndexBlocks* index_blocks, const BlockHandle& last_partition_block_handle) {
  assert(!entries_.empty());
  // AddIndexEntry nulls the active builder once the last key has been added.
  assert(sub_index_builder_ == nullptr);

  if (finishing_indexes) {
    // Pair the front entry's key with the handle supplied by the caller.
    std::string encoded_handle;
    last_partition_block_handle.EncodeTo(&encoded_handle);
    index_block_builder_.Add(entries_.front().key, encoded_handle);
    entries_.pop_front();
  }

  if (UNLIKELY(entries_.empty())) {
    // Nothing left to finish: hand back the 2nd level index.
    index_blocks->index_block_contents = index_block_builder_.Finish();
    return Status::OK();
  }

  // Finish the next partition in line; Incomplete tells the caller to call
  // Finish again.
  auto partition_status = entries_.front().value->Finish(index_blocks);
  finishing_indexes = true;
  if (!partition_status.ok()) {
    return partition_status;
  }
  return Status::Incomplete();
}
|
||||
|
||||
// Sums the size estimates of every sealed partition in entries_, the
// top-level index block under construction, and the active partition builder
// when one exists.
size_t PartitionedIndexBuilder::EstimatedSize() const {
  size_t estimate = 0;
  for (const auto& sealed : entries_) {
    estimate += sealed.value->EstimatedSize();
  }
  estimate += index_block_builder_.CurrentSizeEstimate();
  if (sub_index_builder_ != nullptr) {
    estimate += sub_index_builder_->EstimatedSize();
  }
  return estimate;
}
|
||||
} // namespace rocksdb
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
#include <assert.h>
|
||||
#include <inttypes.h>
|
||||
|
||||
#include <list>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
@ -34,9 +36,9 @@ class IndexBuilder {
|
||||
public:
|
||||
static IndexBuilder* CreateIndexBuilder(
|
||||
BlockBasedTableOptions::IndexType index_type,
|
||||
const InternalKeyComparator* comparator,
|
||||
const SliceTransform* prefix_extractor, int index_block_restart_interval,
|
||||
uint64_t index_per_partition);
|
||||
const rocksdb::InternalKeyComparator* comparator,
|
||||
const InternalKeySliceTransform* int_key_slice_transform,
|
||||
const BlockBasedTableOptions& table_opt);
|
||||
|
||||
// Index builder will construct a set of blocks which contain:
|
||||
// 1. One primary index block.
|
||||
@ -262,4 +264,65 @@ class HashIndexBuilder : public IndexBuilder {
|
||||
|
||||
uint64_t current_restart_index_ = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* IndexBuilder for two-level indexing. Internally it creates a new index for
|
||||
* each partition and finishes them in order when Finish is called on it
|
||||
* continuously until Status::OK() is returned.
|
||||
*
|
||||
* The format on the disk would be I I I I I I IP where I is a block containing a
|
||||
* partition of indexes built using ShortenedIndexBuilder and IP is a block
|
||||
* containing a secondary index on the partitions, built using
|
||||
* ShortenedIndexBuilder.
|
||||
*/
|
||||
class PartitionedIndexBuilder : public IndexBuilder {
|
||||
public:
|
||||
static PartitionedIndexBuilder* CreateIndexBuilder(
|
||||
const rocksdb::InternalKeyComparator* comparator,
|
||||
const BlockBasedTableOptions& table_opt);
|
||||
|
||||
explicit PartitionedIndexBuilder(const InternalKeyComparator* comparator,
|
||||
const BlockBasedTableOptions& table_opt);
|
||||
|
||||
virtual ~PartitionedIndexBuilder();
|
||||
|
||||
virtual void AddIndexEntry(std::string* last_key_in_current_block,
|
||||
const Slice* first_key_in_next_block,
|
||||
const BlockHandle& block_handle) override;
|
||||
|
||||
virtual Status Finish(
|
||||
IndexBlocks* index_blocks,
|
||||
const BlockHandle& last_partition_block_handle) override;
|
||||
|
||||
virtual size_t EstimatedSize() const override;
|
||||
|
||||
inline bool ShouldCutFilterBlock() {
|
||||
// Current policy is to align the partitions of index and filters
|
||||
if (cut_filter_block) {
|
||||
cut_filter_block = false;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string& GetPartitionKey() { return entries_.back().key; }
|
||||
|
||||
private:
|
||||
static const BlockBasedTableOptions::IndexType sub_type_ =
|
||||
BlockBasedTableOptions::kBinarySearch;
|
||||
struct Entry {
|
||||
std::string key;
|
||||
std::unique_ptr<IndexBuilder> value;
|
||||
};
|
||||
std::list<Entry> entries_; // list of partitioned indexes and their keys
|
||||
BlockBuilder index_block_builder_; // top-level index builder
|
||||
IndexBuilder* sub_index_builder_; // the active partition index builder
|
||||
uint64_t num_indexes = 0;
|
||||
bool finishing_indexes =
|
||||
false; // true if Finish is called once but not complete yet.
|
||||
const BlockBasedTableOptions& table_opt_;
|
||||
// Filter data
|
||||
bool cut_filter_block =
|
||||
false; // true if it should cut the next filter partition block
|
||||
};
|
||||
} // namespace rocksdb
|
||||
|
@ -9,76 +9,62 @@
|
||||
#include "util/coding.h"
|
||||
|
||||
namespace rocksdb {
|
||||
PartitionIndexBuilder::PartitionIndexBuilder(
|
||||
const InternalKeyComparator* comparator,
|
||||
const SliceTransform* prefix_extractor, const uint64_t index_per_partition,
|
||||
int index_block_restart_interval)
|
||||
: IndexBuilder(comparator),
|
||||
prefix_extractor_(prefix_extractor),
|
||||
index_block_builder_(index_block_restart_interval),
|
||||
index_per_partition_(index_per_partition),
|
||||
index_block_restart_interval_(index_block_restart_interval) {
|
||||
sub_index_builder_ =
|
||||
CreateIndexBuilder(sub_type_, comparator_, prefix_extractor_,
|
||||
index_block_restart_interval_, index_per_partition_);
|
||||
}
|
||||
|
||||
PartitionIndexBuilder::~PartitionIndexBuilder() { delete sub_index_builder_; }
|
||||
PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
|
||||
const SliceTransform* prefix_extractor, bool whole_key_filtering,
|
||||
FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
|
||||
PartitionedIndexBuilder* const p_index_builder)
|
||||
: FullFilterBlockBuilder(prefix_extractor, whole_key_filtering,
|
||||
filter_bits_builder),
|
||||
index_on_filter_block_builder_(index_block_restart_interval),
|
||||
p_index_builder_(p_index_builder) {}
|
||||
|
||||
void PartitionIndexBuilder::AddIndexEntry(
|
||||
std::string* last_key_in_current_block,
|
||||
const Slice* first_key_in_next_block, const BlockHandle& block_handle) {
|
||||
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
|
||||
first_key_in_next_block, block_handle);
|
||||
num_indexes++;
|
||||
if (UNLIKELY(first_key_in_next_block == nullptr)) { // no more keys
|
||||
entries_.push_back({std::string(*last_key_in_current_block),
|
||||
std::unique_ptr<IndexBuilder>(sub_index_builder_)});
|
||||
sub_index_builder_ = nullptr;
|
||||
} else if (num_indexes % index_per_partition_ == 0) {
|
||||
entries_.push_back({std::string(*last_key_in_current_block),
|
||||
std::unique_ptr<IndexBuilder>(sub_index_builder_)});
|
||||
sub_index_builder_ =
|
||||
CreateIndexBuilder(sub_type_, comparator_, prefix_extractor_,
|
||||
index_block_restart_interval_, index_per_partition_);
|
||||
PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {}
|
||||
|
||||
void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock() {
|
||||
if (!p_index_builder_->ShouldCutFilterBlock()) {
|
||||
return;
|
||||
}
|
||||
filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));
|
||||
Slice filter = filter_bits_builder_->Finish(&filter_gc.back());
|
||||
std::string& index_key = p_index_builder_->GetPartitionKey();
|
||||
filters.push_back({index_key, filter});
|
||||
}
|
||||
|
||||
Status PartitionIndexBuilder::Finish(
|
||||
IndexBlocks* index_blocks, const BlockHandle& last_partition_block_handle) {
|
||||
assert(!entries_.empty());
|
||||
// It must be set to null after last key is added
|
||||
assert(sub_index_builder_ == nullptr);
|
||||
if (finishing == true) {
|
||||
Entry& last_entry = entries_.front();
|
||||
void PartitionedFilterBlockBuilder::AddKey(const Slice& key) {
|
||||
MaybeCutAFilterBlock();
|
||||
filter_bits_builder_->AddKey(key);
|
||||
}
|
||||
|
||||
Slice PartitionedFilterBlockBuilder::Finish(
|
||||
const BlockHandle& last_partition_block_handle, Status* status) {
|
||||
if (finishing_filters == true) {
|
||||
// Record the handle of the last written filter block in the index
|
||||
FilterEntry& last_entry = filters.front();
|
||||
std::string handle_encoding;
|
||||
last_partition_block_handle.EncodeTo(&handle_encoding);
|
||||
index_block_builder_.Add(last_entry.key, handle_encoding);
|
||||
entries_.pop_front();
|
||||
}
|
||||
// If there is no sub_index left, then return the 2nd level index.
|
||||
if (UNLIKELY(entries_.empty())) {
|
||||
index_blocks->index_block_contents = index_block_builder_.Finish();
|
||||
return Status::OK();
|
||||
index_on_filter_block_builder_.Add(last_entry.key, handle_encoding);
|
||||
filters.pop_front();
|
||||
} else {
|
||||
// Finish the next partition index in line and Incomplete() to indicate we
|
||||
// expect more calls to Finish
|
||||
Entry& entry = entries_.front();
|
||||
auto s = entry.value->Finish(index_blocks);
|
||||
finishing = true;
|
||||
return s.ok() ? Status::Incomplete() : s;
|
||||
MaybeCutAFilterBlock();
|
||||
}
|
||||
}
|
||||
|
||||
size_t PartitionIndexBuilder::EstimatedSize() const {
|
||||
size_t total = 0;
|
||||
for (auto it = entries_.begin(); it != entries_.end(); ++it) {
|
||||
total += it->value->EstimatedSize();
|
||||
// If there is no filter partition left, then return the index on filter
|
||||
// partitions
|
||||
if (UNLIKELY(filters.empty())) {
|
||||
*status = Status::OK();
|
||||
if (finishing_filters) {
|
||||
return index_on_filter_block_builder_.Finish();
|
||||
} else {
|
||||
// This is the rare case where no key was added to the filter
|
||||
return Slice();
|
||||
}
|
||||
} else {
|
||||
// Return the next filter partition in line and set Incomplete() status to
|
||||
// indicate we expect more calls to Finish
|
||||
*status = Status::Incomplete();
|
||||
finishing_filters = true;
|
||||
return filters.front().filter;
|
||||
}
|
||||
total += index_block_builder_.CurrentSizeEstimate();
|
||||
total +=
|
||||
sub_index_builder_ == nullptr ? 0 : sub_index_builder_->EstimatedSize();
|
||||
return total;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -7,59 +7,46 @@
|
||||
|
||||
#include <list>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "db/dbformat.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "util/hash.h"
|
||||
|
||||
#include "table/full_filter_block.h"
|
||||
#include "table/index_builder.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
/**
|
||||
* IndexBuilder for two-level indexing. Internally it creates a new index for
|
||||
* each partition and Finish then in order when Finish is called on it
|
||||
* continiously until Status::OK() is returned.
|
||||
*
|
||||
* The format on the disk would be I I I I I I IP where I is block containing a
|
||||
* partition of indexes built using ShortenedIndexBuilder and IP is a block
|
||||
* containing a secondary index on the partitions, built using
|
||||
* ShortenedIndexBuilder.
|
||||
*/
|
||||
class PartitionIndexBuilder : public IndexBuilder {
|
||||
class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
|
||||
public:
|
||||
explicit PartitionIndexBuilder(const InternalKeyComparator* comparator,
|
||||
const SliceTransform* prefix_extractor,
|
||||
const uint64_t index_per_partition,
|
||||
int index_block_restart_interval);
|
||||
explicit PartitionedFilterBlockBuilder(
|
||||
const SliceTransform* prefix_extractor, bool whole_key_filtering,
|
||||
FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
|
||||
PartitionedIndexBuilder* const p_index_builder);
|
||||
|
||||
virtual ~PartitionIndexBuilder();
|
||||
virtual ~PartitionedFilterBlockBuilder();
|
||||
|
||||
virtual void AddIndexEntry(std::string* last_key_in_current_block,
|
||||
const Slice* first_key_in_next_block,
|
||||
const BlockHandle& block_handle);
|
||||
void AddKey(const Slice& key) override;
|
||||
|
||||
virtual Status Finish(IndexBlocks* index_blocks,
|
||||
const BlockHandle& last_partition_block_handle);
|
||||
|
||||
virtual size_t EstimatedSize() const;
|
||||
virtual Slice Finish(const BlockHandle& last_partition_block_handle,
|
||||
Status* status) override;
|
||||
|
||||
private:
|
||||
static const BlockBasedTableOptions::IndexType sub_type_ =
|
||||
BlockBasedTableOptions::kBinarySearch;
|
||||
struct Entry {
|
||||
// Filter data
|
||||
BlockBuilder index_on_filter_block_builder_; // top-level index builder
|
||||
struct FilterEntry {
|
||||
std::string key;
|
||||
std::unique_ptr<IndexBuilder> value;
|
||||
Slice filter;
|
||||
};
|
||||
std::list<Entry> entries_; // list of partitioned indexes and their keys
|
||||
const SliceTransform* prefix_extractor_;
|
||||
BlockBuilder index_block_builder_; // top-level index builder
|
||||
IndexBuilder* sub_index_builder_; // the active partition index builder
|
||||
uint64_t index_per_partition_;
|
||||
int index_block_restart_interval_;
|
||||
uint64_t num_indexes = 0;
|
||||
bool finishing =
|
||||
std::list<FilterEntry> filters; // list of partitioned indexes and their keys
|
||||
std::unique_ptr<IndexBuilder> value;
|
||||
std::vector<std::unique_ptr<const char[]>> filter_gc;
|
||||
bool finishing_filters =
|
||||
false; // true if Finish is called once but not complete yet.
|
||||
// The policy of when to cut a filter block and Finish it
|
||||
void MaybeCutAFilterBlock();
|
||||
PartitionedIndexBuilder* const p_index_builder_;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -638,6 +638,9 @@ static std::unordered_map<std::string, OptionTypeInfo>
|
||||
{"index_per_partition",
|
||||
{offsetof(struct BlockBasedTableOptions, index_per_partition),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal, false, 0}},
|
||||
{"partition_filters",
|
||||
{offsetof(struct BlockBasedTableOptions, partition_filters),
|
||||
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
|
||||
{"filter_policy",
|
||||
{offsetof(struct BlockBasedTableOptions, filter_policy),
|
||||
OptionType::kFilterPolicy, OptionVerificationType::kByName, false,
|
||||
@ -702,7 +705,7 @@ static std::unordered_map<std::string, BlockBasedTableOptions::IndexType>
|
||||
{"kBinarySearch", BlockBasedTableOptions::IndexType::kBinarySearch},
|
||||
{"kHashSearch", BlockBasedTableOptions::IndexType::kHashSearch},
|
||||
{"kTwoLevelIndexSearch",
|
||||
BlockBasedTableOptions::IndexType::kHashSearch}};
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}};
|
||||
|
||||
static std::unordered_map<std::string, EncodingType> encoding_type_string_map =
|
||||
{{"kPlain", kPlain}, {"kPrefix", kPrefix}};
|
||||
|
@ -156,6 +156,7 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
|
||||
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
|
||||
"block_size_deviation=8;block_restart_interval=4; "
|
||||
"index_per_partition=4;"
|
||||
"partition_filters=false;"
|
||||
"index_block_restart_interval=4;"
|
||||
"filter_policy=bloomfilter:4:true;whole_key_filtering=1;"
|
||||
"format_version=1;"
|
||||
|
Loading…
x
Reference in New Issue
Block a user