rocksdb/table/partitioned_filter_block.cc
Maysam Yabandeh caf0f53a74 Index value delta encoding (#3983)
Summary:
Given that the index value is a BlockHandle, which is basically an <offset, size> pair, we can apply delta encoding on the values. The first value at each index restart interval encodes the full BlockHandle, but the rest encode only the size. Refer to IndexBlockIter::DecodeCurrentValue for the details of the encoding. This reduces the index size, which helps use the block cache more efficiently. The feature is enabled by using format_version 4.

The feature comes with a bit of cpu overhead which should be paid back by the higher cache hits due to smaller index block size.
Results with sysbench read-only using 4k blocks and using 16 index restart interval:
Format 2:
19585   rocksdb read-only range=100
Format 3:
19569   rocksdb read-only range=100
Format 4:
19352   rocksdb read-only range=100
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3983

Differential Revision: D8361343

Pulled By: maysamyabandeh

fbshipit-source-id: f882ee082322acac32b0072e2bdbb0b5f854e651
2018-08-09 16:58:40 -07:00

353 lines
13 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/partitioned_filter_block.h"
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
#ifdef OS_FREEBSD
#include <malloc_np.h>
#else
#include <malloc.h>
#endif
#endif
#include <utility>
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "rocksdb/filter_policy.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "util/coding.h"
namespace rocksdb {
// Constructs a builder for partitioned filter blocks.
//
// prefix_extractor, whole_key_filtering and filter_bits_builder are forwarded
// unchanged to the FullFilterBlockBuilder base class.
// index_block_restart_interval and use_value_delta_encoding configure both
// index-on-filter block builders (the regular one and the "without_seq" one);
// key delta encoding is always enabled for them.
// p_index_builder is the partitioned index builder this filter builder
// consults when cutting partitions.
// partition_size is handed to the bits builder's CalculateNumEntry() to
// obtain filters_per_partition_.
PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
const SliceTransform* prefix_extractor, bool whole_key_filtering,
FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
const bool use_value_delta_encoding,
PartitionedIndexBuilder* const p_index_builder,
const uint32_t partition_size)
: FullFilterBlockBuilder(prefix_extractor, whole_key_filtering,
filter_bits_builder),
index_on_filter_block_builder_(index_block_restart_interval,
true /*use_delta_encoding*/,
use_value_delta_encoding),
index_on_filter_block_builder_without_seq_(index_block_restart_interval,
true /*use_delta_encoding*/,
use_value_delta_encoding),
p_index_builder_(p_index_builder),
filters_in_partition_(0),
num_added_(0) {
// Computed in the body (not the initializer list) from filter_bits_builder_,
// which is referenced here as an already-initialized member.
filters_per_partition_ =
filter_bits_builder_->CalculateNumEntry(partition_size);
}
// Nothing to release explicitly; members clean up after themselves.
PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() = default;
void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock() {
// Use == to send the request only once
if (filters_in_partition_ == filters_per_partition_) {
// Currently only index builder is in charge of cutting a partition. We keep
// requesting until it is granted.
p_index_builder_->RequestPartitionCut();
}
if (!p_index_builder_->ShouldCutFilterBlock()) {
return;
}
filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));
Slice filter = filter_bits_builder_->Finish(&filter_gc.back());
std::string& index_key = p_index_builder_->GetPartitionKey();
filters.push_back({index_key, filter});
filters_in_partition_ = 0;
Reset();
}
// Adds `key` to the filter partition under construction. A pending partition
// cut is given a chance to happen first, so the key is added after any cut.
void PartitionedFilterBlockBuilder::AddKey(const Slice& key) {
  MaybeCutAFilterBlock();
  filter_bits_builder_->AddKey(key);
  // Track both the per-partition count and the running total.
  ++filters_in_partition_;
  ++num_added_;
}
// Returns the queued filter partitions one call at a time, and finally the
// index built on top of them.
//
// While partitions remain, *status is set to Incomplete() and the partition
// at the front of the queue is returned; the caller is expected to call
// Finish() again, passing in `last_partition_block_handle` the handle of the
// partition returned by the previous call. Once the queue is empty, *status
// is OK() and the result is the index-on-filter block, or an empty Slice if
// no partition was ever produced.
Slice PartitionedFilterBlockBuilder::Finish(
    const BlockHandle& last_partition_block_handle, Status* status) {
  if (!finishing_filters) {
    // First call: cut whatever is pending in the current partition.
    MaybeCutAFilterBlock();
  } else {
    // Subsequent calls: index the partition that was handed out last time
    // under its key, using the handle the caller reports for it.
    FilterEntry& written_entry = filters.front();

    std::string full_handle;
    last_partition_block_handle.EncodeTo(&full_handle);
    std::string size_only_handle;
    last_partition_block_handle.EncodeSizeTo(&size_only_handle);
    const Slice size_only_handle_slice(size_only_handle);

    index_on_filter_block_builder_.Add(written_entry.key, full_handle,
                                       &size_only_handle_slice);
    const bool keys_carry_seq = p_index_builder_->seperator_is_key_plus_seq();
    if (!keys_carry_seq) {
      index_on_filter_block_builder_without_seq_.Add(
          ExtractUserKey(written_entry.key), full_handle,
          &size_only_handle_slice);
    }
    filters.pop_front();
  }

  if (UNLIKELY(filters.empty())) {
    // No partition left: hand back the index over the partitions.
    *status = Status::OK();
    if (!finishing_filters) {
      // This is the rare case where no key was added to the filter
      return Slice();
    }
    if (p_index_builder_->seperator_is_key_plus_seq()) {
      return index_on_filter_block_builder_.Finish();
    }
    return index_on_filter_block_builder_without_seq_.Finish();
  }

  // Hand out the next partition; Incomplete() tells the caller to call again.
  *status = Status::Incomplete();
  finishing_filters = true;
  return filters.front().filter;
}
// Constructs a reader over the index-on-filter-partitions block carried in
// `contents`.
//
// prefix_extractor, comparator, table and the two index_* flags are stored
// for later use by the lookup methods. The FilterBitsReader argument is
// unnamed and not used by this constructor. `stats` is passed both to the
// FilterBlockReader base class and to the Block created below.
//
// Note the ordering: contents.data.size() is read in the base-class
// initializer, before `contents` is moved into the Block in the body.
PartitionedFilterBlockReader::PartitionedFilterBlockReader(
const SliceTransform* prefix_extractor, bool _whole_key_filtering,
BlockContents&& contents, FilterBitsReader* /*filter_bits_reader*/,
Statistics* stats, const InternalKeyComparator comparator,
const BlockBasedTable* table, const bool index_key_includes_seq,
const bool index_value_is_full)
: FilterBlockReader(contents.data.size(), stats, _whole_key_filtering),
prefix_extractor_(prefix_extractor),
comparator_(comparator),
table_(table),
index_key_includes_seq_(index_key_includes_seq),
index_value_is_full_(index_value_is_full) {
// Takes ownership of the block contents; global sequence number handling is
// disabled and read-amp tracking is off (0 bytes per bit).
idx_on_fltr_blk_.reset(new Block(std::move(contents),
kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, stats));
}
// Erases every filter partition referenced by the index-on-filter block from
// the table's block cache. Does nothing when no block cache is configured.
PartitionedFilterBlockReader::~PartitionedFilterBlockReader() {
  // TODO(myabandeh): if instead of filter object we store only the blocks in
  // block cache, then we don't have to manually erase them from block cache
  // here.
  auto block_cache = table_->rep_->table_options.block_cache.get();
  if (UNLIKELY(block_cache == nullptr)) {
    return;
  }

  Statistics* kNullStats = nullptr;
  IndexBlockIter biter;
  idx_on_fltr_blk_->NewIterator<IndexBlockIter>(
      &comparator_, comparator_.user_comparator(), &biter, kNullStats, true,
      index_key_includes_seq_, index_value_is_full_);

  // Scratch buffer that GetCacheKey() writes each partition's cache key into.
  char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
  for (biter.SeekToFirst(); biter.Valid(); biter.Next()) {
    BlockHandle partition_handle = biter.value();
    block_cache->Erase(BlockBasedTable::GetCacheKey(
        table_->rep_->cache_key_prefix, table_->rep_->cache_key_prefix_size,
        partition_handle, cache_key));
  }
}
bool PartitionedFilterBlockReader::KeyMayMatch(
const Slice& key, const SliceTransform* prefix_extractor,
uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
assert(const_ikey_ptr != nullptr);
assert(block_offset == kNotValid);
if (!whole_key_filtering_) {
return true;
}
if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) {
return true;
}
auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr);
if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range
return false;
}
bool cached = false;
auto filter_partition =
GetFilterPartition(nullptr /* prefetch_buffer */, filter_handle, no_io,
&cached, prefix_extractor);
if (UNLIKELY(!filter_partition.value)) {
return true;
}
auto res = filter_partition.value->KeyMayMatch(key, prefix_extractor,
block_offset, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
}
bool PartitionedFilterBlockReader::PrefixMayMatch(
const Slice& prefix, const SliceTransform* prefix_extractor,
uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
#ifdef NDEBUG
(void)block_offset;
#endif
assert(const_ikey_ptr != nullptr);
assert(block_offset == kNotValid);
if (!prefix_extractor_ && !prefix_extractor) {
return true;
}
if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) {
return true;
}
auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr);
if (UNLIKELY(filter_handle.size() == 0)) { // prefix is out of range
return false;
}
bool cached = false;
auto filter_partition =
GetFilterPartition(nullptr /* prefetch_buffer */, filter_handle, no_io,
&cached, prefix_extractor);
if (UNLIKELY(!filter_partition.value)) {
return true;
}
auto res = filter_partition.value->PrefixMayMatch(prefix, prefix_extractor,
kNotValid, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
}
// Seeks `entry` in the index-on-filter block and returns the handle stored at
// the position the seek lands on. Returns BlockHandle(0, 0) when the seek
// leaves the iterator invalid; callers treat a zero size as "out of range".
BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
    const Slice& entry) {
  Statistics* kNullStats = nullptr;
  IndexBlockIter index_iter;
  idx_on_fltr_blk_->NewIterator<IndexBlockIter>(
      &comparator_, comparator_.user_comparator(), &index_iter, kNullStats,
      true, index_key_includes_seq_, index_value_is_full_);

  index_iter.Seek(entry);
  if (UNLIKELY(!index_iter.Valid())) {
    return BlockHandle(0, 0);
  }
  return index_iter.value();
}
// Obtains the filter partition identified by `fltr_blk_handle`.
//
// With a block cache configured:
//   - if the partition's offset is present in filter_map_, that entry is
//     returned, *cached is set to true and the cache-hit counters are bumped;
//   - otherwise the partition is fetched through the table's GetFilter().
//     Note that this path passes a null prefetch buffer, not the
//     `prefetch_buffer` argument.
// Without a block cache the partition is read via ReadFilter() and returned
// with a null cache handle.
// *cached is written only on the filter_map_ hit path.
BlockBasedTable::CachableEntry<FilterBlockReader>
PartitionedFilterBlockReader::GetFilterPartition(
    FilePrefetchBuffer* prefetch_buffer, BlockHandle& fltr_blk_handle,
    const bool no_io, bool* cached, const SliceTransform* prefix_extractor) {
  const bool is_a_filter_partition = true;
  auto block_cache = table_->rep_->table_options.block_cache.get();

  if (LIKELY(block_cache != nullptr)) {
    if (!filter_map_.empty()) {
      auto pinned = filter_map_.find(fltr_blk_handle.offset());
      // A miss here is possible: the block cache might not have had space
      // for the partition.
      if (pinned != filter_map_.end()) {
        const auto& entry = pinned->second;
        PERF_COUNTER_ADD(block_cache_hit_count, 1);
        RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT);
        RecordTick(statistics(), BLOCK_CACHE_HIT);
        RecordTick(statistics(), BLOCK_CACHE_BYTES_READ,
                   block_cache->GetUsage(entry.cache_handle));
        *cached = true;
        return entry;
      }
    }
    return table_->GetFilter(/*prefetch_buffer*/ nullptr, fltr_blk_handle,
                             is_a_filter_partition, no_io,
                             /* get_context */ nullptr, prefix_extractor);
  }

  // No block cache: read the partition directly; there is no cache handle.
  auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle,
                                   is_a_filter_partition, prefix_extractor);
  return {filter, nullptr};
}
// Estimates the memory held by this reader: the usable size of the
// index-on-filter block plus the size of this object itself (the allocator's
// usable size when ROCKSDB_MALLOC_USABLE_SIZE is defined, sizeof otherwise).
size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
  // TODO(myabandeh): better estimation for filter_map_ size
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
  const size_t self_size =
      malloc_usable_size(const_cast<PartitionedFilterBlockReader*>(this));
