Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
This commit is contained in:
akankshamahajan 2022-05-13 13:40:41 -07:00
parent 4f73d1d5dc
commit 6176006a09
4 changed files with 168 additions and 6 deletions

View File

@ -1003,6 +1003,10 @@ class LevelIterator final : public InternalIterator {
}
}
void PassOne(const Slice& target);
void PassTwo(const Slice& target);
TableCache* table_cache_;
const ReadOptions& read_options_;
const FileOptions& file_options_;
@ -1034,7 +1038,7 @@ class LevelIterator final : public InternalIterator {
bool is_next_read_sequential_;
};
void LevelIterator::Seek(const Slice& target) {
void LevelIterator::PassOne(const Slice& target) {
// Check whether the seek key fall under the same file
bool need_to_reseek = true;
if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
@ -1057,6 +1061,14 @@ void LevelIterator::Seek(const Slice& target) {
if (file_iter_.iter() != nullptr) {
file_iter_.Seek(target);
}
}
void LevelIterator::PassTwo(const Slice& target) {
if (read_options_.seek_optimization) {
if (file_iter_.iter() != nullptr) {
file_iter_.Seek(target);
}
}
if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
!read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
file_iter_.iter() != nullptr && file_iter_.Valid()) {
@ -1091,6 +1103,22 @@ void LevelIterator::Seek(const Slice& target) {
CheckMayBeOutOfLowerBound();
}
void LevelIterator::Seek(const Slice& target) {
if (read_options_.seek_optimization) {
// For the first pass, status will be OK.
if (status() == Status::OK()) {
PassOne(target);
} else if (status() == Status::TryAgain()) {
// second pass.
PassTwo(target);
}
} else {
// Sequential flow.
PassOne(target);
PassTwo(target);
}
}
void LevelIterator::SeekForPrev(const Slice& target) {
size_t new_file_index = FindFile(icomparator_, *flevel_, target);
if (new_file_index >= flevel_->num_files) {

View File

@ -9,11 +9,16 @@
#include "table/block_based/block_based_table_iterator.h"
namespace ROCKSDB_NAMESPACE {
void BlockBasedTableIterator::Seek(const Slice& target) { SeekImpl(&target); }
void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr); }
void BlockBasedTableIterator::Seek(const Slice& target) {
SeekImpl(&target, true);
}
void BlockBasedTableIterator::SeekImpl(const Slice* target) {
void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); }
void BlockBasedTableIterator::PassOne(const Slice* target,
bool& needs_second_pass,
bool async_call) {
is_out_of_bound_ = false;
is_at_first_key_from_index_ = false;
if (target && !CheckPrefixMayMatch(*target, IterDirection::kForward)) {
@ -74,7 +79,14 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) {
} else {
// Need to use the data block.
if (!same_block) {
if (async_call) {
AsyncInitDataBlock(/* first_pass= */ true);
async_status_ = Status::TryAgain();
} else {
InitDataBlock();
}
needs_second_pass = true;
return;
} else {
// When the user does a reseek, the iterate_upper_bound might have
// changed. CheckDataBlockWithinUpperBound() needs to be called
@ -92,6 +104,25 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) {
FindKeyForward();
}
CheckOutOfBound();
if (target) {
assert(!Valid() || icomp_.Compare(*target, key()) <= 0);
}
}
void BlockBasedTableIterator::PassTwo(const Slice* target, bool async_call) {
if (async_call) {
// AsyncInitDataBlock for poll and results.
AsyncInitDataBlock(/*first_pass=*/false);
}
if (target) {
block_iter_.Seek(*target);
} else {
block_iter_.SeekToFirst();
}
FindKeyForward();
CheckOutOfBound();
if (target) {
@ -99,6 +130,28 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) {
}
}
void BlockBasedTableIterator::SeekImpl(const Slice* target, bool two_pass) {
bool needs_second_pass = false;
// In case of SeekToFirst move to normal flow.
if (read_options_.seek_optimization && two_pass) {
if (async_status_ == Status::OK()) {
// First pass.
PassOne(target, needs_second_pass, /* async_call = */ true);
} else if (async_status_ == Status::TryAgain()) {
// Second pass.
PassTwo(target, /* async_call = */ true);
// Reset async_status_.
async_status_ = Status::OK();
}
} else {
// Sequential flow.
PassOne(target, needs_second_pass, /* async_call = */ false);
if (needs_second_pass) {
PassTwo(target, /* async_call = */ false);
}
}
}
void BlockBasedTableIterator::SeekForPrev(const Slice& target) {
is_out_of_bound_ = false;
is_at_first_key_from_index_ = false;
@ -246,6 +299,50 @@ void BlockBasedTableIterator::InitDataBlock() {
}
}
void BlockBasedTableIterator::AsyncInitDataBlock(bool first_pass) {
if (first_pass) {
BlockHandle data_block_handle = index_iter_->value().handle;
if (!block_iter_points_to_real_block_ ||
data_block_handle.offset() != prev_block_offset_ ||
// if previous attempt of reading the block missed cache, try again
block_iter_.status().IsIncomplete()) {
if (block_iter_points_to_real_block_) {
ResetDataIter();
}
auto* rep = table_->get_rep();
bool is_for_compaction =
lookup_context_.caller == TableReaderCaller::kCompaction;
// Prefetch additional data for range scans (iterators).
// Implicit auto readahead:
// Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
// Explicit user requested readahead:
// Enabled from the very first IO when ReadOptions.readahead_size is
// set.
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size,
is_for_compaction, read_options_.async_io);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
read_options_, data_block_handle, &block_iter_, BlockType::kData,
/*get_context=*/nullptr, &lookup_context_, s,
block_prefetcher_.prefetch_buffer(),
/*for_compaction=*/is_for_compaction);
/*
Make asynchronous call instead.
*/
}
} else {
// second pass.
// call Poll() or wait for the results.
// TODO akanksha: point block in second pass else status will return based
// on block_iter_points_to_real_block_.
block_iter_points_to_real_block_ = true;
CheckDataBlockWithinUpperBound();
}
}
bool BlockBasedTableIterator::MaterializeCurrentBlock() {
assert(is_at_first_key_from_index_);
assert(!block_iter_points_to_real_block_);

View File

@ -95,7 +95,11 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) {
return index_iter_->status();
} else if (block_iter_points_to_real_block_) {
// block_iter_points_to_real_block_ is set false in ResetDataIter and true
// in InitDataBlock.
return block_iter_.status();
} else if (read_options_.seek_optimization) {
return async_status_;
} else {
return Status::OK();
}
@ -236,10 +240,13 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
// TODO(Zhongyi): pick a better name
bool need_upper_bound_check_;
Status async_status_;
// If `target` is null, seek to first.
void SeekImpl(const Slice* target);
void SeekImpl(const Slice* target, bool two_pass);
void InitDataBlock();
void AsyncInitDataBlock(bool first_pass);
bool MaterializeCurrentBlock();
void FindKeyForward();
void FindBlockForward();
@ -271,5 +278,8 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
}
return true;
}
void PassOne(const Slice* target, bool& needs_second_pass, bool async_call);
void PassTwo(const Slice* target, bool async_call);
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -112,6 +112,10 @@ class MergingIterator : public InternalIterator {
}
PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.status() == Status::TryAgain()) {
continue;
}
{
// Strictly, we timed slightly more than min heap operation,
// but these operations are very cheap.
@ -119,6 +123,14 @@ class MergingIterator : public InternalIterator {
AddToMinHeapOrCheckStatus(&child);
}
}
for (auto& child : children_) {
if (child.status() == Status::TryAgain()) {
child.Seek(target);
AddToMinHeapOrCheckStatus(&child);
}
}
direction_ = kForward;
{
PERF_TIMER_GUARD(seek_min_heap_time);
@ -359,6 +371,9 @@ void MergingIterator::SwitchToForward() {
for (auto& child : children_) {
if (&child != current_) {
child.Seek(target);
if (child.status() == Status::TryAgain()) {
continue;
}
if (child.Valid() && comparator_->Equal(target, child.key())) {
assert(child.status().ok());
child.Next();
@ -366,6 +381,18 @@ void MergingIterator::SwitchToForward() {
}
AddToMinHeapOrCheckStatus(&child);
}
for (auto& child : children_) {
if (child.status() == Status::TryAgain()) {
child.Seek(target);
if (child.Valid() && comparator_->Equal(target, child.key())) {
assert(child.status().ok());
child.Next();
}
AddToMinHeapOrCheckStatus(&child);
}
}
direction_ = kForward;
}