Customized BlockBasedTableIterator and LevelIterator

Summary:
Use a customized BlockBasedTableIterator and LevelIterator to replace the current implementations built on the generic two-level-iterator. The hope is that the customized logic will make the code easier to understand. As a side effect, BlockBasedTableIterator avoids a separate allocation for the data block iterator object and the virtual function calls to it, because it can reference BlockIter, a final class, directly. Similarly, LevelIterator removes the virtual function calls to the dummy iterator that iterates over the file metadata. It also enables further optimization.
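
To illustrate the side effect, here is a toy sketch (illustrative C++ only, not RocksDB code) of why embedding a final, concrete iterator by value removes both the separate allocation and the virtual dispatch that a generic two-level iterator pays for its second-level iterator:

#include <memory>

struct IterBase {
  virtual ~IterBase() = default;
  virtual bool Valid() const = 0;
  virtual void Next() = 0;
};

// Stands in for BlockIter: a concrete, final iterator class.
struct FinalIter final : public IterBase {
  int pos = 0;
  bool Valid() const override { return pos < 10; }
  void Next() override { ++pos; }
};

// Generic style: the second-level iterator lives behind an IterBase pointer,
// so it is heap-allocated and every call is a virtual dispatch.
struct GenericTwoLevel {
  std::unique_ptr<IterBase> data_iter = std::make_unique<FinalIter>();
  void Step() {
    if (data_iter->Valid()) data_iter->Next();
  }
};

// Customized style: the data block iterator is a by-value member of a final
// type, so there is no extra allocation and the calls can be devirtualized.
struct CustomTableIter {
  FinalIter data_block_iter_;
  void Step() {
    if (data_block_iter_.Valid()) data_block_iter_.Next();
  }
};

int main() {
  GenericTwoLevel g;
  CustomTableIter c;
  g.Step();
  c.Step();
  return 0;
}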

The upper bound check is also moved from the index block to the data block, which fits the new iterator better. After the change, the forward iterator is slightly reworked to make sure we still trim iterators that go over the upper bound.
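
A minimal sketch of the idea (toy code, not the actual RocksDB classes): the bound is evaluated against the current data key rather than the index key, and the iterator records that it became invalid because of the bound rather than because it reached the end of the data, which is the signal a caller such as ForwardIterator needs in order to keep trimming such iterators:

#include <cstddef>
#include <string>
#include <vector>

struct ToyDataIter {
  std::vector<std::string> keys;
  const std::string* upper_bound = nullptr;  // exclusive bound, may be null
  std::size_t pos = 0;
  bool out_of_bound = false;

  bool Valid() const { return !out_of_bound && pos < keys.size(); }
  // Distinguishes "invalid because of the bound" from "invalid because of EOF".
  bool IsOutOfBound() const { return out_of_bound; }

  void Seek(const std::string& target) {
    out_of_bound = false;
    pos = 0;
    while (pos < keys.size() && keys[pos] < target) ++pos;
    CheckBound();
  }
  void Next() {
    ++pos;
    CheckBound();
  }

 private:
  void CheckBound() {
    // The check happens on the data key itself, not on an index entry.
    if (pos < keys.size() && upper_bound != nullptr &&
        keys[pos] >= *upper_bound) {
      out_of_bound = true;
    }
  }
};

// A caller can then trim iterators that are invalid purely because of the bound:
//   if (!it.Valid() && it.IsOutOfBound()) { /* delete it, remember it was trimmed */ }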

The two-level-iterator is now only used by the partitioned index, so it is simplified accordingly.
Closes https://github.com/facebook/rocksdb/pull/3406

Differential Revision: D6809041

Pulled By: siying

fbshipit-source-id: 7da3b9b1d3c8e9d9405302c15920af1fcaf50ffa
Siying Dong 2018-02-12 16:57:56 -08:00 committed by Facebook Github Bot
parent 8a04ee4fd1
commit b555ed30a4
7 changed files with 664 additions and 340 deletions

@@ -383,14 +383,13 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (!l0_iters_[i]->status().ok()) {
immutable_status_ = l0_iters_[i]->status();
} else if (l0_iters_[i]->Valid()) {
if (!IsOverUpperBound(l0_iters_[i]->key())) {
immutable_min_heap_.push(l0_iters_[i]);
} else {
has_iter_trimmed_for_upper_bound_ = true;
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = nullptr;
}
} else if (l0_iters_[i]->Valid() &&
!IsOverUpperBound(l0_iters_[i]->key())) {
immutable_min_heap_.push(l0_iters_[i]);
} else {
has_iter_trimmed_for_upper_bound_ = true;
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = nullptr;
}
}
@@ -417,15 +416,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (!level_iters_[level - 1]->status().ok()) {
immutable_status_ = level_iters_[level - 1]->status();
} else if (level_iters_[level - 1]->Valid()) {
if (!IsOverUpperBound(level_iters_[level - 1]->key())) {
immutable_min_heap_.push(level_iters_[level - 1]);
} else {
// Nothing in this level is interesting. Remove.
has_iter_trimmed_for_upper_bound_ = true;
DeleteIterator(level_iters_[level - 1]);
level_iters_[level - 1] = nullptr;
}
} else if (level_iters_[level - 1]->Valid() &&
!IsOverUpperBound(level_iters_[level - 1]->key())) {
immutable_min_heap_.push(level_iters_[level - 1]);
} else {
// Nothing in this level is interesting. Remove.
has_iter_trimmed_for_upper_bound_ = true;
DeleteIterator(level_iters_[level - 1]);
level_iters_[level - 1] = nullptr;
}
}
}

@@ -424,129 +424,261 @@ bool SomeFileOverlapsRange(
}
namespace {
// An internal iterator. For a given version/level pair, yields
// information about the files in the level. For a given entry, key()
// is the largest key that occurs in the file, and value() is a
// 16-byte value containing the file number and file size, both
// encoded using EncodeFixed64.
class LevelFileNumIterator : public InternalIterator {
class LevelIterator final : public InternalIterator {
public:
LevelFileNumIterator(const InternalKeyComparator& icmp,
const LevelFilesBrief* flevel, bool should_sample)
: icmp_(icmp),
flevel_(flevel),
index_(static_cast<uint32_t>(flevel->num_files)),
current_value_(0, 0, 0), // Marks as invalid
should_sample_(should_sample) {}
virtual bool Valid() const override { return index_ < flevel_->num_files; }
virtual void Seek(const Slice& target) override {
index_ = FindFile(icmp_, *flevel_, target);
}
virtual void SeekForPrev(const Slice& target) override {
SeekForPrevImpl(target, &icmp_);
}
virtual void SeekToFirst() override { index_ = 0; }
virtual void SeekToLast() override {
index_ = (flevel_->num_files == 0)
? 0
: static_cast<uint32_t>(flevel_->num_files) - 1;
}
virtual void Next() override {
assert(Valid());
index_++;
}
virtual void Prev() override {
assert(Valid());
if (index_ == 0) {
index_ = static_cast<uint32_t>(flevel_->num_files); // Marks as invalid
} else {
index_--;
}
}
Slice key() const override {
assert(Valid());
return flevel_->files[index_].largest_key;
}
Slice value() const override {
assert(Valid());
auto file_meta = flevel_->files[index_];
if (should_sample_) {
sample_file_read_inc(file_meta.file_metadata);
}
current_value_ = file_meta.fd;
return Slice(reinterpret_cast<const char*>(&current_value_),
sizeof(FileDescriptor));
}
virtual Status status() const override { return Status::OK(); }
private:
const InternalKeyComparator icmp_;
const LevelFilesBrief* flevel_;
uint32_t index_;
mutable FileDescriptor current_value_;
bool should_sample_;
};
class LevelFileIteratorState : public TwoLevelIteratorState {
public:
// @param skip_filters Disables loading/accessing the filter block
LevelFileIteratorState(TableCache* table_cache,
const ReadOptions& read_options,
const EnvOptions& env_options,
const InternalKeyComparator& icomparator,
HistogramImpl* file_read_hist, bool for_compaction,
bool prefix_enabled, bool skip_filters, int level,
RangeDelAggregator* range_del_agg)
: TwoLevelIteratorState(prefix_enabled),
table_cache_(table_cache),
LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
const EnvOptions& env_options,
const InternalKeyComparator& icomparator,
const LevelFilesBrief* flevel, bool should_sample,
HistogramImpl* file_read_hist, bool for_compaction,
bool skip_filters, int level, RangeDelAggregator* range_del_agg)
: table_cache_(table_cache),
read_options_(read_options),
env_options_(env_options),
icomparator_(icomparator),
flevel_(flevel),
file_read_hist_(file_read_hist),
should_sample_(should_sample),
for_compaction_(for_compaction),
skip_filters_(skip_filters),
file_index_(flevel_->num_files),
level_(level),
range_del_agg_(range_del_agg) {}
range_del_agg_(range_del_agg),
pinned_iters_mgr_(nullptr) {
// Empty level is not supported.
assert(flevel_ != nullptr && flevel_->num_files > 0);
}
InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
if (meta_handle.size() != sizeof(FileDescriptor)) {
return NewErrorInternalIterator(
Status::Corruption("FileReader invoked with unexpected value"));
virtual ~LevelIterator() { delete file_iter_.Set(nullptr); }
virtual void Seek(const Slice& target) override;
virtual void SeekForPrev(const Slice& target) override;
virtual void SeekToFirst() override;
virtual void SeekToLast() override;
virtual void Next() override;
virtual void Prev() override;
virtual bool Valid() const override { return file_iter_.Valid(); }
virtual Slice key() const override {
assert(Valid());
return file_iter_.key();
}
virtual Slice value() const override {
assert(Valid());
return file_iter_.value();
}
virtual Status status() const override {
// It'd be nice if status() returned a const Status& instead of a Status
if (!status_.ok()) {
return status_;
} else if (file_iter_.iter() != nullptr) {
return file_iter_.status();
}
const FileDescriptor* fd =
reinterpret_cast<const FileDescriptor*>(meta_handle.data());
return table_cache_->NewIterator(
read_options_, env_options_, icomparator_, *fd, range_del_agg_,
nullptr /* don't need reference to table */, file_read_hist_,
for_compaction_, nullptr /* arena */, skip_filters_, level_);
return Status::OK();
}
virtual void SetPinnedItersMgr(
PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
if (file_iter_.iter()) {
file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
}
}
virtual bool IsKeyPinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_.iter() && file_iter_.IsKeyPinned();
}
virtual bool IsValuePinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_.iter() && file_iter_.IsValuePinned();
}
bool PrefixMayMatch(const Slice& internal_key) override {
return true;
private:
void SkipEmptyFileForward();
void SkipEmptyFileBackward();
void SetFileIterator(InternalIterator* iter);
void InitFileIterator(size_t new_file_index);
const Slice& file_smallest_key(size_t file_index) {
assert(file_index < flevel_->num_files);
return flevel_->files[file_index].smallest_key;
}
bool KeyReachedUpperBound(const Slice& internal_key) override {
bool KeyReachedUpperBound(const Slice& internal_key) {
return read_options_.iterate_upper_bound != nullptr &&
icomparator_.user_comparator()->Compare(
ExtractUserKey(internal_key),
*read_options_.iterate_upper_bound) >= 0;
}
private:
InternalIterator* NewFileIterator() {
assert(file_index_ < flevel_->num_files);
auto file_meta = flevel_->files[file_index_];
if (should_sample_) {
sample_file_read_inc(file_meta.file_metadata);
}
return table_cache_->NewIterator(
read_options_, env_options_, icomparator_, file_meta.fd, range_del_agg_,
nullptr /* don't need reference to table */, file_read_hist_,
for_compaction_, nullptr /* arena */, skip_filters_, level_);
}
TableCache* table_cache_;
const ReadOptions read_options_;
const EnvOptions& env_options_;
const InternalKeyComparator& icomparator_;
const LevelFilesBrief* flevel_;
mutable FileDescriptor current_value_;
HistogramImpl* file_read_hist_;
bool should_sample_;
bool for_compaction_;
bool skip_filters_;
size_t file_index_;
int level_;
RangeDelAggregator* range_del_agg_;
IteratorWrapper file_iter_; // May be nullptr
PinnedIteratorsManager* pinned_iters_mgr_;
Status status_;
};
void LevelIterator::Seek(const Slice& target) {
size_t new_file_index = FindFile(icomparator_, *flevel_, target);
InitFileIterator(new_file_index);
if (file_iter_.iter() != nullptr) {
file_iter_.Seek(target);
}
SkipEmptyFileForward();
}
void LevelIterator::SeekForPrev(const Slice& target) {
size_t new_file_index = FindFile(icomparator_, *flevel_, target);
if (new_file_index >= flevel_->num_files) {
new_file_index = flevel_->num_files - 1;
}
InitFileIterator(new_file_index);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekForPrev(target);
SkipEmptyFileBackward();
}
}
void LevelIterator::SeekToFirst() {
InitFileIterator(0);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToFirst();
}
SkipEmptyFileForward();
}
void LevelIterator::SeekToLast() {
InitFileIterator(flevel_->num_files - 1);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToLast();
}
SkipEmptyFileBackward();
}
void LevelIterator::Next() {
assert(Valid());
file_iter_.Next();
SkipEmptyFileForward();
}
void LevelIterator::Prev() {
assert(Valid());
file_iter_.Prev();
SkipEmptyFileBackward();
}
void LevelIterator::SkipEmptyFileForward() {
// For an error (IO error, checksum mismatch, etc), we skip the file
// and move to the next one and continue reading data.
// TODO this behavior is from LevelDB. We should revisit it.
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
if (file_iter_.iter() != nullptr && !file_iter_.Valid() &&
file_iter_.iter()->IsOutOfBound()) {
return;
}
// Move to next file
if (file_index_ >= flevel_->num_files - 1) {
// Already at the last file
SetFileIterator(nullptr);
return;
}
if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
SetFileIterator(nullptr);
return;
}
InitFileIterator(file_index_ + 1);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToFirst();
}
}
}
void LevelIterator::SkipEmptyFileBackward() {
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
// Move to previous file
if (file_index_ == 0) {
// Already the first file
SetFileIterator(nullptr);
return;
}
InitFileIterator(file_index_ - 1);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToLast();
}
}
}
void LevelIterator::SetFileIterator(InternalIterator* iter) {
if (file_iter_.iter() != nullptr && status_.ok()) {
// TODO right now we don't invalidate the iterator even if the status is
// not OK. We should consider doing that so that it is harder for users to
// skip errors.
status_ = file_iter_.status();
}
if (pinned_iters_mgr_ && iter) {
iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
InternalIterator* old_iter = file_iter_.Set(iter);
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIterator(old_iter);
} else {
delete old_iter;
}
}
void LevelIterator::InitFileIterator(size_t new_file_index) {
if (new_file_index >= flevel_->num_files) {
file_index_ = new_file_index;
SetFileIterator(nullptr);
return;
} else {
// If the file iterator returned an Incomplete status, we try it again if users
// seek to the same file, as this time we may go to a different data block
// that is cached in block cache.
//
if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
new_file_index == file_index_) {
// file_iter_ is already constructed with this iterator, so
// no need to change anything
} else {
file_index_ = new_file_index;
InternalIterator* iter = NewFileIterator();
SetFileIterator(iter);
}
}
}
// A wrapper of the version builder which references the current version in its
// constructor and unreferences it in its destructor.
// Both the constructor and the destructor need to be called while holding the DB mutex.
@@ -854,24 +986,18 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
sample_file_read_inc(meta);
}
}
} else {
} else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
auto* state = new (mem)
LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(),
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr,
IsFilterSkipped(level), level, range_del_agg);
mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
auto* first_level_iter = new (mem) LevelFileNumIterator(
auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
merge_iter_builder->AddIterator(new (mem) LevelIterator(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
should_sample_file_read());
merge_iter_builder->AddIterator(
NewTwoLevelIterator(state, first_level_iter, arena, false));
should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */, IsFilterSkipped(level), level,
range_del_agg));
}
}
@@ -3732,17 +3858,13 @@ InternalIterator* VersionSet::MakeInputIterator(
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new LevelFileIteratorState(
cfd->table_cache(), read_options, env_options_compactions,
cfd->internal_comparator(),
nullptr /* no per level latency histogram */,
true /* for_compaction */, false /* prefix enabled */,
false /* skip_filters */, (int)which /* level */,
range_del_agg),
new LevelFileNumIterator(cfd->internal_comparator(),
c->input_levels(which),
false /* don't sample compaction */));
list[num++] = new LevelIterator(
cfd->table_cache(), read_options, env_options_compactions,
cfd->internal_comparator(), c->input_levels(which),
false /* should_sample */,
nullptr /* no per level latency histogram */,
true /* for_compaction */, false /* skip_filters */,
(int)which /* level */, range_del_agg);
}
}
}

@@ -210,13 +210,16 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
virtual InternalIterator* NewIterator(BlockIter* iter = nullptr,
bool dont_care = true) override {
// Filters are already checked before seeking the index
const bool skip_filters = true;
const bool is_index = true;
return NewTwoLevelIterator(
new BlockBasedTable::BlockEntryIteratorState(
table_, ReadOptions(), icomparator_, skip_filters, is_index,
partition_map_.size() ? &partition_map_ : nullptr),
index_block_->NewIterator(icomparator_, nullptr, true));
if (!partition_map_.empty()) {
return NewTwoLevelIterator(
new BlockBasedTable::PartitionedIndexIteratorState(
table_, partition_map_.size() ? &partition_map_ : nullptr),
index_block_->NewIterator(icomparator_, nullptr, true));
} else {
return new BlockBasedTableIterator(
table_, ReadOptions(), *icomparator_,
index_block_->NewIterator(icomparator_, nullptr, true), false);
}
// TODO(myabandeh): Update TwoLevelIterator to be able to make use of
// on-stack BlockIter while the state is on heap. Currently it assumes
// the first level iter is always on heap and will attempt to delete it
@@ -1632,91 +1635,37 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
return s;
}
BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState(
BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator* icomparator, bool skip_filters, bool is_index,
BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
BlockBasedTable* table,
std::unordered_map<uint64_t, CachableEntry<Block>>* block_map)
: TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr),
table_(table),
read_options_(read_options),
icomparator_(icomparator),
skip_filters_(skip_filters),
is_index_(is_index),
block_map_(block_map) {}
: table_(table), block_map_(block_map) {}
const size_t BlockBasedTable::BlockEntryIteratorState::kMaxReadaheadSize =
256 * 1024;
const size_t BlockBasedTableIterator::kMaxReadaheadSize = 256 * 1024;
InternalIterator*
BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
const Slice& index_value) {
// Return a block iterator on the index partition
BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
auto rep = table_->rep_;
if (block_map_) {
auto block = block_map_->find(handle.offset());
// This is a possible scenario since block cache might not have had space
// for the partition
if (block != block_map_->end()) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT);
Cache* block_cache = rep->table_options.block_cache.get();
assert(block_cache);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(block->second.cache_handle));
return block->second.value->NewIterator(
&rep->internal_comparator, nullptr, true, rep->ioptions.statistics);
}
auto rep = table_->get_rep();
auto block = block_map_->find(handle.offset());
// This is a possible scenario since block cache might not have had space
// for the partition
if (block != block_map_->end()) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT);
Cache* block_cache = rep->table_options.block_cache.get();
assert(block_cache);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(block->second.cache_handle));
return block->second.value->NewIterator(&rep->internal_comparator, nullptr,
true, rep->ioptions.statistics);
}
// Automatically prefetch additional data when a range scan (iterator) does
// more than 2 sequential IOs. This is enabled only when
// ReadOptions.readahead_size is 0.
if (read_options_.readahead_size == 0) {
if (num_file_reads_ < 2) {
num_file_reads_++;
} else if (handle.offset() + static_cast<size_t>(handle.size()) +
kBlockTrailerSize >
readahead_limit_) {
num_file_reads_++;
// Do not readahead more than kMaxReadaheadSize.
readahead_size_ =
std::min(BlockBasedTable::BlockEntryIteratorState::kMaxReadaheadSize,
readahead_size_);
table_->rep_->file->Prefetch(handle.offset(), readahead_size_);
readahead_limit_ = handle.offset() + readahead_size_;
// Keep exponentially increasing readahead size until kMaxReadaheadSize.
readahead_size_ *= 2;
}
}
return NewDataBlockIterator(rep, read_options_, handle,
/* input_iter */ nullptr, is_index_,
/* get_context */ nullptr, s);
}
bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(
const Slice& internal_key) {
if (read_options_.total_order_seek || skip_filters_) {
return true;
}
return table_->PrefixMayMatch(internal_key);
}
bool BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound(
const Slice& internal_key) {
bool reached_upper_bound = read_options_.iterate_upper_bound != nullptr &&
icomparator_ != nullptr &&
icomparator_->user_comparator()->Compare(
ExtractUserKey(internal_key),
*read_options_.iterate_upper_bound) >= 0;
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound",
&reached_upper_bound);
return reached_upper_bound;
// Create an empty iterator
return new BlockIter();
}
// This will be broken if the user specifies an unusual implementation
@@ -1820,13 +1769,224 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
return may_match;
}
void BlockBasedTableIterator::Seek(const Slice& target) {
if (!CheckPrefixMayMatch(target)) {
ResetDataIter();
return;
}
SavePrevIndexValue();
index_iter_->Seek(target);
if (!index_iter_->Valid()) {
ResetDataIter();
return;
}
InitDataBlock();
data_block_iter_.Seek(target);
FindKeyForward();
assert(!data_block_iter_.Valid() ||
icomp_.Compare(target, data_block_iter_.key()) <= 0);
}
void BlockBasedTableIterator::SeekForPrev(const Slice& target) {
if (!CheckPrefixMayMatch(target)) {
ResetDataIter();
return;
}
SavePrevIndexValue();
// Call Seek() rather than SeekForPrev() in the index block, because the
// target data block will likely contain the position for `target`, the
// same as Seek(), rather than before.
// For example, if we have three data blocks, each containing two keys:
// [2, 4] [6, 8] [10, 12]
// (the keys in the index block would be [4, 8, 12])
// and the user calls SeekForPrev(7), we need to go to the second block,
// just like if they call Seek(7).
// The only case where the block is different is when they seek to a position
// on the boundary. For example, if they SeekForPrev(5), we should go to the
// first block, rather than the second. However, we don't have the information
// to distinguish the two unless we read the second block. In this case, we'll
// end up with reading two blocks.
index_iter_->Seek(target);
if (!index_iter_->Valid()) {
index_iter_->SeekToLast();
if (!index_iter_->Valid()) {
ResetDataIter();
block_iter_points_to_real_block_ = false;
return;
}
}
InitDataBlock();
data_block_iter_.SeekForPrev(target);
FindKeyBackward();
assert(!data_block_iter_.Valid() ||
icomp_.Compare(target, data_block_iter_.key()) >= 0);
}
void BlockBasedTableIterator::SeekToFirst() {
SavePrevIndexValue();
index_iter_->SeekToFirst();
if (!index_iter_->Valid()) {
ResetDataIter();
return;
}
InitDataBlock();
data_block_iter_.SeekToFirst();
FindKeyForward();
}
void BlockBasedTableIterator::SeekToLast() {
SavePrevIndexValue();
index_iter_->SeekToLast();
if (!index_iter_->Valid()) {
ResetDataIter();
return;
}
InitDataBlock();
data_block_iter_.SeekToLast();
FindKeyBackward();
}
void BlockBasedTableIterator::Next() {
assert(block_iter_points_to_real_block_);
data_block_iter_.Next();
FindKeyForward();
}
void BlockBasedTableIterator::Prev() {
assert(block_iter_points_to_real_block_);
data_block_iter_.Prev();
FindKeyBackward();
}
void BlockBasedTableIterator::InitDataBlock() {
BlockHandle data_block_handle;
Slice handle_slice = index_iter_->value();
if (!block_iter_points_to_real_block_ ||
handle_slice.compare(prev_index_value_) != 0) {
if (block_iter_points_to_real_block_) {
ResetDataIter();
}
Status s = data_block_handle.DecodeFrom(&handle_slice);
auto* rep = table_->get_rep();
// Automatically prefetch additional data when a range scan (iterator) does
// more than 2 sequential IOs. This is enabled only when
// ReadOptions.readahead_size is 0.
if (read_options_.readahead_size == 0) {
if (num_file_reads_ < 2) {
num_file_reads_++;
} else if (data_block_handle.offset() +
static_cast<size_t>(data_block_handle.size()) +
kBlockTrailerSize >
readahead_limit_) {
num_file_reads_++;
// Do not readahead more than kMaxReadaheadSize.
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_);
table_->get_rep()->file->Prefetch(data_block_handle.offset(),
readahead_size_);
readahead_limit_ = data_block_handle.offset() + readahead_size_;
// Keep exponentially increasing readahead size until kMaxReadaheadSize.
readahead_size_ *= 2;
}
}
BlockBasedTable::NewDataBlockIterator(rep, read_options_, data_block_handle,
&data_block_iter_, false,
/* get_context */ nullptr, s);
block_iter_points_to_real_block_ = true;
}
}
void BlockBasedTableIterator::FindKeyForward() {
is_out_of_bound_ = false;
// TODO the while loop is inherited from two-level-iterator. It is not clear
// whether a data block can ever be empty; if not, the loop can be replaced by an "if".
while (!data_block_iter_.Valid()) {
if (!data_block_iter_.status().ok()) {
return;
}
ResetDataIter();
// We used to check the current index key against the upper bound.
// It would only save a data block read for a small percentage of use cases,
// so for code simplicity, we removed it. We can add it back if there is a
// significant performance regression.
index_iter_->Next();
if (index_iter_->Valid()) {
InitDataBlock();
data_block_iter_.SeekToFirst();
} else {
return;
}
}
// Check upper bound on the current key
bool reached_upper_bound =
(read_options_.iterate_upper_bound != nullptr &&
block_iter_points_to_real_block_ && data_block_iter_.Valid() &&
icomp_.user_comparator()->Compare(ExtractUserKey(data_block_iter_.key()),
*read_options_.iterate_upper_bound) >=
0);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound",
&reached_upper_bound);
if (reached_upper_bound) {
is_out_of_bound_ = true;
ResetDataIter();
return;
}
}
void BlockBasedTableIterator::FindKeyBackward() {
while (!data_block_iter_.Valid()) {
if (!data_block_iter_.status().ok()) {
return;
}
ResetDataIter();
index_iter_->Prev();
if (index_iter_->Valid()) {
InitDataBlock();
data_block_iter_.SeekToLast();
} else {
return;
}
}
// We could have checked the lower bound here too, but we opt not to, for
// code simplicity.
}
InternalIterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
Arena* arena,
bool skip_filters) {
return NewTwoLevelIterator(
new BlockEntryIteratorState(this, read_options,
&rep_->internal_comparator, skip_filters),
NewIndexIterator(read_options), arena);
if (arena == nullptr) {
return new BlockBasedTableIterator(
this, read_options, rep_->internal_comparator,
NewIndexIterator(read_options),
!skip_filters && !read_options.total_order_seek &&
rep_->ioptions.prefix_extractor != nullptr);
} else {
auto* mem = arena->AllocateAligned(sizeof(BlockBasedTableIterator));
return new (mem) BlockBasedTableIterator(
this, read_options, rep_->internal_comparator,
NewIndexIterator(read_options),
!skip_filters && !read_options.total_order_seek &&
rep_->ioptions.prefix_extractor != nullptr);
}
}
InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
@@ -1854,8 +2014,8 @@ InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
}
std::string str;
rep_->range_del_handle.EncodeTo(&str);
// The meta-block exists but isn't in uncompressed block cache (maybe because
// it is disabled), so go through the full lookup process.
// The meta-block exists but isn't in uncompressed block cache (maybe
// because it is disabled), so go through the full lookup process.
return NewDataBlockIterator(rep_, read_options, Slice(str));
}
@@ -1932,8 +2092,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
if (read_options.read_tier == kBlockCacheTier &&
biter.status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for whether
// we can guarantee the key is not there when "no_io" is set
// Update Saver.state to Found because we are only looking for
// whether we can guarantee the key is not there when "no_io" is set
get_context->MarkKeyMayExist();
break;
}
@@ -2098,8 +2258,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice cache_key =
GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
handle, cache_key_storage);
GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle,
cache_key_storage);
Slice ckey;
s = GetDataBlockFromCache(

@@ -22,6 +22,7 @@
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
@@ -33,8 +34,6 @@
namespace rocksdb {
class Block;
class BlockIter;
class BlockHandle;
class Cache;
class FilterBlockReader;
@@ -201,29 +200,36 @@ class BlockBasedTable : public TableReader {
// The keys retrieved are internal keys.
Status GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
class BlockEntryIteratorState;
template <class TValue>
struct CachableEntry;
struct Rep;
Rep* get_rep() { return rep_; }
// input_iter: if it is not null, update this one and return it as Iterator
static BlockIter* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
const Slice& index_value,
BlockIter* input_iter = nullptr,
bool is_index = false,
GetContext* get_context = nullptr);
static BlockIter* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
const BlockHandle& block_hanlde,
BlockIter* input_iter = nullptr,
bool is_index = false,
GetContext* get_context = nullptr,
Status s = Status());
class PartitionedIndexIteratorState;
friend class PartitionIndexReader;
protected:
template <class TValue>
struct CachableEntry;
struct Rep;
Rep* rep_;
explicit BlockBasedTable(Rep* rep) : rep_(rep) {}
private:
friend class MockedBlockBasedTable;
static std::atomic<uint64_t> next_cache_key_id_;
// input_iter: if it is not null, update this one and return it as Iterator
static BlockIter* NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter = nullptr, bool is_index = false,
GetContext* get_context = nullptr);
static BlockIter* NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const BlockHandle& block_hanlde,
BlockIter* input_iter = nullptr, bool is_index = false,
GetContext* get_context = nullptr, Status s = Status());
// If block cache enabled (compressed or uncompressed), looks for the block
// identified by handle in (1) uncompressed cache, (2) compressed cache, and
@@ -357,35 +363,18 @@ class BlockBasedTable : public TableReader {
};
// Maintains the state of a two-level iteration on a partitioned index structure
class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
class BlockBasedTable::PartitionedIndexIteratorState
: public TwoLevelIteratorState {
public:
BlockEntryIteratorState(
BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator* icomparator, bool skip_filters,
bool is_index = false,
PartitionedIndexIteratorState(
BlockBasedTable* table,
std::unordered_map<uint64_t, CachableEntry<Block>>* block_map = nullptr);
InternalIterator* NewSecondaryIterator(const Slice& index_value) override;
bool PrefixMayMatch(const Slice& internal_key) override;
bool KeyReachedUpperBound(const Slice& internal_key) override;
private:
// Don't own table_
BlockBasedTable* table_;
const ReadOptions read_options_;
const InternalKeyComparator* icomparator_;
bool skip_filters_;
// true if the 2nd level iterator is on indexes instead of on user data.
bool is_index_;
std::unordered_map<uint64_t, CachableEntry<Block>>* block_map_;
port::RWMutex cleaner_mu;
static const size_t kInitReadaheadSize = 8 * 1024;
// Found that 256 KB readahead size provides the best performance, based on
// experiments.
static const size_t kMaxReadaheadSize;
size_t readahead_size_ = kInitReadaheadSize;
size_t readahead_limit_ = 0;
int num_file_reads_ = 0;
};
// CachableEntry represents the entries that *may* be fetched from block cache.
@@ -504,4 +493,121 @@ struct BlockBasedTable::Rep {
bool closed = false;
};
class BlockBasedTableIterator : public InternalIterator {
public:
BlockBasedTableIterator(BlockBasedTable* table,
const ReadOptions& read_options,
const InternalKeyComparator& icomp,
InternalIterator* index_iter, bool check_filter)
: table_(table),
read_options_(read_options),
icomp_(icomp),
index_iter_(index_iter),
pinned_iters_mgr_(nullptr),
block_iter_points_to_real_block_(false),
check_filter_(check_filter) {}
~BlockBasedTableIterator() { delete index_iter_; }
void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
void Next() override;
void Prev() override;
bool Valid() const override {
return block_iter_points_to_real_block_ && data_block_iter_.Valid();
}
Slice key() const override {
assert(Valid());
return data_block_iter_.key();
}
Slice value() const override {
assert(Valid());
return data_block_iter_.value();
}
Status status() const override {
// It'd be nice if status() returned a const Status& instead of a Status
if (!index_iter_->status().ok()) {
return index_iter_->status();
} else if (block_iter_points_to_real_block_ &&
!data_block_iter_.status().ok()) {
return data_block_iter_.status();
} else {
return Status::OK();
}
}
bool IsOutOfBound() override { return is_out_of_bound_; }
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
}
bool IsKeyPinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
block_iter_points_to_real_block_;
}
bool IsValuePinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
block_iter_points_to_real_block_;
}
bool CheckPrefixMayMatch(const Slice& ikey) {
if (check_filter_ && !table_->PrefixMayMatch(ikey)) {
// TODO remember that the iterator was invalidated because of the prefix
// check. That can prevent the upper-level file iterator from falsely
// believing this position is the end of the SST file and moving to
// the first key of the next file.
ResetDataIter();
return false;
}
return true;
}
void ResetDataIter() {
if (block_iter_points_to_real_block_) {
if (pinned_iters_mgr_ != nullptr) {
data_block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
}
data_block_iter_.~BlockIter();
new (&data_block_iter_) BlockIter();
block_iter_points_to_real_block_ = false;
}
}
void SavePrevIndexValue() {
if (block_iter_points_to_real_block_) {
// Reseek. If they end up with the same data block, we shouldn't re-fetch
// the same data block.
Slice v = index_iter_->value();
prev_index_value_.assign(v.data(), v.size());
}
}
void InitDataBlock();
void FindKeyForward();
void FindKeyBackward();
private:
BlockBasedTable* table_;
const ReadOptions read_options_;
const InternalKeyComparator& icomp_;
InternalIterator* index_iter_;
PinnedIteratorsManager* pinned_iters_mgr_;
BlockIter data_block_iter_;
bool block_iter_points_to_real_block_;
bool is_out_of_bound_ = false;
bool check_filter_;
// TODO use block offset instead
std::string prev_index_value_;
static const size_t kInitReadaheadSize = 8 * 1024;
// Found that 256 KB readahead size provides the best performance, based on
// experiments.
static const size_t kMaxReadaheadSize;
size_t readahead_size_ = kInitReadaheadSize;
size_t readahead_limit_ = 0;
int num_file_reads_ = 0;
};
} // namespace rocksdb

@@ -69,6 +69,10 @@ class InternalIterator : public Cleanable {
// satisfied without doing some IO, then this returns Status::Incomplete().
virtual Status status() const = 0;
// True if the iterator is invalidated because it is out of the iterator
// upper bound
virtual bool IsOutOfBound() { return false; }
// Pass the PinnedIteratorsManager to the Iterator. Most Iterators don't
// communicate with PinnedIteratorsManager so default implementation is no-op
// but for Iterators that need to communicate with PinnedIteratorsManager

@@ -22,22 +22,9 @@ namespace {
class TwoLevelIterator : public InternalIterator {
public:
explicit TwoLevelIterator(TwoLevelIteratorState* state,
InternalIterator* first_level_iter,
bool need_free_iter_and_state);
InternalIterator* first_level_iter);
virtual ~TwoLevelIterator() {
// Assert that the TwoLevelIterator is never deleted while Pinning is
// Enabled.
assert(!pinned_iters_mgr_ ||
(pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
first_level_iter_.DeleteIter(!need_free_iter_and_state_);
second_level_iter_.DeleteIter(false);
if (need_free_iter_and_state_) {
delete state_;
} else {
state_->~TwoLevelIteratorState();
}
}
virtual ~TwoLevelIterator() { delete state_; }
virtual void Seek(const Slice& target) override;
virtual void SeekForPrev(const Slice& target) override;
@@ -68,20 +55,9 @@ class TwoLevelIterator : public InternalIterator {
}
virtual void SetPinnedItersMgr(
PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
first_level_iter_.SetPinnedItersMgr(pinned_iters_mgr);
if (second_level_iter_.iter()) {
second_level_iter_.SetPinnedItersMgr(pinned_iters_mgr);
}
}
virtual bool IsKeyPinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
second_level_iter_.iter() && second_level_iter_.IsKeyPinned();
}
virtual bool IsValuePinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
second_level_iter_.iter() && second_level_iter_.IsValuePinned();
}
virtual bool IsKeyPinned() const override { return false; }
virtual bool IsValuePinned() const override { return false; }
private:
void SaveError(const Status& s) {
@@ -95,8 +71,6 @@ class TwoLevelIterator : public InternalIterator {
TwoLevelIteratorState* state_;
IteratorWrapper first_level_iter_;
IteratorWrapper second_level_iter_; // May be nullptr
bool need_free_iter_and_state_;
PinnedIteratorsManager* pinned_iters_mgr_;
Status status_;
// If second_level_iter is non-nullptr, then "data_block_handle_" holds the
// "index_value" passed to block_function_ to create the second_level_iter.
@@ -104,19 +78,10 @@ class TwoLevelIterator : public InternalIterator {
};
TwoLevelIterator::TwoLevelIterator(TwoLevelIteratorState* state,
InternalIterator* first_level_iter,
bool need_free_iter_and_state)
: state_(state),
first_level_iter_(first_level_iter),
need_free_iter_and_state_(need_free_iter_and_state),
pinned_iters_mgr_(nullptr) {}
InternalIterator* first_level_iter)
: state_(state), first_level_iter_(first_level_iter) {}
void TwoLevelIterator::Seek(const Slice& target) {
if (state_->check_prefix_may_match &&
!state_->PrefixMayMatch(target)) {
SetSecondLevelIterator(nullptr);
return;
}
first_level_iter_.Seek(target);
InitDataBlock();
@@ -127,10 +92,6 @@ void TwoLevelIterator::Seek(const Slice& target) {
}
void TwoLevelIterator::SeekForPrev(const Slice& target) {
if (state_->check_prefix_may_match && !state_->PrefixMayMatch(target)) {
SetSecondLevelIterator(nullptr);
return;
}
first_level_iter_.Seek(target);
InitDataBlock();
if (second_level_iter_.iter() != nullptr) {
@@ -183,8 +144,7 @@ void TwoLevelIterator::SkipEmptyDataBlocksForward() {
(!second_level_iter_.Valid() &&
!second_level_iter_.status().IsIncomplete())) {
// Move to next block
if (!first_level_iter_.Valid() ||
state_->KeyReachedUpperBound(first_level_iter_.key())) {
if (!first_level_iter_.Valid()) {
SetSecondLevelIterator(nullptr);
return;
}
@@ -217,17 +177,8 @@ void TwoLevelIterator::SetSecondLevelIterator(InternalIterator* iter) {
if (second_level_iter_.iter() != nullptr) {
SaveError(second_level_iter_.status());
}
if (pinned_iters_mgr_ && iter) {
iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
InternalIterator* old_iter = second_level_iter_.Set(iter);
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIterator(old_iter);
} else {
delete old_iter;
}
delete old_iter;
}
void TwoLevelIterator::InitDataBlock() {
@@ -251,17 +202,7 @@ void TwoLevelIterator::InitDataBlock() {
} // namespace
InternalIterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
InternalIterator* first_level_iter,
Arena* arena,
bool need_free_iter_and_state) {
if (arena == nullptr) {
return new TwoLevelIterator(state, first_level_iter,
need_free_iter_and_state);
} else {
auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator));
return new (mem)
TwoLevelIterator(state, first_level_iter, need_free_iter_and_state);
}
InternalIterator* first_level_iter) {
return new TwoLevelIterator(state, first_level_iter);
}
} // namespace rocksdb

@@ -19,16 +19,10 @@ class InternalKeyComparator;
class Arena;
struct TwoLevelIteratorState {
explicit TwoLevelIteratorState(bool _check_prefix_may_match)
: check_prefix_may_match(_check_prefix_may_match) {}
TwoLevelIteratorState() {}
virtual ~TwoLevelIteratorState() {}
virtual InternalIterator* NewSecondaryIterator(const Slice& handle) = 0;
virtual bool PrefixMayMatch(const Slice& internal_key) = 0;
virtual bool KeyReachedUpperBound(const Slice& internal_key) = 0;
// If call PrefixMayMatch()
bool check_prefix_may_match;
};
@@ -47,7 +41,6 @@ struct TwoLevelIteratorState {
// need_free_iter_and_state: free `state` and `first_level_iter` if
// true. Otherwise, just call destructor.
extern InternalIterator* NewTwoLevelIterator(
TwoLevelIteratorState* state, InternalIterator* first_level_iter,
Arena* arena = nullptr, bool need_free_iter_and_state = true);
TwoLevelIteratorState* state, InternalIterator* first_level_iter);
} // namespace rocksdb