Preload l0 index partitions

Summary:
This fixes the existing logic for pinning l0 index partitions. The patch preloads the partitions into block cache and pin them if they belong to level 0 and pin_l0 is set.

The drawback is that it does many small IOs when preloading all the partitions into the cache is direct io is enabled. Working for a solution for that.
Closes https://github.com/facebook/rocksdb/pull/2661

Differential Revision: D5554010

Pulled By: maysamyabandeh

fbshipit-source-id: 1e6f32a3524d71355c77d4138516dcfb601ca7b2
This commit is contained in:
Maysam Yabandeh 2017-08-18 10:53:03 -07:00 committed by Facebook Github Bot
parent bddd5d3630
commit 1efc600ddf
5 changed files with 127 additions and 69 deletions

View File

@ -1,5 +1,7 @@
# Rocksdb Change Log
## Unreleased
## 5.7.8 (08/14/2017)
### New Features
* Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
* Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.

View File

@ -5,7 +5,7 @@
#pragma once
#define ROCKSDB_MAJOR 5
#define ROCKSDB_MINOR 7
#define ROCKSDB_MINOR 8
#define ROCKSDB_PATCH 0
// Do not use these. We made the mistake of declaring macros starting with

View File

@ -186,23 +186,87 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
// Filters are already checked before seeking the index
const bool skip_filters = true;
const bool is_index = true;
Cleanable* block_cache_cleaner = nullptr;
const bool pin_cached_indexes =
level_ == 0 &&
table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache;
if (pin_cached_indexes) {
// Keep partition indexes into the cache as long as the partition index
// reader object is alive
block_cache_cleaner = this;
}
return NewTwoLevelIterator(
new BlockBasedTable::BlockEntryIteratorState(
table_, ReadOptions(), icomparator_, skip_filters, is_index,
block_cache_cleaner),
partition_map_.size() ? &partition_map_ : nullptr),
index_block_->NewIterator(icomparator_, nullptr, true));
// TODO(myabandeh): Update TwoLevelIterator to be able to make use of
// on-stack
// BlockIter while the state is on heap
// on-stack BlockIter while the state is on heap. Currentlly it assumes
// the first level iter is always on heap and will attempt to delete it
// in its destructor.
}
virtual void CacheDependencies(bool pin) override {
// Before read partitions, prefetch them to avoid lots of IOs
auto rep = table_->rep_;
BlockIter biter;
BlockHandle handle;
index_block_->NewIterator(icomparator_, &biter, true);
// Index partitions are assumed to be consecuitive. Prefetch them all.
// Read the first block offset
biter.SeekToFirst();
Slice input = biter.value();
Status s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Could not read first index partition");
return;
}
uint64_t prefetch_off = handle.offset();
// Read the last block's offset
biter.SeekToLast();
input = biter.value();
s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Could not read last index partition");
return;
}
uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
auto& file = table_->rep_->file;
prefetch_buffer.reset(new FilePrefetchBuffer());
s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
// After prefetch, read the partitions one by one
biter.SeekToFirst();
auto ro = ReadOptions();
Cache* block_cache = rep->table_options.block_cache.get();
for (; biter.Valid(); biter.Next()) {
input = biter.value();
s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Could not read index partition");
continue;
}
BlockBasedTable::CachableEntry<Block> block;
Slice compression_dict;
if (rep->compression_dict_block) {
compression_dict = rep->compression_dict_block->data;
}
const bool is_index = true;
s = table_->MaybeLoadDataBlockToCache(prefetch_buffer.get(), rep, ro,
handle, compression_dict, &block,
is_index);
if (s.ok() && block.value != nullptr) {
assert(block.cache_handle != nullptr);
if (pin) {
partition_map_[handle.offset()] = block;
RegisterCleanup(&ReleaseCachedEntry, block_cache, block.cache_handle);
} else {
block_cache->Release(block.cache_handle);
}
}
}
}
virtual size_t size() const override { return index_block_->size(); }
@ -222,13 +286,12 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
const int level)
: IndexReader(icomparator, stats),
table_(table),
index_block_(std::move(index_block)),
level_(level) {
index_block_(std::move(index_block)) {
assert(index_block_ != nullptr);
}
BlockBasedTable* table_;
std::unique_ptr<Block> index_block_;
int level_;
std::map<uint64_t, BlockBasedTable::CachableEntry<Block>> partition_map_;
};
// Index that allows binary search lookup for the first key of each block.
@ -708,10 +771,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
} else {
if (found_range_del_block && !rep->range_del_handle.IsNull()) {
ReadOptions read_options;
// TODO: try to use prefetched buffer too.
s = MaybeLoadDataBlockToCache(rep, read_options, rep->range_del_handle,
Slice() /* compression_dict */,
&rep->range_del_entry);
s = MaybeLoadDataBlockToCache(
prefetch_buffer.get(), rep, read_options, rep->range_del_handle,
Slice() /* compression_dict */, &rep->range_del_entry);
if (!s.ok()) {
ROCKS_LOG_WARN(
rep->ioptions.info_log,
@ -740,21 +802,22 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// Always prefetch index and filter for level 0
if (table_options.cache_index_and_filter_blocks) {
if (prefetch_index_and_filter_in_cache || level == 0) {
const bool pin =
rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0;
assert(table_options.block_cache != nullptr);
// Hack: Call NewIndexIterator() to implicitly add index to the
// block_cache
// if pin_l0_filter_and_index_blocks_in_cache is true and this is
// a level0 file, then we will pass in this pointer to rep->index
// to NewIndexIterator(), which will save the index block in there
// else it's a nullptr and nothing special happens
CachableEntry<IndexReader>* index_entry = nullptr;
if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0) {
index_entry = &rep->index_entry;
}
CachableEntry<IndexReader> index_entry;
unique_ptr<InternalIterator> iter(
new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry));
new_table->NewIndexIterator(ReadOptions(), nullptr, &index_entry));
index_entry.value->CacheDependencies(pin);
if (pin) {
rep->index_entry = std::move(index_entry);
} else {
index_entry.Release(table_options.block_cache.get());
}
s = iter->status();
if (s.ok()) {
@ -764,8 +827,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// a level0 file, then save it in rep_->filter_entry; it will be
// released in the destructor only, hence it will be pinned in the
// cache while this reader is alive
if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0) {
if (pin) {
rep->filter_entry = filter_entry;
if (rep->filter_entry.value != nullptr) {
rep->filter_entry.value->SetLevel(level);
@ -1305,8 +1367,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
if (rep->compression_dict_block) {
compression_dict = rep->compression_dict_block->data;
}
s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block,
is_index);
s = MaybeLoadDataBlockToCache(nullptr /*prefetch_buffer*/, rep, ro, handle,
compression_dict, &block, is_index);
}
// Didn't get any data from block caches.
@ -1355,8 +1417,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
Status BlockBasedTable::MaybeLoadDataBlockToCache(
Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
Slice compression_dict, CachableEntry<Block>* block_entry, bool is_index) {
FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
const BlockHandle& handle, Slice compression_dict,
CachableEntry<Block>* block_entry, bool is_index) {
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->table_options.block_cache.get();
Cache* block_cache_compressed =
@ -1392,12 +1455,11 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
std::unique_ptr<Block> raw_block;
{
StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(rep->file.get(), nullptr /* prefetch_buffer*/,
rep->footer, ro, handle, &raw_block,
rep->ioptions, block_cache_compressed == nullptr,
compression_dict, rep->persistent_cache_options,
rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit);
s = ReadBlockFromFile(
rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
&raw_block, rep->ioptions, block_cache_compressed == nullptr,
compression_dict, rep->persistent_cache_options, rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit);
}
if (s.ok()) {
@ -1420,14 +1482,14 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState(
BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator* icomparator, bool skip_filters, bool is_index,
Cleanable* block_cache_cleaner)
std::map<uint64_t, CachableEntry<Block>>* block_map)
: TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr),
table_(table),
read_options_(read_options),
icomparator_(icomparator),
skip_filters_(skip_filters),
is_index_(is_index),
block_cache_cleaner_(block_cache_cleaner) {}
block_map_(block_map) {}
InternalIterator*
BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
@ -1436,23 +1498,15 @@ BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
auto iter = NewDataBlockIterator(table_->rep_, read_options_, handle, nullptr,
is_index_, s);
if (block_cache_cleaner_) {
uint64_t offset = handle.offset();
{
ReadLock rl(&cleaner_mu);
if (cleaner_set.find(offset) != cleaner_set.end()) {
// already have a reference to the block cache objects
return iter;
}
}
WriteLock wl(&cleaner_mu);
cleaner_set.insert(offset);
// Keep the data into cache until the cleaner cleansup
iter->DelegateCleanupsTo(block_cache_cleaner_);
auto rep = table_->rep_;
if (block_map_) {
auto block = block_map_->find(handle.offset());
assert(block != block_map_->end());
return block->second.value->NewIterator(&rep->internal_comparator, nullptr,
true, rep->ioptions.statistics);
}
return iter;
return NewDataBlockIterator(rep, read_options_, handle, nullptr, is_index_,
s);
}
bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(

View File

@ -181,6 +181,8 @@ class BlockBasedTable : public TableReader {
// that was allocated in block cache.
virtual size_t ApproximateMemoryUsage() const = 0;
virtual void CacheDependencies(bool /* unused */) {}
protected:
const InternalKeyComparator* icomparator_;
@ -227,7 +229,8 @@ class BlockBasedTable : public TableReader {
// @param block_entry value is set to the uncompressed block if found. If
// in uncompressed block cache, also sets cache_handle to reference that
// block.
static Status MaybeLoadDataBlockToCache(Rep* rep, const ReadOptions& ro,
static Status MaybeLoadDataBlockToCache(FilePrefetchBuffer* prefetch_buffer,
Rep* rep, const ReadOptions& ro,
const BlockHandle& handle,
Slice compression_dict,
CachableEntry<Block>* block_entry,
@ -345,11 +348,11 @@ class BlockBasedTable : public TableReader {
// Maitaning state of a two-level iteration on a partitioned index structure
class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
public:
BlockEntryIteratorState(BlockBasedTable* table,
const ReadOptions& read_options,
const InternalKeyComparator* icomparator,
bool skip_filters, bool is_index = false,
Cleanable* block_cache_cleaner = nullptr);
BlockEntryIteratorState(
BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator* icomparator, bool skip_filters,
bool is_index = false,
std::map<uint64_t, CachableEntry<Block>>* block_map = nullptr);
InternalIterator* NewSecondaryIterator(const Slice& index_value) override;
bool PrefixMayMatch(const Slice& internal_key) override;
bool KeyReachedUpperBound(const Slice& internal_key) override;
@ -362,8 +365,7 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
bool skip_filters_;
// true if the 2nd level iterator is on indexes instead of on user data.
bool is_index_;
Cleanable* block_cache_cleaner_;
std::set<uint64_t> cleaner_set;
std::map<uint64_t, CachableEntry<Block>>* block_map_;
port::RWMutex cleaner_mu;
};

View File

@ -2174,7 +2174,7 @@ std::map<std::string, size_t> MockCache::marked_data_in_cache_;
// Block cache can contain raw data blocks as well as general objects. If an
// object depends on the table to be live, it then must be destructed before the
// table is closed. This test makese sure that the only items remains in the
// table is closed. This test makes sure that the only items remains in the
// cache after the table is closed are raw data blocks.
TEST_F(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
for (auto index_type :