#else
  const size_t self_size = sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
  return idx_on_fltr_blk_->usable_size() + self_size;
}
// Cleanup callback: `arg` is the Cache and `h` is the Cache::Handle to
// release on it.
void ReleaseFilterCachedEntry(void* arg, void* h) {
  auto* const cache = static_cast<Cache*>(arg);
  cache->Release(static_cast<Cache::Handle*>(h));
}
// TODO(myabandeh): merge this with the same function in IndexReader
//
// Prefetches the file range spanned by all filter partitions and then loads
// each partition through the table's GetFilter().
//
// pin: when true, every partition that ended up in the block cache is
//   recorded in filter_map_ (keyed by block offset) and a cleanup is
//   registered to release its cache handle; when false the cache handle is
//   released right away.
// prefix_extractor: forwarded to GetFilter().
//
// Partitions that come back without a cache handle are deleted immediately.
// If the index-on-filter block yields no entries, the function returns
// without doing anything.
void PartitionedFilterBlockReader::CacheDependencies(
    bool pin, const SliceTransform* prefix_extractor) {
  // Before read partitions, prefetch them to avoid lots of IOs
  auto rep = table_->rep_;
  IndexBlockIter biter;
  Statistics* kNullStats = nullptr;
  idx_on_fltr_blk_->NewIterator<IndexBlockIter>(
      &comparator_, comparator_.user_comparator(), &biter, kNullStats, true,
      index_key_includes_seq_, index_value_is_full_);

  // Filter partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block offset
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty (or unreadable) index: there is nothing to prefetch or cache, and
    // reading value() from an invalid iterator must be avoided.
    return;
  }
  BlockHandle handle = biter.value();
  const uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    return;
  }
  handle = biter.value();
  const uint64_t last_off =
      handle.offset() + handle.size() + kBlockTrailerSize;
  const uint64_t prefetch_len = last_off - prefetch_off;

  auto& file = rep->file;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer(
      new FilePrefetchBuffer());
  // The prefetch result is deliberately not acted upon (best effort); the
  // per-partition reads below are issued regardless.
  Status s = prefetch_buffer->Prefetch(file.get(), prefetch_off,
                                       static_cast<size_t>(prefetch_len));

  // After prefetch, read the partitions one by one
  Cache* block_cache = rep->table_options.block_cache.get();
  const bool no_io = false;  // IO is allowed while loading partitions
  const bool is_a_filter_partition = true;
  for (biter.SeekToFirst(); biter.Valid(); biter.Next()) {
    handle = biter.value();
    auto filter = table_->GetFilter(
        prefetch_buffer.get(), handle, is_a_filter_partition, no_io,
        /* get_context */ nullptr, prefix_extractor);
    if (LIKELY(filter.IsSet())) {
      if (pin) {
        // Register the cleanup while `filter` is still intact, and only then
        // move it into the map, so the handle is never read from a
        // moved-from object.
        RegisterCleanup(&ReleaseFilterCachedEntry, block_cache,
                        filter.cache_handle);
        filter_map_[handle.offset()] = std::move(filter);
      } else {
        block_cache->Release(filter.cache_handle);
      }
    } else {
      delete filter.value;
    }
  }
}
} // namespace rocksdb