Readers for partition filter

Summary:
This is the last split of this pull request: https://github.com/facebook/rocksdb/pull/1891 which includes the reader part as well as the tests.
Closes https://github.com/facebook/rocksdb/pull/1961

Differential Revision: D4672216

Pulled By: maysamyabandeh

fbshipit-source-id: 6a2b829
This commit is contained in:
Maysam Yabandeh 2017-03-22 09:11:23 -07:00 committed by Facebook Github Bot
parent 9ef3627fd3
commit 8b0097b49b
15 changed files with 1069 additions and 321 deletions

View File

@ -354,6 +354,7 @@ TESTS = \
file_reader_writer_test \
block_based_filter_block_test \
full_filter_block_test \
partitioned_filter_block_test \
hash_table_test \
histogram_test \
log_test \
@ -1158,6 +1159,9 @@ block_based_filter_block_test: table/block_based_filter_block_test.o $(LIBOBJECT
full_filter_block_test: table/full_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
partitioned_filter_block_test: table/partitioned_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

View File

@ -20,17 +20,23 @@ class DBBloomFilterTest : public DBTestBase {
DBBloomFilterTest() : DBTestBase("/db_bloom_filter_test") {}
};
class DBBloomFilterTestWithParam : public DBTestBase,
public testing::WithParamInterface<bool> {
class DBBloomFilterTestWithParam
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
// public testing::WithParamInterface<bool> {
protected:
bool use_block_based_filter_;
bool partition_filters_;
public:
DBBloomFilterTestWithParam() : DBTestBase("/db_bloom_filter_tests") {}
~DBBloomFilterTestWithParam() {}
void SetUp() override { use_block_based_filter_ = GetParam(); }
void SetUp() override {
use_block_based_filter_ = std::get<0>(GetParam());
partition_filters_ = std::get<1>(GetParam());
}
};
// KeyMayExist can lead to a few false positives, but not false negatives.
@ -43,7 +49,17 @@ TEST_P(DBBloomFilterTestWithParam, KeyMayExist) {
anon::OptionsOverride options_override;
options_override.filter_policy.reset(
NewBloomFilterPolicy(20, use_block_based_filter_));
options_override.partition_filters = partition_filters_;
options_override.index_per_partition = 2;
Options options = CurrentOptions(options_override);
if (partition_filters_ &&
static_cast<BlockBasedTableOptions*>(
options.table_factory->GetOptions())
->index_type != BlockBasedTableOptions::kTwoLevelIndexSearch) {
// In the current implementation partitioned filters depend on partitioned
// indexes
continue;
}
options.statistics = rocksdb::CreateDBStatistics();
CreateAndReopenWithCF({"pikachu"}, options);
@ -102,11 +118,16 @@ TEST_P(DBBloomFilterTestWithParam, KeyMayExist) {
}
TEST_F(DBBloomFilterTest, GetFilterByPrefixBloom) {
for (bool partition_filters : {true, false}) {
Options options = last_options_;
options.prefix_extractor.reset(NewFixedPrefixTransform(8));
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
if (partition_filters) {
bbto.partition_filters = true;
bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
bbto.whole_key_filtering = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
@ -139,9 +160,11 @@ TEST_F(DBBloomFilterTest, GetFilterByPrefixBloom) {
ro.total_order_seek = true;
ASSERT_TRUE(db_->Get(ro, "foobarbar", &value).IsNotFound());
ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 2);
}
}
TEST_F(DBBloomFilterTest, WholeKeyFilterProp) {
for (bool partition_filters : {true, false}) {
Options options = last_options_;
options.prefix_extractor.reset(NewFixedPrefixTransform(3));
options.statistics = rocksdb::CreateDBStatistics();
@ -149,6 +172,10 @@ TEST_F(DBBloomFilterTest, WholeKeyFilterProp) {
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.whole_key_filtering = false;
if (partition_filters) {
bbto.partition_filters = true;
bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
@ -288,6 +315,7 @@ TEST_F(DBBloomFilterTest, WholeKeyFilterProp) {
ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12);
ASSERT_EQ("bar", Get("barfoo"));
ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12);
}
}
TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
@ -301,6 +329,12 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
table_options.no_block_cache = true;
table_options.filter_policy.reset(
NewBloomFilterPolicy(10, use_block_based_filter_));
table_options.partition_filters = partition_filters_;
if (partition_filters_) {
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
table_options.index_per_partition = 2;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
@ -327,7 +361,13 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
int reads = env_->random_read_counter_.Read();
fprintf(stderr, "%d present => %d reads\n", N, reads);
ASSERT_GE(reads, N);
if (partition_filters_) {
// Without block cache, we read an extra partition filter per each
// level*read and a partition index per each read
ASSERT_LE(reads, 4 * N + 2 * N / 100);
} else {
ASSERT_LE(reads, N + 2 * N / 100);
}
// Lookup present keys. Should rarely read from either sstable.
env_->random_read_counter_.Reset();
@ -336,7 +376,13 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
}
reads = env_->random_read_counter_.Read();
fprintf(stderr, "%d missing => %d reads\n", N, reads);
if (partition_filters_) {
// With partitioned filter we read one extra filter per level per each
// missed read.
ASSERT_LE(reads, 2 * N + 3 * N / 100);
} else {
ASSERT_LE(reads, 3 * N / 100);
}
env_->delay_sstable_sync_.store(false, std::memory_order_release);
Close();
@ -344,7 +390,9 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
}
INSTANTIATE_TEST_CASE_P(DBBloomFilterTestWithParam, DBBloomFilterTestWithParam,
::testing::Bool());
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, true),
std::make_tuple(false, false)));
TEST_F(DBBloomFilterTest, BloomFilterRate) {
while (ChangeFilterOptions()) {
@ -401,14 +449,35 @@ TEST_F(DBBloomFilterTest, BloomFilterCompatibility) {
ASSERT_EQ(Key(i), Get(1, Key(i)));
}
ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
// Check db with partitioned full filter
table_options.partition_filters = true;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Check if they can be found
for (int i = 0; i < maxKey; i++) {
ASSERT_EQ(Key(i), Get(1, Key(i)));
}
ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
}
TEST_F(DBBloomFilterTest, BloomFilterReverseCompatibility) {
for (bool partition_filters : {true, false}) {
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions table_options;
if (partition_filters) {
table_options.partition_filters = true;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
// Create with full filter
CreateAndReopenWithCF({"pikachu"}, options);
@ -430,6 +499,7 @@ TEST_F(DBBloomFilterTest, BloomFilterReverseCompatibility) {
ASSERT_EQ(Key(i), Get(1, Key(i)));
}
ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
}
}
namespace {
@ -586,11 +656,12 @@ TEST_F(DBBloomFilterTest, PrefixExtractorBlockFilter) {
#ifndef ROCKSDB_LITE
class BloomStatsTestWithParam
: public DBBloomFilterTest,
public testing::WithParamInterface<std::tuple<bool, bool>> {
public testing::WithParamInterface<std::tuple<bool, bool, bool>> {
public:
BloomStatsTestWithParam() {
use_block_table_ = std::get<0>(GetParam());
use_block_based_builder_ = std::get<1>(GetParam());
partition_filters_ = std::get<2>(GetParam());
options_.create_if_missing = true;
options_.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));
@ -599,10 +670,17 @@ class BloomStatsTestWithParam
if (use_block_table_) {
BlockBasedTableOptions table_options;
table_options.hash_index_allow_collision = false;
if (partition_filters_) {
assert(!use_block_based_builder_);
table_options.partition_filters = partition_filters_;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
table_options.filter_policy.reset(
NewBloomFilterPolicy(10, use_block_based_builder_));
options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
} else {
assert(!partition_filters_); // not supported in plain table
PlainTableOptions table_options;
options_.table_factory.reset(NewPlainTableFactory(table_options));
}
@ -623,6 +701,7 @@ class BloomStatsTestWithParam
bool use_block_table_;
bool use_block_based_builder_;
bool partition_filters_;
Options options_;
};
@ -717,25 +796,44 @@ TEST_P(BloomStatsTestWithParam, BloomStatsTestWithIter) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(value1, iter->value().ToString());
if (partition_filters_) {
ASSERT_EQ(0, perf_context.bloom_sst_hit_count); // no_io
ASSERT_EQ(0, perf_context.bloom_sst_miss_count); // no_io
} else {
ASSERT_EQ(1, perf_context.bloom_sst_hit_count);
}
iter->Seek(key3);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(value3, iter->value().ToString());
if (partition_filters_) {
ASSERT_EQ(0, perf_context.bloom_sst_hit_count); // no_io
ASSERT_EQ(0, perf_context.bloom_sst_miss_count); // no_io
} else {
ASSERT_EQ(2, perf_context.bloom_sst_hit_count);
}
iter->Seek(key2);
ASSERT_OK(iter->status());
if (partition_filters_) {
// iter is still valid since filter did not reject the key2
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(0, perf_context.bloom_sst_hit_count); // no_io
ASSERT_EQ(0, perf_context.bloom_sst_miss_count); // no_io
} else {
ASSERT_TRUE(!iter->Valid());
ASSERT_EQ(1, perf_context.bloom_sst_miss_count);
ASSERT_EQ(2, perf_context.bloom_sst_hit_count);
}
}
INSTANTIATE_TEST_CASE_P(BloomStatsTestWithParam, BloomStatsTestWithParam,
::testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),
std::make_tuple(false, false)));
::testing::Values(std::make_tuple(true, true, false),
std::make_tuple(true, false, false),
std::make_tuple(true, false, true),
std::make_tuple(false, false,
false)));
namespace {
void PrefixScanInit(DBBloomFilterTest* dbtest) {

View File

@ -1221,7 +1221,7 @@ class PinL0IndexAndFilterBlocksTest : public DBTestBase,
PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {}
virtual void SetUp() override { infinite_max_files_ = GetParam(); }
void CreateTwoLevels(Options* options) {
void CreateTwoLevels(Options* options, bool close_afterwards) {
if (infinite_max_files_) {
options->max_open_files = -1;
}
@ -1249,6 +1249,9 @@ class PinL0IndexAndFilterBlocksTest : public DBTestBase,
Put(1, "z2", "end2");
ASSERT_OK(Flush(1));
if (close_afterwards) {
Close(); // This ensures that there is no ref to block cache entries
}
table_options.block_cache->EraseUnRefEntries();
}
@ -1303,7 +1306,7 @@ TEST_P(PinL0IndexAndFilterBlocksTest,
TEST_P(PinL0IndexAndFilterBlocksTest,
MultiLevelIndexAndFilterBlocksCachedWithPinning) {
Options options = CurrentOptions();
PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options);
PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, false);
// get base cache values
uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
@ -1332,7 +1335,10 @@ TEST_P(PinL0IndexAndFilterBlocksTest,
TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
Options options = CurrentOptions();
PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options);
// This ensures that db does not ref anything in the block cache, so
// EraseUnRefEntries could clear them up.
bool close_afterwards = true;
PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, close_afterwards);
// Get base cache values
uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);

View File

@ -230,6 +230,8 @@ bool DBTestBase::ChangeFilterOptions() {
option_config_ = kFilter;
} else if (option_config_ == kFilter) {
option_config_ = kFullFilterWithNewTableReaderForCompactions;
} else if (option_config_ == kFullFilterWithNewTableReaderForCompactions) {
option_config_ = kPartitionedFilterWithNewTableReaderForCompactions;
} else {
return false;
}
@ -325,6 +327,14 @@ Options DBTestBase::CurrentOptions(
options.new_table_reader_for_compaction_inputs = true;
options.compaction_readahead_size = 10 * 1024 * 1024;
break;
case kPartitionedFilterWithNewTableReaderForCompactions:
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
table_options.partition_filters = true;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.new_table_reader_for_compaction_inputs = true;
options.compaction_readahead_size = 10 * 1024 * 1024;
break;
case kUncompressed:
options.compression = kNoCompression;
break;
@ -426,6 +436,8 @@ Options DBTestBase::CurrentOptions(
if (options_override.filter_policy) {
table_options.filter_policy = options_override.filter_policy;
table_options.partition_filters = options_override.partition_filters;
table_options.index_per_partition = options_override.index_per_partition;
}
if (set_block_based_table_factory) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

View File

@ -109,6 +109,11 @@ class AtomicCounter {
struct OptionsOverride {
std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
// These will be used only if filter_policy is set
bool partition_filters = false;
uint64_t index_per_partition = 1024;
BlockBasedTableOptions::IndexType index_type =
BlockBasedTableOptions::IndexType::kBinarySearch;
// Used as a bit mask of individual enums in which to skip an XF test point
int skip_policy = 0;
@ -617,6 +622,7 @@ class DBTestBase : public testing::Test {
kUniversalSubcompactions = 32,
kBlockBasedTableWithIndexRestartInterval = 33,
kBlockBasedTableWithPartitionedIndex = 34,
kPartitionedFilterWithNewTableReaderForCompactions = 35,
};
int option_config_;

View File

@ -184,8 +184,9 @@ BlockBasedFilterBlockReader::BlockBasedFilterBlockReader(
num_ = (n - 5 - last_word) / 4;
}
bool BlockBasedFilterBlockReader::KeyMayMatch(const Slice& key,
uint64_t block_offset) {
bool BlockBasedFilterBlockReader::KeyMayMatch(
const Slice& key, uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
assert(block_offset != kNotValid);
if (!whole_key_filtering_) {
return true;
@ -193,8 +194,9 @@ bool BlockBasedFilterBlockReader::KeyMayMatch(const Slice& key,
return MayMatch(key, block_offset);
}
bool BlockBasedFilterBlockReader::PrefixMayMatch(const Slice& prefix,
uint64_t block_offset) {
bool BlockBasedFilterBlockReader::PrefixMayMatch(
const Slice& prefix, uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
assert(block_offset != kNotValid);
if (!prefix_extractor_) {
return true;

View File

@ -81,10 +81,14 @@ class BlockBasedFilterBlockReader : public FilterBlockReader {
bool whole_key_filtering,
BlockContents&& contents, Statistics* statistics);
virtual bool IsBlockBased() override { return true; }
virtual bool KeyMayMatch(const Slice& key,
uint64_t block_offset = kNotValid) override;
virtual bool PrefixMayMatch(const Slice& prefix,
uint64_t block_offset = kNotValid) override;
virtual bool KeyMayMatch(
const Slice& key, uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) override;
virtual bool PrefixMayMatch(
const Slice& prefix, uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) override;
virtual size_t ApproximateMemoryUsage() const override;
// convert this object to a human readable form

View File

@ -37,6 +37,7 @@
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/meta_blocks.h"
#include "table/partitioned_filter_block.h"
#include "table/persistent_cache_helper.h"
#include "table/sst_file_writer_collectors.h"
#include "table/two_level_iterator.h"
@ -149,7 +150,7 @@ Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
} // namespace
// Index that allows binary search lookup in a two-level index structure.
class PartitionIndexReader : public IndexReader {
class PartitionIndexReader : public IndexReader, public Cleanable {
public:
// Read the partition index from the file and create an instance for
// `PartitionIndexReader`.
@ -159,7 +160,8 @@ class PartitionIndexReader : public IndexReader {
const Footer& footer, const BlockHandle& index_handle,
const ImmutableCFOptions& ioptions,
const Comparator* comparator, IndexReader** index_reader,
const PersistentCacheOptions& cache_options) {
const PersistentCacheOptions& cache_options,
const int level) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(
file, footer, ReadOptions(), index_handle, &index_block, ioptions,
@ -167,8 +169,9 @@ class PartitionIndexReader : public IndexReader {
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
if (s.ok()) {
*index_reader = new PartitionIndexReader(
table, comparator, std::move(index_block), ioptions.statistics);
*index_reader =
new PartitionIndexReader(table, comparator, std::move(index_block),
ioptions.statistics, level);
}
return s;
@ -177,10 +180,25 @@ class PartitionIndexReader : public IndexReader {
// return a two-level iterator: first level is on the partition index
virtual InternalIterator* NewIterator(BlockIter* iter = nullptr,
bool dont_care = true) override {
// Filters are already checked before seeking the index
const bool skip_filters = true;
const bool is_index = true;
Cleanable* block_cache_cleaner = nullptr;
const bool pin_cached_indexes =
level_ == 0 &&
table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache;
if (pin_cached_indexes) {
// Keep partition indexes into the cache as long as the partition index
// reader object is alive
block_cache_cleaner = this;
}
return NewTwoLevelIterator(
new BlockBasedTable::BlockEntryIteratorState(table_, ReadOptions(),
false),
index_block_->NewIterator(comparator_, iter, true));
new BlockBasedTable::BlockEntryIteratorState(
table_, ReadOptions(), skip_filters, is_index, block_cache_cleaner),
index_block_->NewIterator(comparator_, nullptr, true));
// TODO(myabandeh): Update TwoLevelIterator to be able to make use of
// on-stack
// BlockIter while the state is on heap
}
virtual size_t size() const override { return index_block_->size(); }
@ -195,14 +213,17 @@ class PartitionIndexReader : public IndexReader {
private:
PartitionIndexReader(BlockBasedTable* table, const Comparator* comparator,
std::unique_ptr<Block>&& index_block, Statistics* stats)
std::unique_ptr<Block>&& index_block, Statistics* stats,
const int level)
: IndexReader(comparator, stats),
table_(table),
index_block_(std::move(index_block)) {
index_block_(std::move(index_block)),
level_(level) {
assert(index_block_ != nullptr);
}
BlockBasedTable* table_;
std::unique_ptr<Block> index_block_;
int level_;
};
// Index that allows binary search lookup for the first key of each block.
@ -555,14 +576,28 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// Find filter handle and filter type
if (rep->filter_policy) {
for (auto prefix : {kFullFilterBlockPrefix, kFilterBlockPrefix}) {
for (auto filter_type :
{Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter,
Rep::FilterType::kBlockFilter}) {
std::string prefix;
switch (filter_type) {
case Rep::FilterType::kFullFilter:
prefix = kFullFilterBlockPrefix;
break;
case Rep::FilterType::kPartitionedFilter:
prefix = kPartitionedFilterBlockPrefix;
break;
case Rep::FilterType::kBlockFilter:
prefix = kFilterBlockPrefix;
break;
default:
assert(0);
}
std::string filter_block_key = prefix;
filter_block_key.append(rep->filter_policy->Name());
if (FindMetaBlock(meta_iter.get(), filter_block_key, &rep->filter_handle)
.ok()) {
rep->filter_type = (prefix == kFullFilterBlockPrefix)
? Rep::FilterType::kFullFilter
: Rep::FilterType::kBlockFilter;
rep->filter_type = filter_type;
break;
}
}
@ -697,6 +732,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0) {
rep->filter_entry = filter_entry;
rep->filter_entry.value->SetLevel(level);
} else {
filter_entry.Release(table_options.block_cache.get());
}
@ -707,14 +743,19 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// pre-load these blocks, which will kept in member variables in Rep
// and with a same life-time as this table object.
IndexReader* index_reader = nullptr;
s = new_table->CreateIndexReader(&index_reader, meta_iter.get());
s = new_table->CreateIndexReader(&index_reader, meta_iter.get(), level);
if (s.ok()) {
rep->index_reader.reset(index_reader);
// Set filter block
if (rep->filter_policy) {
rep->filter.reset(new_table->ReadFilter(rep));
const bool is_a_filter_partition = true;
rep->filter.reset(
new_table->ReadFilter(rep->filter_handle, !is_a_filter_partition));
if (rep->filter.get()) {
rep->filter->SetLevel(level);
}
}
} else {
delete index_reader;
@ -798,7 +839,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit) {
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
@ -806,9 +848,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Lookup uncompressed cache first
if (block_cache != nullptr) {
block->cache_handle =
GetEntryFromCache(block_cache, block_cache_key, BLOCK_CACHE_DATA_MISS,
BLOCK_CACHE_DATA_HIT, statistics);
block->cache_handle = GetEntryFromCache(
block_cache, block_cache_key,
is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS,
is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, statistics);
if (block->cache_handle != nullptr) {
block->value =
reinterpret_cast<Block*>(block_cache->Value(block->cache_handle));
@ -860,9 +903,15 @@ Status BlockBasedTable::GetDataBlockFromCache(
&DeleteCachedEntry<Block>, &(block->cache_handle));
if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD);
if (is_index) {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
block->value->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
block->value->usable_size());
}
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
block->value->usable_size());
} else {
@ -883,7 +932,8 @@ Status BlockBasedTable::PutDataBlockToCache(
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit) {
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
Cache::Priority priority) {
assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr);
@ -929,15 +979,21 @@ Status BlockBasedTable::PutDataBlockToCache(
// insert into uncompressed block cache
assert((block->value->compression_type() == kNoCompression));
if (block_cache != nullptr && block->value->cachable()) {
s = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>, &(block->cache_handle));
s = block_cache->Insert(
block_cache_key, block->value, block->value->usable_size(),
&DeleteCachedEntry<Block>, &(block->cache_handle), priority);
if (s.ok()) {
assert(block->cache_handle != nullptr);
RecordTick(statistics, BLOCK_CACHE_ADD);
if (is_index) {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
block->value->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
block->value->usable_size());
}
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
block->value->usable_size());
assert(reinterpret_cast<Block*>(
@ -952,7 +1008,9 @@ Status BlockBasedTable::PutDataBlockToCache(
return s;
}
FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) const {
FilterBlockReader* BlockBasedTable::ReadFilter(
const BlockHandle& filter_handle, const bool is_a_filter_partition) const {
auto& rep = rep_;
// TODO: We might want to unify with ReadBlockFromFile() if we start
// requiring checksum verification in Table::Open.
if (rep->filter_type == Rep::FilterType::kNoFilter) {
@ -960,7 +1018,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) const {
}
BlockContents block;
if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(),
rep->filter_handle, &block, rep->ioptions,
filter_handle, &block, rep->ioptions,
false /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options)
.ok()) {
@ -970,35 +1028,60 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) const {
assert(rep->filter_policy);
if (rep->filter_type == Rep::FilterType::kBlockFilter) {
auto filter_type = rep->filter_type;
if (rep->filter_type == Rep::FilterType::kPartitionedFilter &&
is_a_filter_partition) {
filter_type = Rep::FilterType::kFullFilter;
}
switch (filter_type) {
case Rep::FilterType::kPartitionedFilter: {
return new PartitionedFilterBlockReader(
rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr,
rep->whole_key_filtering, std::move(block), nullptr,
rep->ioptions.statistics, rep->internal_comparator, this);
}
case Rep::FilterType::kBlockFilter:
return new BlockBasedFilterBlockReader(
rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr,
rep->table_options, rep->whole_key_filtering, std::move(block),
rep->ioptions.statistics);
} else if (rep->filter_type == Rep::FilterType::kFullFilter) {
case Rep::FilterType::kFullFilter: {
auto filter_bits_reader =
rep->filter_policy->GetFilterBitsReader(block.data);
if (filter_bits_reader != nullptr) {
assert(filter_bits_reader != nullptr);
return new FullFilterBlockReader(
rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr,
rep->whole_key_filtering, std::move(block), filter_bits_reader,
rep->ioptions.statistics);
}
}
default:
// filter_type is either kNoFilter (exited the function at the first if),
// kBlockFilter or kFullFilter. there is no way for the execution to come here
// or it must be covered in this switch block
assert(false);
return nullptr;
}
}
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
bool no_io) const {
const BlockHandle& filter_blk_handle = rep_->filter_handle;
const bool is_a_filter_partition = true;
return GetFilter(filter_blk_handle, !is_a_filter_partition, no_io);
}
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
bool no_io) const {
// If cache_index_and_filter_blocks is false, filter should be pre-populated.
// We will return rep_->filter anyway. rep_->filter can be nullptr if filter
// read fails at Open() time. We don't want to reload again since it will
// most probably fail again.
if (!rep_->table_options.cache_index_and_filter_blocks) {
if (!is_a_filter_partition &&
!rep_->table_options.cache_index_and_filter_blocks) {
return {rep_->filter.get(), nullptr /* cache handle */};
}
@ -1008,8 +1091,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
return {nullptr /* filter */, nullptr /* cache handle */};
}
// we have a pinned filter block
if (rep_->filter_entry.IsSet()) {
if (!is_a_filter_partition && rep_->filter_entry.IsSet()) {
return rep_->filter_entry;
}
@ -1018,8 +1100,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
// Fetching from the cache
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
rep_->footer.metaindex_handle(),
cache_key);
filter_blk_handle, cache_key);
Statistics* statistics = rep_->ioptions.statistics;
auto cache_handle =
@ -1034,7 +1115,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
// Do not invoke any io.
return CachableEntry<FilterBlockReader>();
} else {
filter = ReadFilter(rep_);
filter = ReadFilter(filter_blk_handle, is_a_filter_partition);
if (filter != nullptr) {
assert(filter->size() > 0);
Status s = block_cache->Insert(
@ -1074,7 +1155,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
PERF_TIMER_GUARD(read_index_block_nanos);
bool no_io = read_options.read_tier == kBlockCacheTier;
const bool no_io = read_options.read_tier == kBlockCacheTier;
Cache* block_cache = rep_->table_options.block_cache.get();
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key =
@ -1153,29 +1234,36 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
return iter;
}
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, new a iterator
// If input_iter is not null, update this iter and return it
InternalIterator* BlockBasedTable::NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter) {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->table_options.block_cache.get();
CachableEntry<Block> block;
BlockIter* input_iter, bool is_index) {
BlockHandle handle;
Slice input = index_value;
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.
Status s = handle.DecodeFrom(&input);
return NewDataBlockIterator(rep, ro, handle, input_iter, is_index, s);
}
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, new a iterator
// If input_iter is not null, update this iter and return it
InternalIterator* BlockBasedTable::NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
BlockIter* input_iter, bool is_index, Status s) {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->table_options.block_cache.get();
CachableEntry<Block> block;
Slice compression_dict;
if (s.ok()) {
if (rep->compression_dict_block) {
compression_dict = rep->compression_dict_block->data;
}
s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block);
s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block,
is_index);
}
// Didn't get any data from block caches.
@ -1224,7 +1312,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
Status BlockBasedTable::MaybeLoadDataBlockToCache(
Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
Slice compression_dict, CachableEntry<Block>* block_entry) {
Slice compression_dict, CachableEntry<Block>* block_entry, bool is_index) {
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->table_options.block_cache.get();
Cache* block_cache_compressed =
@ -1254,7 +1342,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict,
rep->table_options.read_amp_bytes_per_bit);
rep->table_options.read_amp_bytes_per_bit, is_index);
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block;
@ -1271,7 +1359,13 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
s = PutDataBlockToCache(
key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
block_entry, raw_block.release(), rep->table_options.format_version,
compression_dict, rep->table_options.read_amp_bytes_per_bit);
compression_dict, rep->table_options.read_amp_bytes_per_bit,
is_index,
is_index &&
rep->table_options
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW);
}
}
}
@ -1279,17 +1373,39 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
}
BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState(
BlockBasedTable* table, const ReadOptions& read_options, bool skip_filters)
BlockBasedTable* table, const ReadOptions& read_options, bool skip_filters,
bool is_index, Cleanable* block_cache_cleaner)
: TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr),
table_(table),
read_options_(read_options),
skip_filters_(skip_filters) {}
skip_filters_(skip_filters),
is_index_(is_index),
block_cache_cleaner_(block_cache_cleaner) {}
InternalIterator*
BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
const Slice& index_value) {
// Return a block iterator on the index partition
return NewDataBlockIterator(table_->rep_, read_options_, index_value);
BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
auto iter = NewDataBlockIterator(table_->rep_, read_options_, handle, nullptr,
is_index_, s);
if (block_cache_cleaner_) {
uint64_t offset = handle.offset();
{
ReadLock rl(&cleaner_mu);
if (cleaner_set.find(offset) != cleaner_set.end()) {
// already have a refernce to the block cache objects
return iter;
}
}
WriteLock wl(&cleaner_mu);
cleaner_set.insert(offset);
// Keep the data into cache until the cleaner cleansup
iter->DelegateCleanupsTo(block_cache_cleaner_);
}
return iter;
}
bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(
@ -1325,25 +1441,29 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
return true;
}
auto prefix = rep_->ioptions.prefix_extractor->Transform(user_key);
InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue);
auto internal_prefix = internal_key_prefix.Encode();
bool may_match = true;
Status s;
// First, try check with full filter
const bool no_io = true;
auto filter_entry = GetFilter(no_io);
FilterBlockReader* filter = filter_entry.value;
if (filter != nullptr) {
if (!filter->IsBlockBased()) {
const Slice* const const_ikey_ptr = &internal_key;
may_match =
filter->PrefixMayMatch(prefix, kNotValid, no_io, const_ikey_ptr);
} else {
InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue);
auto internal_prefix = internal_key_prefix.Encode();
// To prevent any io operation in this method, we set `read_tier` to make
// sure we always read index or filter only when they have already been
// loaded to memory.
ReadOptions no_io_read_options;
no_io_read_options.read_tier = kBlockCacheTier;
// First, try check with full filter
auto filter_entry = GetFilter(true /* no io */);
FilterBlockReader* filter = filter_entry.value;
if (filter != nullptr) {
if (!filter->IsBlockBased()) {
may_match = filter->PrefixMayMatch(prefix);
} else {
// Then, try find it within each block
unique_ptr<InternalIterator> iiter(NewIndexIterator(no_io_read_options));
iiter->Seek(internal_prefix);
@ -1438,20 +1558,23 @@ InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
bool BlockBasedTable::FullFilterKeyMayMatch(const ReadOptions& read_options,
FilterBlockReader* filter,
const Slice& internal_key) const {
const Slice& internal_key,
const bool no_io) const {
if (filter == nullptr || filter->IsBlockBased()) {
return true;
}
Slice user_key = ExtractUserKey(internal_key);
const Slice* const const_ikey_ptr = &internal_key;
if (filter->whole_key_filtering()) {
return filter->KeyMayMatch(user_key);
return filter->KeyMayMatch(user_key, kNotValid, no_io, const_ikey_ptr);
}
if (!read_options.total_order_seek && rep_->ioptions.prefix_extractor &&
rep_->table_properties->prefix_extractor_name.compare(
rep_->ioptions.prefix_extractor->Name()) == 0 &&
rep_->ioptions.prefix_extractor->InDomain(user_key) &&
!filter->PrefixMayMatch(
rep_->ioptions.prefix_extractor->Transform(user_key))) {
rep_->ioptions.prefix_extractor->Transform(user_key), kNotValid,
false, const_ikey_ptr)) {
return false;
}
return true;
@ -1460,6 +1583,7 @@ bool BlockBasedTable::FullFilterKeyMayMatch(const ReadOptions& read_options,
Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
GetContext* get_context, bool skip_filters) {
Status s;
const bool no_io = read_options.read_tier == kBlockCacheTier;
CachableEntry<FilterBlockReader> filter_entry;
if (!skip_filters) {
filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier);
@ -1468,14 +1592,14 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
// First check the full filter
// If full filter not useful, Then go into each block
if (!FullFilterKeyMayMatch(read_options, filter, key)) {
if (!FullFilterKeyMayMatch(read_options, filter, key, no_io)) {
RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
} else {
BlockIter iiter_on_stack;
auto iiter = NewIndexIterator(read_options, &iiter_on_stack);
std::unique_ptr<InternalIterator> iiter_unique_ptr;
if (iiter != &iiter_on_stack) {
iiter_unique_ptr = std::unique_ptr<InternalIterator>(iiter);
iiter_unique_ptr.reset(iiter);
}
bool done = false;
@ -1486,7 +1610,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
bool not_exist_in_filter =
filter != nullptr && filter->IsBlockBased() == true &&
handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(ExtractUserKey(key), handle.offset());
!filter->KeyMayMatch(ExtractUserKey(key), handle.offset(), no_io);
if (not_exist_in_filter) {
// Not found
@ -1525,6 +1649,10 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
}
s = biter.status();
}
if (done) {
// Avoid the extra Next which is expensive in two-level indexes
break;
}
}
if (s.ok()) {
s = iiter->status();
@ -1631,7 +1759,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
// 4. internal_comparator
// 5. index_type
Status BlockBasedTable::CreateIndexReader(
IndexReader** index_reader, InternalIterator* preloaded_meta_index_iter) {
IndexReader** index_reader, InternalIterator* preloaded_meta_index_iter,
int level) {
// Some old version of block-based tables don't have index type present in
// table properties. If that's the case we can safely use the kBinarySearch.
auto index_type_on_file = BlockBasedTableOptions::kBinarySearch;
@ -1660,7 +1789,7 @@ Status BlockBasedTable::CreateIndexReader(
case BlockBasedTableOptions::kTwoLevelIndexSearch: {
return PartitionIndexReader::Create(
this, file, footer, footer.index_handle(), rep_->ioptions, comparator,
index_reader, rep_->persistent_cache_options);
index_reader, rep_->persistent_cache_options, level);
}
case BlockBasedTableOptions::kBinarySearch: {
return BinarySearchIndexReader::Create(

View File

@ -11,6 +11,7 @@
#include <stdint.h>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
@ -193,17 +194,29 @@ class BlockBasedTable : public TableReader {
class BlockEntryIteratorState;
private:
friend class PartitionIndexReader;
protected:
template <class TValue>
struct CachableEntry;
struct Rep;
Rep* rep_;
explicit BlockBasedTable(Rep* rep)
: rep_(rep), compaction_optimized_(false) {}
private:
bool compaction_optimized_;
// input_iter: if it is not null, update this one and return it as Iterator
static InternalIterator* NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter = nullptr);
static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
const Slice& index_value,
BlockIter* input_iter = nullptr,
bool is_index = false);
static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
const BlockHandle& block_hanlde,
BlockIter* input_iter = nullptr,
bool is_index = false,
Status s = Status());
// If block cache enabled (compressed or uncompressed), looks for the block
// identified by handle in (1) uncompressed cache, (2) compressed cache, and
// then (3) file. If found, inserts into the cache(s) that were searched
@ -213,14 +226,19 @@ class BlockBasedTable : public TableReader {
// @param block_entry value is set to the uncompressed block if found. If
// in uncompressed block cache, also sets cache_handle to reference that
// block.
static Status MaybeLoadDataBlockToCache(
Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
Slice compression_dict, CachableEntry<Block>* block_entry);
static Status MaybeLoadDataBlockToCache(Rep* rep, const ReadOptions& ro,
const BlockHandle& handle,
Slice compression_dict,
CachableEntry<Block>* block_entry,
bool is_index = false);
// For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file
// were they not present in cache yet.
CachableEntry<FilterBlockReader> GetFilter(bool no_io = false) const;
virtual CachableEntry<FilterBlockReader> GetFilter(
const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
bool no_io) const;
// Get the iterator from the index reader.
// If input_iter is not set, return new Iterator
@ -247,7 +265,8 @@ class BlockBasedTable : public TableReader {
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit);
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
@ -264,7 +283,8 @@ class BlockBasedTable : public TableReader {
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit);
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
@ -280,24 +300,23 @@ class BlockBasedTable : public TableReader {
// helps avoid re-reading meta index block if caller already created one.
Status CreateIndexReader(
IndexReader** index_reader,
InternalIterator* preloaded_meta_index_iter = nullptr);
InternalIterator* preloaded_meta_index_iter = nullptr,
const int level = -1);
bool FullFilterKeyMayMatch(const ReadOptions& read_options,
FilterBlockReader* filter,
const Slice& user_key) const;
FilterBlockReader* filter, const Slice& user_key,
const bool no_io) const;
// Read the meta block from sst.
static Status ReadMetaBlock(Rep* rep, std::unique_ptr<Block>* meta_block,
std::unique_ptr<InternalIterator>* iter);
// Create the filter from the filter block.
FilterBlockReader* ReadFilter(Rep* rep) const;
FilterBlockReader* ReadFilter(const BlockHandle& filter_handle,
const bool is_a_filter_partition) const;
static void SetupCacheKeyPrefix(Rep* rep, uint64_t file_size);
explicit BlockBasedTable(Rep* rep)
: rep_(rep), compaction_optimized_(false) {}
// Generate a cache key prefix from the file
static void GenerateCachePrefix(Cache* cc,
RandomAccessFile* file, char* buffer, size_t* size);
@ -313,13 +332,18 @@ class BlockBasedTable : public TableReader {
// No copying allowed
explicit BlockBasedTable(const TableReader&) = delete;
void operator=(const TableReader&) = delete;
friend class PartitionedFilterBlockReader;
friend class PartitionedFilterBlockTest;
};
// Maitaning state of a two-level iteration on a partitioned index structure
class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
public:
BlockEntryIteratorState(BlockBasedTable* table,
const ReadOptions& read_options, bool skip_filters);
const ReadOptions& read_options, bool skip_filters,
bool is_index = false,
Cleanable* block_cache_cleaner = nullptr);
InternalIterator* NewSecondaryIterator(const Slice& index_value) override;
bool PrefixMayMatch(const Slice& internal_key) override;
@ -328,6 +352,11 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
BlockBasedTable* table_;
const ReadOptions read_options_;
bool skip_filters_;
// true if the 2nd level iterator is on indexes instead of on user data.
bool is_index_;
Cleanable* block_cache_cleaner_;
std::set<uint64_t> cleaner_set;
port::RWMutex cleaner_mu;
};
// CachableEntry represents the entries that *may* be fetched from block cache.

View File

@ -82,16 +82,35 @@ class FilterBlockReader {
virtual ~FilterBlockReader() {}
virtual bool IsBlockBased() = 0; // If is blockbased filter
virtual bool KeyMayMatch(const Slice& key,
uint64_t block_offset = kNotValid) = 0;
/**
* If no_io is set, then it returns true if it cannot answer the query without
* reading data from disk. This is used in PartitionedFilterBlockReader to
* avoid reading partitions that are not in block cache already
*
* Normally filters are built on only the user keys and the InternalKey is not
* needed for a query. The index in PartitionedFilterBlockReader however is
* built upon InternalKey and must be provided via const_ikey_ptr when running
* queries.
*/
virtual bool KeyMayMatch(const Slice& key, uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) = 0;
/**
* no_io and const_ikey_ptr here means the same as in KeyMayMatch
*/
virtual bool PrefixMayMatch(const Slice& prefix,
uint64_t block_offset = kNotValid) = 0;
uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) = 0;
virtual size_t ApproximateMemoryUsage() const = 0;
virtual size_t size() const { return size_; }
virtual Statistics* statistics() const { return statistics_; }
bool whole_key_filtering() const { return whole_key_filtering_; }
int GetLevel() const { return level_; }
void SetLevel(int level) { level_ = level; }
// convert this object to a human readable form
virtual std::string ToString() const {
std::string error_msg("Unsupported filter \n");
@ -107,6 +126,7 @@ class FilterBlockReader {
void operator=(const FilterBlockReader&);
size_t size_;
Statistics* statistics_;
int level_ = -1;
};
} // namespace rocksdb

View File

@ -73,8 +73,9 @@ FullFilterBlockReader::FullFilterBlockReader(
block_contents_ = std::move(contents);
}
bool FullFilterBlockReader::KeyMayMatch(const Slice& key,
uint64_t block_offset) {
bool FullFilterBlockReader::KeyMayMatch(const Slice& key, uint64_t block_offset,
const bool no_io,
const Slice* const const_ikey_ptr) {
assert(block_offset == kNotValid);
if (!whole_key_filtering_) {
return true;
@ -83,7 +84,9 @@ bool FullFilterBlockReader::KeyMayMatch(const Slice& key,
}
bool FullFilterBlockReader::PrefixMayMatch(const Slice& prefix,
uint64_t block_offset) {
uint64_t block_offset,
const bool no_io,
const Slice* const const_ikey_ptr) {
assert(block_offset == kNotValid);
if (!prefix_extractor_) {
return true;

View File

@ -91,10 +91,14 @@ class FullFilterBlockReader : public FilterBlockReader {
~FullFilterBlockReader() {}
virtual bool IsBlockBased() override { return false; }
virtual bool KeyMayMatch(const Slice& key,
uint64_t block_offset = kNotValid) override;
virtual bool PrefixMayMatch(const Slice& prefix,
uint64_t block_offset = kNotValid) override;
virtual bool KeyMayMatch(
const Slice& key, uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) override;
virtual bool PrefixMayMatch(
const Slice& prefix, uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) override;
virtual size_t ApproximateMemoryUsage() const override;
private:

View File

@ -5,7 +5,12 @@
#include "table/partitioned_filter_block.h"
#include <utility>
#include "port/port.h"
#include "rocksdb/filter_policy.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "util/coding.h"
namespace rocksdb {
@ -67,4 +72,149 @@ Slice PartitionedFilterBlockBuilder::Finish(
}
}
PartitionedFilterBlockReader::PartitionedFilterBlockReader(
const SliceTransform* prefix_extractor, bool _whole_key_filtering,
BlockContents&& contents, FilterBitsReader* filter_bits_reader,
Statistics* stats, const Comparator& comparator,
const BlockBasedTable* table)
: FilterBlockReader(contents.data.size(), stats, _whole_key_filtering),
prefix_extractor_(prefix_extractor),
comparator_(comparator),
table_(table) {
idx_on_fltr_blk_.reset(new Block(std::move(contents),
kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, stats));
}
PartitionedFilterBlockReader::~PartitionedFilterBlockReader() {
ReadLock rl(&mu_);
for (auto it = handle_list_.begin(); it != handle_list_.end(); ++it) {
table_->rep_->table_options.block_cache.get()->Release(*it);
}
}
bool PartitionedFilterBlockReader::KeyMayMatch(
const Slice& key, uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
assert(const_ikey_ptr != nullptr);
assert(block_offset == kNotValid);
if (!whole_key_filtering_) {
return true;
}
if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) {
return true;
}
// This is the user key vs. the full key in the partition index. We assume
// that user key <= full key
auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr);
if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range
return false;
}
bool cached = false;
auto filter_partition = GetFilterPartition(&filter_handle, no_io, &cached);
if (UNLIKELY(!filter_partition.value)) {
return true;
}
auto res = filter_partition.value->KeyMayMatch(key, block_offset, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
}
bool PartitionedFilterBlockReader::PrefixMayMatch(
const Slice& prefix, uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
assert(const_ikey_ptr != nullptr);
assert(block_offset == kNotValid);
if (!prefix_extractor_) {
return true;
}
if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) {
return true;
}
auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr);
if (UNLIKELY(filter_handle.size() == 0)) { // prefix is out of range
return false;
}
bool cached = false;
auto filter_partition = GetFilterPartition(&filter_handle, no_io, &cached);
if (UNLIKELY(!filter_partition.value)) {
return true;
}
auto res = filter_partition.value->PrefixMayMatch(prefix, kNotValid, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
}
Slice PartitionedFilterBlockReader::GetFilterPartitionHandle(
const Slice& entry) {
BlockIter iter;
idx_on_fltr_blk_->NewIterator(&comparator_, &iter, true);
iter.Seek(entry);
if (UNLIKELY(!iter.Valid())) {
return Slice();
}
assert(iter.Valid());
Slice handle_value = iter.value();
return handle_value;
}
BlockBasedTable::CachableEntry<FilterBlockReader>
PartitionedFilterBlockReader::GetFilterPartition(Slice* handle_value,
const bool no_io,
bool* cached) {
BlockHandle fltr_blk_handle;
auto s = fltr_blk_handle.DecodeFrom(handle_value);
assert(s.ok());
const bool is_a_filter_partition = true;
auto block_cache = table_->rep_->table_options.block_cache.get();
if (LIKELY(block_cache != nullptr)) {
bool pin_cached_filters =
GetLevel() == 0 &&
table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache;
if (pin_cached_filters) {
ReadLock rl(&mu_);
auto iter = filter_cache_.find(fltr_blk_handle.offset());
if (iter != filter_cache_.end()) {
RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT);
*cached = true;
return {iter->second, nullptr};
}
}
auto filter =
table_->GetFilter(fltr_blk_handle, is_a_filter_partition, no_io);
if (pin_cached_filters && filter.IsSet()) {
WriteLock wl(&mu_);
std::pair<uint64_t, FilterBlockReader*> pair(fltr_blk_handle.offset(),
filter.value);
auto succ = filter_cache_.insert(pair).second;
if (succ) {
handle_list_.push_back(filter.cache_handle);
} // Otherwise it is already inserted by a concurrent thread
*cached = true;
}
return filter;
} else {
auto filter = table_->ReadFilter(fltr_blk_handle, is_a_filter_partition);
return {filter, nullptr};
}
}
size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
return idx_on_fltr_blk_->size();
}
} // namespace rocksdb

View File

@ -7,14 +7,17 @@
#include <list>
#include <string>
#include <vector>
#include <unordered_map>
#include "db/dbformat.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "table/full_filter_block.h"
#include "table/index_builder.h"
#include "util/autovector.h"
namespace rocksdb {
@ -49,4 +52,40 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
PartitionedIndexBuilder* const p_index_builder_;
};
class PartitionedFilterBlockReader : public FilterBlockReader {
public:
explicit PartitionedFilterBlockReader(const SliceTransform* prefix_extractor,
bool whole_key_filtering,
BlockContents&& contents,
FilterBitsReader* filter_bits_reader,
Statistics* stats,
const Comparator& comparator,
const BlockBasedTable* table);
virtual ~PartitionedFilterBlockReader();
virtual bool IsBlockBased() override { return false; }
virtual bool KeyMayMatch(
const Slice& key, uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) override;
virtual bool PrefixMayMatch(
const Slice& prefix, uint64_t block_offset = kNotValid,
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) override;
virtual size_t ApproximateMemoryUsage() const override;
private:
Slice GetFilterPartitionHandle(const Slice& entry);
BlockBasedTable::CachableEntry<FilterBlockReader> GetFilterPartition(
Slice* handle, const bool no_io, bool* cached);
const SliceTransform* prefix_extractor_;
std::unique_ptr<Block> idx_on_fltr_blk_;
const Comparator& comparator_;
const BlockBasedTable* table_;
std::unordered_map<uint64_t, FilterBlockReader*> filter_cache_;
autovector<Cache::Handle*> handle_list_;
port::RWMutex mu_;
};
} // namespace rocksdb

View File

@ -0,0 +1,242 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <map>
#include "rocksdb/filter_policy.h"
#include "table/partitioned_filter_block.h"
#include "util/coding.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
std::map<uint64_t, Slice> slices;
class MockedBlockBasedTable : public BlockBasedTable {
public:
explicit MockedBlockBasedTable(Rep* rep) : BlockBasedTable(rep) {}
virtual CachableEntry<FilterBlockReader> GetFilter(
const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
bool no_io) const override {
Slice slice = slices[filter_blk_handle.offset()];
auto obj = new FullFilterBlockReader(
nullptr, true, BlockContents(slice, false, kNoCompression),
rep_->table_options.filter_policy->GetFilterBitsReader(slice), nullptr);
return {obj, nullptr};
}
};
class PartitionedFilterBlockTest : public testing::Test {
public:
BlockBasedTableOptions table_options_;
InternalKeyComparator icomp = InternalKeyComparator(BytewiseComparator());
PartitionedFilterBlockTest() {
table_options_.filter_policy.reset(NewBloomFilterPolicy(10, false));
table_options_.no_block_cache = true; // Otherwise BlockBasedTable::Close
// will access variable that are not
// initialized in our mocked version
}
std::shared_ptr<Cache> cache_;
~PartitionedFilterBlockTest() {}
const std::string keys[4] = {"afoo", "bar", "box", "hello"};
const std::string missing_keys[2] = {"missing", "other"};
int last_offset = 10;
BlockHandle Write(const Slice& slice) {
BlockHandle bh(last_offset + 1, slice.size());
slices[bh.offset()] = slice;
last_offset += bh.size();
return bh;
}
PartitionedIndexBuilder* NewIndexBuilder() {
return PartitionedIndexBuilder::CreateIndexBuilder(&icomp, table_options_);
}
PartitionedFilterBlockBuilder* NewBuilder(
PartitionedIndexBuilder* const p_index_builder) {
return new PartitionedFilterBlockBuilder(
nullptr, table_options_.whole_key_filtering,
table_options_.filter_policy->GetFilterBitsBuilder(),
table_options_.index_block_restart_interval, p_index_builder);
}
std::unique_ptr<MockedBlockBasedTable> table;
PartitionedFilterBlockReader* NewReader(
PartitionedFilterBlockBuilder* builder) {
BlockHandle bh;
Status status;
Slice slice;
do {
slice = builder->Finish(bh, &status);
bh = Write(slice);
} while (status.IsIncomplete());
const Options options;
const ImmutableCFOptions ioptions(options);
const EnvOptions env_options;
table.reset(new MockedBlockBasedTable(new BlockBasedTable::Rep(
ioptions, env_options, table_options_, icomp, false)));
auto reader = new PartitionedFilterBlockReader(
nullptr, true, BlockContents(slice, false, kNoCompression), nullptr,
nullptr, *icomp.user_comparator(), table.get());
return reader;
}
void VerifyReader(PartitionedFilterBlockBuilder* builder,
bool empty = false) {
std::unique_ptr<PartitionedFilterBlockReader> reader(NewReader(builder));
// Querying added keys
const bool no_io = true;
for (auto key : keys) {
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
const Slice ikey_slice = Slice(*ikey.rep());
ASSERT_TRUE(reader->KeyMayMatch(key, kNotValid, !no_io, &ikey_slice));
}
{
// querying a key twice
auto ikey = InternalKey(keys[0], 0, ValueType::kTypeValue);
const Slice ikey_slice = Slice(*ikey.rep());
ASSERT_TRUE(reader->KeyMayMatch(keys[0], kNotValid, !no_io, &ikey_slice));
}
// querying missing keys
for (auto key : missing_keys) {
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
const Slice ikey_slice = Slice(*ikey.rep());
if (empty) {
ASSERT_TRUE(reader->KeyMayMatch(key, kNotValid, !no_io, &ikey_slice));
} else {
// assuming a good hash function
ASSERT_FALSE(reader->KeyMayMatch(key, kNotValid, !no_io, &ikey_slice));
}
}
}
void TestBlockPerKey() {
table_options_.index_per_partition = 1;
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
NewBuilder(pib.get()));
int i = 0;
builder->Add(keys[i]);
CutABlock(pib.get(), keys[i], keys[i + 1]);
i++;
builder->Add(keys[i]);
CutABlock(pib.get(), keys[i], keys[i + 1]);
i++;
builder->Add(keys[i]);
builder->Add(keys[i]);
CutABlock(pib.get(), keys[i], keys[i + 1]);
i++;
builder->Add(keys[i]);
CutABlock(pib.get(), keys[i]);
VerifyReader(builder.get());
}
void TestBlockPerTwoKeys() {
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
NewBuilder(pib.get()));
int i = 0;
builder->Add(keys[i]);
i++;
builder->Add(keys[i]);
CutABlock(pib.get(), keys[i], keys[i + 1]);
i++;
builder->Add(keys[i]);
builder->Add(keys[i]);
i++;
builder->Add(keys[i]);
CutABlock(pib.get(), keys[i]);
VerifyReader(builder.get());
}
void TestBlockPerAllKeys() {
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
NewBuilder(pib.get()));
int i = 0;
builder->Add(keys[i]);
i++;
builder->Add(keys[i]);
i++;
builder->Add(keys[i]);
builder->Add(keys[i]);
i++;
builder->Add(keys[i]);
CutABlock(pib.get(), keys[i]);
VerifyReader(builder.get());
}
void CutABlock(PartitionedIndexBuilder* builder,
const std::string& user_key) {
// Assuming a block is cut, add an entry to the index
std::string key =
std::string(*InternalKey(user_key, 0, ValueType::kTypeValue).rep());
BlockHandle dont_care_block_handle(1, 1);
builder->AddIndexEntry(&key, nullptr, dont_care_block_handle);
}
void CutABlock(PartitionedIndexBuilder* builder, const std::string& user_key,
const std::string& next_user_key) {
// Assuming a block is cut, add an entry to the index
std::string key =
std::string(*InternalKey(user_key, 0, ValueType::kTypeValue).rep());
std::string next_key = std::string(
*InternalKey(next_user_key, 0, ValueType::kTypeValue).rep());
BlockHandle dont_care_block_handle(1, 1);
Slice slice = Slice(next_key.data(), next_key.size());
builder->AddIndexEntry(&key, &slice, dont_care_block_handle);
}
};
TEST_F(PartitionedFilterBlockTest, EmptyBuilder) {
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
std::unique_ptr<PartitionedFilterBlockBuilder> builder(NewBuilder(pib.get()));
const bool empty = true;
VerifyReader(builder.get(), empty);
}
TEST_F(PartitionedFilterBlockTest, OneBlock) {
int num_keys = sizeof(keys) / sizeof(*keys);
for (int i = 1; i < num_keys + 1; i++) {
table_options_.index_per_partition = i;
TestBlockPerAllKeys();
}
}
TEST_F(PartitionedFilterBlockTest, TwoBlocksPerKey) {
int num_keys = sizeof(keys) / sizeof(*keys);
for (int i = 1; i < num_keys + 1; i++) {
table_options_.index_per_partition = i;
TestBlockPerTwoKeys();
}
}
TEST_F(PartitionedFilterBlockTest, OneBlockPerKey) {
int num_keys = sizeof(keys) / sizeof(*keys);
for (int i = 1; i < num_keys + 1; i++) {
table_options_.index_per_partition = i;
TestBlockPerKey();
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}