Reduce heavy hitter for Get operation
Summary: This PR addresses the following heavy hitters in `Get` operation by moving calls to `StatisticsImpl::recordTick` from `BlockBasedTable` to `Version::Get` - rocksdb.block.cache.bytes.write - rocksdb.block.cache.add - rocksdb.block.cache.data.miss - rocksdb.block.cache.data.bytes.insert - rocksdb.block.cache.data.add - rocksdb.block.cache.hit - rocksdb.block.cache.data.hit - rocksdb.block.cache.bytes.read The db_bench statistics before and after the change are: |1GB block read|Children |Self |Command |Shared Object |Symbol| |---|---|---|---|---|---| |master: |4.22% |1.31% |db_bench |db_bench |[.] rocksdb::StatisticsImpl::recordTick| |updated: |0.51% |0.21% |db_bench |db_bench |[.] rocksdb::StatisticsImpl::recordTick| | |0.14% |0.14% |db_bench |db_bench |[.] rocksdb::GetContext::record_counters| |1MB block read|Children |Self |Command |Shared Object |Symbol| |---|---|---|---|---|---| |master: |3.48% |1.08% |db_bench |db_bench |[.] rocksdb::StatisticsImpl::recordTick| |updated: |0.80% |0.31% |db_bench |db_bench |[.] rocksdb::StatisticsImpl::recordTick| | |0.35% |0.35% |db_bench |db_bench |[.] rocksdb::GetContext::record_counters| Closes https://github.com/facebook/rocksdb/pull/3172 Differential Revision: D6330532 Pulled By: miasantreble fbshipit-source-id: 2b492959e00a3db29e9437ecdcc5e48ca4ec5741
This commit is contained in:
parent
243ca14116
commit
9196e80b15
@ -980,10 +980,12 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
|
||||
user_comparator(), internal_comparator());
|
||||
FdWithKeyRange* f = fp.GetNextFile();
|
||||
|
||||
while (f != nullptr) {
|
||||
if (get_context.sample()) {
|
||||
sample_file_read_inc(f->file_metadata);
|
||||
}
|
||||
|
||||
*status = table_cache_->Get(
|
||||
read_options, *internal_comparator(), f->fd, ikey, &get_context,
|
||||
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
|
||||
@ -995,10 +997,21 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
return;
|
||||
}
|
||||
|
||||
// report the counters before returning
|
||||
if (get_context.State() != GetContext::kNotFound &&
|
||||
get_context.State() != GetContext::kMerge) {
|
||||
for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) {
|
||||
if (get_context.tickers_value[t] > 0) {
|
||||
RecordTick(db_statistics_, t, get_context.tickers_value[t]);
|
||||
}
|
||||
}
|
||||
}
|
||||
switch (get_context.State()) {
|
||||
case GetContext::kNotFound:
|
||||
// Keep searching in other files
|
||||
break;
|
||||
case GetContext::kMerge:
|
||||
break;
|
||||
case GetContext::kFound:
|
||||
if (fp.GetHitFileLevel() == 0) {
|
||||
RecordTick(db_statistics_, GET_HIT_L0);
|
||||
@ -1015,8 +1028,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
case GetContext::kCorrupt:
|
||||
*status = Status::Corruption("corrupted key for ", user_key);
|
||||
return;
|
||||
case GetContext::kMerge:
|
||||
break;
|
||||
case GetContext::kBlobIndex:
|
||||
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
|
||||
*status = Status::NotSupported(
|
||||
@ -1027,6 +1038,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
||||
f = fp.GetNextFile();
|
||||
}
|
||||
|
||||
for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) {
|
||||
if (get_context.tickers_value[t] > 0) {
|
||||
RecordTick(db_statistics_, t, get_context.tickers_value[t]);
|
||||
}
|
||||
}
|
||||
if (GetContext::kMerge == get_context.State()) {
|
||||
if (!merge_operator_) {
|
||||
*status = Status::InvalidArgument(
|
||||
|
@ -126,22 +126,37 @@ Slice GetCacheKeyFromOffset(const char* cache_key_prefix,
|
||||
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
|
||||
Tickers block_cache_miss_ticker,
|
||||
Tickers block_cache_hit_ticker,
|
||||
Statistics* statistics) {
|
||||
Statistics* statistics,
|
||||
GetContext* get_context) {
|
||||
auto cache_handle = block_cache->Lookup(key, statistics);
|
||||
if (cache_handle != nullptr) {
|
||||
PERF_COUNTER_ADD(block_cache_hit_count, 1);
|
||||
// overall cache hit
|
||||
RecordTick(statistics, BLOCK_CACHE_HIT);
|
||||
// total bytes read from cache
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_READ,
|
||||
block_cache->GetUsage(cache_handle));
|
||||
// block-type specific cache hit
|
||||
RecordTick(statistics, block_cache_hit_ticker);
|
||||
if (get_context != nullptr) {
|
||||
// overall cache hit
|
||||
get_context->RecordCounters(BLOCK_CACHE_HIT, 1);
|
||||
// total bytes read from cache
|
||||
get_context->RecordCounters(BLOCK_CACHE_BYTES_READ,
|
||||
block_cache->GetUsage(cache_handle));
|
||||
// block-type specific cache hit
|
||||
get_context->RecordCounters(block_cache_hit_ticker, 1);
|
||||
} else {
|
||||
// overall cache hit
|
||||
RecordTick(statistics, BLOCK_CACHE_HIT);
|
||||
// total bytes read from cache
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_READ,
|
||||
block_cache->GetUsage(cache_handle));
|
||||
RecordTick(statistics, block_cache_hit_ticker);
|
||||
}
|
||||
} else {
|
||||
// overall cache miss
|
||||
RecordTick(statistics, BLOCK_CACHE_MISS);
|
||||
// block-type specific cache miss
|
||||
RecordTick(statistics, block_cache_miss_ticker);
|
||||
if (get_context != nullptr) {
|
||||
// overall cache miss
|
||||
get_context->RecordCounters(BLOCK_CACHE_MISS, 1);
|
||||
// block-type specific cache miss
|
||||
get_context->RecordCounters(block_cache_miss_ticker, 1);
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_MISS);
|
||||
RecordTick(statistics, block_cache_miss_ticker);
|
||||
}
|
||||
}
|
||||
|
||||
return cache_handle;
|
||||
@ -253,9 +268,11 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
|
||||
compression_dict = rep->compression_dict_block->data;
|
||||
}
|
||||
const bool is_index = true;
|
||||
s = table_->MaybeLoadDataBlockToCache(prefetch_buffer.get(), rep, ro,
|
||||
handle, compression_dict, &block,
|
||||
is_index);
|
||||
// TODO: Support counter batch update for partitioned index and
|
||||
// filter blocks
|
||||
s = table_->MaybeLoadDataBlockToCache(
|
||||
prefetch_buffer.get(), rep, ro, handle, compression_dict, &block,
|
||||
is_index, nullptr /* get_context */);
|
||||
|
||||
assert(s.ok() || block.value == nullptr);
|
||||
if (s.ok() && block.value != nullptr) {
|
||||
@ -779,7 +796,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
|
||||
ReadOptions read_options;
|
||||
s = MaybeLoadDataBlockToCache(
|
||||
prefetch_buffer.get(), rep, read_options, rep->range_del_handle,
|
||||
Slice() /* compression_dict */, &rep->range_del_entry);
|
||||
Slice() /* compression_dict */, &rep->range_del_entry,
|
||||
false /* is_index */, nullptr /* get_context */);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_WARN(
|
||||
rep->ioptions.info_log,
|
||||
@ -955,8 +973,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
Cache* block_cache, Cache* block_cache_compressed,
|
||||
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
|
||||
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
|
||||
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
|
||||
bool is_index) {
|
||||
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
|
||||
GetContext* get_context) {
|
||||
Status s;
|
||||
Block* compressed_block = nullptr;
|
||||
Cache::Handle* block_cache_compressed_handle = nullptr;
|
||||
@ -967,7 +985,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
block->cache_handle = GetEntryFromCache(
|
||||
block_cache, block_cache_key,
|
||||
is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS,
|
||||
is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, statistics);
|
||||
is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, statistics,
|
||||
get_context);
|
||||
if (block->cache_handle != nullptr) {
|
||||
block->value =
|
||||
reinterpret_cast<Block*>(block_cache->Value(block->cache_handle));
|
||||
@ -1020,18 +1039,36 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
block_cache->TEST_mark_as_data_block(block_cache_key,
|
||||
block->value->usable_size());
|
||||
if (s.ok()) {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
if (is_index) {
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
}
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
if (is_index) {
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_INDEX_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_INDEX_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
}
|
||||
} else {
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_DATA_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_DATA_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
|
||||
delete block->value;
|
||||
@ -1051,7 +1088,7 @@ Status BlockBasedTable::PutDataBlockToCache(
|
||||
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
|
||||
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
|
||||
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
|
||||
Cache::Priority priority) {
|
||||
Cache::Priority priority, GetContext* get_context) {
|
||||
assert(raw_block->compression_type() == kNoCompression ||
|
||||
block_cache_compressed != nullptr);
|
||||
|
||||
@ -1104,18 +1141,36 @@ Status BlockBasedTable::PutDataBlockToCache(
|
||||
block->value->usable_size());
|
||||
if (s.ok()) {
|
||||
assert(block->cache_handle != nullptr);
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
if (is_index) {
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
}
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
|
||||
block->value->usable_size());
|
||||
if (is_index) {
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_INDEX_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_INDEX_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
}
|
||||
} else {
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_DATA_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_DATA_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
|
||||
block->value->usable_size());
|
||||
}
|
||||
}
|
||||
assert(reinterpret_cast<Block*>(
|
||||
block_cache->Value(block->cache_handle)) == block->value);
|
||||
} else {
|
||||
@ -1188,16 +1243,18 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
|
||||
}
|
||||
|
||||
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
|
||||
FilePrefetchBuffer* prefetch_buffer, bool no_io) const {
|
||||
FilePrefetchBuffer* prefetch_buffer, bool no_io,
|
||||
GetContext* get_context) const {
|
||||
const BlockHandle& filter_blk_handle = rep_->filter_handle;
|
||||
const bool is_a_filter_partition = true;
|
||||
return GetFilter(prefetch_buffer, filter_blk_handle, !is_a_filter_partition,
|
||||
no_io);
|
||||
no_io, get_context);
|
||||
}
|
||||
|
||||
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
|
||||
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
|
||||
const bool is_a_filter_partition, bool no_io) const {
|
||||
const bool is_a_filter_partition, bool no_io,
|
||||
GetContext* get_context) const {
|
||||
// If cache_index_and_filter_blocks is false, filter should be pre-populated.
|
||||
// We will return rep_->filter anyway. rep_->filter can be nullptr if filter
|
||||
// read fails at Open() time. We don't want to reload again since it will
|
||||
@ -1227,7 +1284,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
|
||||
Statistics* statistics = rep_->ioptions.statistics;
|
||||
auto cache_handle =
|
||||
GetEntryFromCache(block_cache, key, BLOCK_CACHE_FILTER_MISS,
|
||||
BLOCK_CACHE_FILTER_HIT, statistics);
|
||||
BLOCK_CACHE_FILTER_HIT, statistics, get_context);
|
||||
|
||||
FilterBlockReader* filter = nullptr;
|
||||
if (cache_handle != nullptr) {
|
||||
@ -1247,10 +1304,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
|
||||
? Cache::Priority::HIGH
|
||||
: Cache::Priority::LOW);
|
||||
if (s.ok()) {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, filter->size());
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter->size());
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE, filter->size());
|
||||
get_context->RecordCounters(BLOCK_CACHE_FILTER_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_FILTER_BYTES_INSERT,
|
||||
filter->size());
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter->size());
|
||||
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT,
|
||||
filter->size());
|
||||
}
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
|
||||
delete filter;
|
||||
@ -1264,7 +1330,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
|
||||
|
||||
InternalIterator* BlockBasedTable::NewIndexIterator(
|
||||
const ReadOptions& read_options, BlockIter* input_iter,
|
||||
CachableEntry<IndexReader>* index_entry) {
|
||||
CachableEntry<IndexReader>* index_entry, GetContext* get_context) {
|
||||
// index reader has already been pre-populated.
|
||||
if (rep_->index_reader) {
|
||||
return rep_->index_reader->NewIterator(
|
||||
@ -1287,7 +1353,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
|
||||
Statistics* statistics = rep_->ioptions.statistics;
|
||||
auto cache_handle =
|
||||
GetEntryFromCache(block_cache, key, BLOCK_CACHE_INDEX_MISS,
|
||||
BLOCK_CACHE_INDEX_HIT, statistics);
|
||||
BLOCK_CACHE_INDEX_HIT, statistics, get_context);
|
||||
|
||||
if (cache_handle == nullptr && no_io) {
|
||||
if (input_iter != nullptr) {
|
||||
@ -1322,10 +1388,15 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
|
||||
|
||||
if (s.ok()) {
|
||||
size_t usable_size = index_reader->usable_size();
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
if (get_context != nullptr) {
|
||||
get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
|
||||
get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE, usable_size);
|
||||
} else {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usable_size);
|
||||
}
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
|
||||
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usable_size);
|
||||
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usable_size);
|
||||
} else {
|
||||
if (index_reader != nullptr) {
|
||||
delete index_reader;
|
||||
@ -1359,13 +1430,14 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
|
||||
|
||||
InternalIterator* BlockBasedTable::NewDataBlockIterator(
|
||||
Rep* rep, const ReadOptions& ro, const Slice& index_value,
|
||||
BlockIter* input_iter, bool is_index) {
|
||||
BlockIter* input_iter, bool is_index, GetContext* get_context) {
|
||||
BlockHandle handle;
|
||||
Slice input = index_value;
|
||||
// We intentionally allow extra stuff in index_value so that we
|
||||
// can add more features in the future.
|
||||
Status s = handle.DecodeFrom(&input);
|
||||
return NewDataBlockIterator(rep, ro, handle, input_iter, is_index, s);
|
||||
return NewDataBlockIterator(rep, ro, handle, input_iter, is_index,
|
||||
get_context, s);
|
||||
}
|
||||
|
||||
// Convert an index iterator value (i.e., an encoded BlockHandle)
|
||||
@ -1374,7 +1446,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
|
||||
// If input_iter is not null, update this iter and return it
|
||||
InternalIterator* BlockBasedTable::NewDataBlockIterator(
|
||||
Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
|
||||
BlockIter* input_iter, bool is_index, Status s) {
|
||||
BlockIter* input_iter, bool is_index, GetContext* get_context, Status s) {
|
||||
PERF_TIMER_GUARD(new_table_block_iter_nanos);
|
||||
|
||||
const bool no_io = (ro.read_tier == kBlockCacheTier);
|
||||
@ -1386,7 +1458,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
|
||||
compression_dict = rep->compression_dict_block->data;
|
||||
}
|
||||
s = MaybeLoadDataBlockToCache(nullptr /*prefetch_buffer*/, rep, ro, handle,
|
||||
compression_dict, &block, is_index);
|
||||
compression_dict, &block, is_index,
|
||||
get_context);
|
||||
}
|
||||
|
||||
// Didn't get any data from block caches.
|
||||
@ -1437,7 +1510,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
|
||||
Status BlockBasedTable::MaybeLoadDataBlockToCache(
|
||||
FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
|
||||
const BlockHandle& handle, Slice compression_dict,
|
||||
CachableEntry<Block>* block_entry, bool is_index) {
|
||||
CachableEntry<Block>* block_entry, bool is_index, GetContext* get_context) {
|
||||
assert(block_entry != nullptr);
|
||||
const bool no_io = (ro.read_tier == kBlockCacheTier);
|
||||
Cache* block_cache = rep->table_options.block_cache.get();
|
||||
@ -1468,7 +1541,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
|
||||
s = GetDataBlockFromCache(
|
||||
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
|
||||
block_entry, rep->table_options.format_version, compression_dict,
|
||||
rep->table_options.read_amp_bytes_per_bit, is_index);
|
||||
rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
|
||||
|
||||
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
|
||||
std::unique_ptr<Block> raw_block;
|
||||
@ -1487,11 +1560,11 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
|
||||
block_entry, raw_block.release(), rep->table_options.format_version,
|
||||
compression_dict, rep->table_options.read_amp_bytes_per_bit,
|
||||
is_index,
|
||||
is_index &&
|
||||
rep->table_options
|
||||
.cache_index_and_filter_blocks_with_high_priority
|
||||
is_index && rep->table_options
|
||||
.cache_index_and_filter_blocks_with_high_priority
|
||||
? Cache::Priority::HIGH
|
||||
: Cache::Priority::LOW);
|
||||
: Cache::Priority::LOW,
|
||||
get_context);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1535,8 +1608,9 @@ BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
|
||||
&rep->internal_comparator, nullptr, true, rep->ioptions.statistics);
|
||||
}
|
||||
}
|
||||
return NewDataBlockIterator(rep, read_options_, handle, nullptr, is_index_,
|
||||
s);
|
||||
return NewDataBlockIterator(rep, read_options_, handle,
|
||||
/* input_iter */ nullptr, is_index_,
|
||||
/* get_context */ nullptr, s);
|
||||
}
|
||||
|
||||
bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(
|
||||
@ -1730,8 +1804,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
|
||||
const bool no_io = read_options.read_tier == kBlockCacheTier;
|
||||
CachableEntry<FilterBlockReader> filter_entry;
|
||||
if (!skip_filters) {
|
||||
filter_entry = GetFilter(/*prefetch_buffer*/ nullptr,
|
||||
read_options.read_tier == kBlockCacheTier);
|
||||
filter_entry =
|
||||
GetFilter(/*prefetch_buffer*/ nullptr,
|
||||
read_options.read_tier == kBlockCacheTier, get_context);
|
||||
}
|
||||
FilterBlockReader* filter = filter_entry.value;
|
||||
|
||||
@ -1741,7 +1816,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
|
||||
RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
|
||||
} else {
|
||||
BlockIter iiter_on_stack;
|
||||
auto iiter = NewIndexIterator(read_options, &iiter_on_stack);
|
||||
auto iiter = NewIndexIterator(read_options, &iiter_on_stack,
|
||||
/* index_entry */ nullptr, get_context);
|
||||
std::unique_ptr<InternalIterator> iiter_unique_ptr;
|
||||
if (iiter != &iiter_on_stack) {
|
||||
iiter_unique_ptr.reset(iiter);
|
||||
@ -1765,7 +1841,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
|
||||
break;
|
||||
} else {
|
||||
BlockIter biter;
|
||||
NewDataBlockIterator(rep_, read_options, iiter->value(), &biter);
|
||||
NewDataBlockIterator(rep_, read_options, iiter->value(), &biter, false,
|
||||
get_context);
|
||||
|
||||
if (read_options.read_tier == kBlockCacheTier &&
|
||||
biter.status().IsIncomplete()) {
|
||||
|
@ -215,15 +215,14 @@ class BlockBasedTable : public TableReader {
|
||||
private:
|
||||
friend class MockedBlockBasedTable;
|
||||
// input_iter: if it is not null, update this one and return it as Iterator
|
||||
static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
|
||||
const Slice& index_value,
|
||||
BlockIter* input_iter = nullptr,
|
||||
bool is_index = false);
|
||||
static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
|
||||
const BlockHandle& block_hanlde,
|
||||
BlockIter* input_iter = nullptr,
|
||||
bool is_index = false,
|
||||
Status s = Status());
|
||||
static InternalIterator* NewDataBlockIterator(
|
||||
Rep* rep, const ReadOptions& ro, const Slice& index_value,
|
||||
BlockIter* input_iter = nullptr, bool is_index = false,
|
||||
GetContext* get_context = nullptr);
|
||||
static InternalIterator* NewDataBlockIterator(
|
||||
Rep* rep, const ReadOptions& ro, const BlockHandle& block_hanlde,
|
||||
BlockIter* input_iter = nullptr, bool is_index = false,
|
||||
GetContext* get_context = nullptr, Status s = Status());
|
||||
// If block cache enabled (compressed or uncompressed), looks for the block
|
||||
// identified by handle in (1) uncompressed cache, (2) compressed cache, and
|
||||
// then (3) file. If found, inserts into the cache(s) that were searched
|
||||
@ -238,16 +237,19 @@ class BlockBasedTable : public TableReader {
|
||||
const BlockHandle& handle,
|
||||
Slice compression_dict,
|
||||
CachableEntry<Block>* block_entry,
|
||||
bool is_index = false);
|
||||
bool is_index = false,
|
||||
GetContext* get_context = nullptr);
|
||||
|
||||
// For the following two functions:
|
||||
// if `no_io == true`, we will not try to read filter/index from sst file
|
||||
// were they not present in cache yet.
|
||||
CachableEntry<FilterBlockReader> GetFilter(
|
||||
FilePrefetchBuffer* prefetch_buffer = nullptr, bool no_io = false) const;
|
||||
FilePrefetchBuffer* prefetch_buffer = nullptr, bool no_io = false,
|
||||
GetContext* get_context = nullptr) const;
|
||||
virtual CachableEntry<FilterBlockReader> GetFilter(
|
||||
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
|
||||
const bool is_a_filter_partition, bool no_io) const;
|
||||
const bool is_a_filter_partition, bool no_io,
|
||||
GetContext* get_context) const;
|
||||
|
||||
// Get the iterator from the index reader.
|
||||
// If input_iter is not set, return new Iterator
|
||||
@ -261,7 +263,8 @@ class BlockBasedTable : public TableReader {
|
||||
// kBlockCacheTier
|
||||
InternalIterator* NewIndexIterator(
|
||||
const ReadOptions& read_options, BlockIter* input_iter = nullptr,
|
||||
CachableEntry<IndexReader>* index_entry = nullptr);
|
||||
CachableEntry<IndexReader>* index_entry = nullptr,
|
||||
GetContext* get_context = nullptr);
|
||||
|
||||
// Read block cache from block caches (if set): block_cache and
|
||||
// block_cache_compressed.
|
||||
@ -275,7 +278,7 @@ class BlockBasedTable : public TableReader {
|
||||
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
|
||||
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
|
||||
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
|
||||
bool is_index = false);
|
||||
bool is_index = false, GetContext* get_context = nullptr);
|
||||
|
||||
// Put a raw block (maybe compressed) to the corresponding block caches.
|
||||
// This method will perform decompression against raw_block if needed and then
|
||||
@ -293,7 +296,8 @@ class BlockBasedTable : public TableReader {
|
||||
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
|
||||
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
|
||||
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
|
||||
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW);
|
||||
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
|
||||
GetContext* get_context = nullptr);
|
||||
|
||||
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
|
||||
// after a call to Seek(key), until handle_result returns false.
|
||||
|
@ -87,6 +87,13 @@ void GetContext::SaveValue(const Slice& value, SequenceNumber seq) {
|
||||
}
|
||||
}
|
||||
|
||||
void GetContext::RecordCounters(Tickers ticker, size_t val) {
|
||||
if (ticker == Tickers::TICKER_ENUM_MAX) {
|
||||
return;
|
||||
}
|
||||
tickers_value[ticker] += static_cast<uint64_t>(val);
|
||||
}
|
||||
|
||||
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
||||
const Slice& value, Cleanable* value_pinner) {
|
||||
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "db/range_del_aggregator.h"
|
||||
#include "db/read_callback.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/statistics.h"
|
||||
#include "rocksdb/types.h"
|
||||
#include "table/block.h"
|
||||
|
||||
@ -26,6 +27,7 @@ class GetContext {
|
||||
kMerge, // saver contains the current merge result (the operands)
|
||||
kBlobIndex,
|
||||
};
|
||||
uint64_t tickers_value[Tickers::TICKER_ENUM_MAX] = {0};
|
||||
|
||||
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
|
||||
Logger* logger, Statistics* statistics, GetState init_state,
|
||||
@ -72,6 +74,8 @@ class GetContext {
|
||||
return true;
|
||||
}
|
||||
|
||||
void RecordCounters(Tickers ticker, size_t val);
|
||||
|
||||
private:
|
||||
const Comparator* ucmp_;
|
||||
const MergeOperator* merge_operator_;
|
||||
|
@ -231,7 +231,8 @@ PartitionedFilterBlockReader::GetFilterPartition(
|
||||
}
|
||||
}
|
||||
return table_->GetFilter(/*prefetch_buffer*/ nullptr, fltr_blk_handle,
|
||||
is_a_filter_partition, no_io);
|
||||
is_a_filter_partition, no_io,
|
||||
/* get_context */ nullptr);
|
||||
} else {
|
||||
auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle,
|
||||
is_a_filter_partition);
|
||||
@ -295,7 +296,8 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
|
||||
const bool no_io = true;
|
||||
const bool is_a_filter_partition = true;
|
||||
auto filter = table_->GetFilter(prefetch_buffer.get(), handle,
|
||||
is_a_filter_partition, !no_io);
|
||||
is_a_filter_partition, !no_io,
|
||||
/* get_context */ nullptr);
|
||||
if (LIKELY(filter.IsSet())) {
|
||||
if (pin) {
|
||||
filter_map_[handle.offset()] = std::move(filter);
|
||||
|
@ -29,7 +29,8 @@ class MockedBlockBasedTable : public BlockBasedTable {
|
||||
|
||||
virtual CachableEntry<FilterBlockReader> GetFilter(
|
||||
FilePrefetchBuffer*, const BlockHandle& filter_blk_handle,
|
||||
const bool /* unused */, bool /* unused */) const override {
|
||||
const bool /* unused */, bool /* unused */,
|
||||
GetContext* /* unused */) const override {
|
||||
Slice slice = slices[filter_blk_handle.offset()];
|
||||
auto obj = new FullFilterBlockReader(
|
||||
nullptr, true, BlockContents(slice, false, kNoCompression),
|
||||
|
Loading…
x
Reference in New Issue
Block a user