diff --git a/db/version_set.cc b/db/version_set.cc index b21761c5b..8c4a33618 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1003,6 +1003,10 @@ class LevelIterator final : public InternalIterator { } } + void PassOne(const Slice& target); + + void PassTwo(const Slice& target); + TableCache* table_cache_; const ReadOptions& read_options_; const FileOptions& file_options_; @@ -1034,7 +1038,7 @@ class LevelIterator final : public InternalIterator { bool is_next_read_sequential_; }; -void LevelIterator::Seek(const Slice& target) { +void LevelIterator::PassOne(const Slice& target) { // Check whether the seek key fall under the same file bool need_to_reseek = true; if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) { @@ -1057,6 +1061,14 @@ void LevelIterator::Seek(const Slice& target) { if (file_iter_.iter() != nullptr) { file_iter_.Seek(target); } +} + +void LevelIterator::PassTwo(const Slice& target) { + if (read_options_.seek_optimization) { + if (file_iter_.iter() != nullptr) { + file_iter_.Seek(target); + } + } if (SkipEmptyFileForward() && prefix_extractor_ != nullptr && !read_options_.total_order_seek && !read_options_.auto_prefix_mode && file_iter_.iter() != nullptr && file_iter_.Valid()) { @@ -1091,6 +1103,22 @@ void LevelIterator::Seek(const Slice& target) { CheckMayBeOutOfLowerBound(); } +void LevelIterator::Seek(const Slice& target) { + if (read_options_.seek_optimization) { + // For the first pass, status will be OK. + if (status().ok()) { + PassOne(target); + } else if (status().IsTryAgain()) { + // second pass. + PassTwo(target); + } + } else { + // Sequential flow. 
+ PassOne(target); + PassTwo(target); + } +} + void LevelIterator::SeekForPrev(const Slice& target) { size_t new_file_index = FindFile(icomparator_, *flevel_, target); if (new_file_index >= flevel_->num_files) { diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 75b595cc1..62feb3d03 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -9,11 +9,16 @@ #include "table/block_based/block_based_table_iterator.h" namespace ROCKSDB_NAMESPACE { -void BlockBasedTableIterator::Seek(const Slice& target) { SeekImpl(&target); } -void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr); } +void BlockBasedTableIterator::Seek(const Slice& target) { + SeekImpl(&target, true); +} -void BlockBasedTableIterator::SeekImpl(const Slice* target) { +void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); } + +void BlockBasedTableIterator::PassOne(const Slice* target, + bool& needs_second_pass, + bool async_call) { is_out_of_bound_ = false; is_at_first_key_from_index_ = false; if (target && !CheckPrefixMayMatch(*target, IterDirection::kForward)) { @@ -74,7 +79,14 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) { } else { // Need to use the data block. if (!same_block) { - InitDataBlock(); + if (async_call) { + AsyncInitDataBlock(/* first_pass= */ true); + async_status_ = Status::TryAgain(); + } else { + InitDataBlock(); + } + needs_second_pass = true; + return; } else { // When the user does a reseek, the iterate_upper_bound might have // changed. 
CheckDataBlockWithinUpperBound() needs to be called @@ -92,6 +104,25 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) { FindKeyForward(); } + CheckOutOfBound(); + if (target) { + assert(!Valid() || icomp_.Compare(*target, key()) <= 0); + } +} + +void BlockBasedTableIterator::PassTwo(const Slice* target, bool async_call) { + if (async_call) { + // AsyncInitDataBlock for poll and results. + AsyncInitDataBlock(/*first_pass=*/false); + } + + if (target) { + block_iter_.Seek(*target); + } else { + block_iter_.SeekToFirst(); + } + + FindKeyForward(); CheckOutOfBound(); if (target) { @@ -99,6 +130,28 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) { } } +void BlockBasedTableIterator::SeekImpl(const Slice* target, bool two_pass) { + bool needs_second_pass = false; + // In case of SeekToFirst move to normal flow. + if (read_options_.seek_optimization && two_pass) { + if (async_status_.ok()) { + // First pass. + PassOne(target, needs_second_pass, /* async_call = */ true); + } else if (async_status_.IsTryAgain()) { + // Second pass. + PassTwo(target, /* async_call = */ true); + // Reset async_status_. + async_status_ = Status::OK(); + } + } else { + // Sequential flow. 
+ PassOne(target, needs_second_pass, /* async_call = */ false); + if (needs_second_pass) { + PassTwo(target, /* async_call = */ false); + } + } +} + void BlockBasedTableIterator::SeekForPrev(const Slice& target) { is_out_of_bound_ = false; is_at_first_key_from_index_ = false; @@ -246,6 +299,50 @@ void BlockBasedTableIterator::InitDataBlock() { } } +void BlockBasedTableIterator::AsyncInitDataBlock(bool first_pass) { + if (first_pass) { + BlockHandle data_block_handle = index_iter_->value().handle; + if (!block_iter_points_to_real_block_ || + data_block_handle.offset() != prev_block_offset_ || + // if previous attempt of reading the block missed cache, try again + block_iter_.status().IsIncomplete()) { + if (block_iter_points_to_real_block_) { + ResetDataIter(); + } + auto* rep = table_->get_rep(); + + bool is_for_compaction = + lookup_context_.caller == TableReaderCaller::kCompaction; + // Prefetch additional data for range scans (iterators). + // Implicit auto readahead: + // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. + // Explicit user requested readahead: + // Enabled from the very first IO when ReadOptions.readahead_size is + // set. + block_prefetcher_.PrefetchIfNeeded( + rep, data_block_handle, read_options_.readahead_size, + is_for_compaction, read_options_.async_io); + + Status s; + table_->NewDataBlockIterator( + read_options_, data_block_handle, &block_iter_, BlockType::kData, + /*get_context=*/nullptr, &lookup_context_, s, + block_prefetcher_.prefetch_buffer(), + /*for_compaction=*/is_for_compaction); + /* + Make asynchronous call instead. + */ + } + } else { + // second pass. + // call Poll() or wait for the results. + // TODO(akanksha): Point block_iter_ to the fetched block in the second + // pass; otherwise status() is reported based on block_iter_points_to_real_block_. 
+ block_iter_points_to_real_block_ = true; + CheckDataBlockWithinUpperBound(); + } +} + bool BlockBasedTableIterator::MaterializeCurrentBlock() { assert(is_at_first_key_from_index_); assert(!block_iter_points_to_real_block_); diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index fe659d9d0..d0f5a1e5e 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -95,7 +95,11 @@ class BlockBasedTableIterator : public InternalIteratorBase { if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) { return index_iter_->status(); } else if (block_iter_points_to_real_block_) { + // block_iter_points_to_real_block_ is set false in ResetDataIter and true + // in InitDataBlock. return block_iter_.status(); + } else if (read_options_.seek_optimization) { + return async_status_; } else { return Status::OK(); } @@ -236,10 +240,13 @@ class BlockBasedTableIterator : public InternalIteratorBase { // TODO(Zhongyi): pick a better name bool need_upper_bound_check_; + Status async_status_; + // If `target` is null, seek to first. 
- void SeekImpl(const Slice* target); + void SeekImpl(const Slice* target, bool two_pass); void InitDataBlock(); + void AsyncInitDataBlock(bool first_pass); bool MaterializeCurrentBlock(); void FindKeyForward(); void FindBlockForward(); @@ -271,5 +278,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { } return true; } + + void PassOne(const Slice* target, bool& needs_second_pass, bool async_call); + void PassTwo(const Slice* target, bool async_call); }; } // namespace ROCKSDB_NAMESPACE diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index a221a5b25..4db87785a 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -112,6 +112,10 @@ class MergingIterator : public InternalIterator { } PERF_COUNTER_ADD(seek_child_seek_count, 1); + + if (child.status().IsTryAgain()) { + continue; + } { // Strictly, we timed slightly more than min heap operation, // but these operations are very cheap. @@ -119,6 +123,14 @@ class MergingIterator : public InternalIterator { AddToMinHeapOrCheckStatus(&child); } } + + for (auto& child : children_) { + if (child.status().IsTryAgain()) { + child.Seek(target); + AddToMinHeapOrCheckStatus(&child); + } + } + direction_ = kForward; { PERF_TIMER_GUARD(seek_min_heap_time); @@ -359,6 +371,9 @@ void MergingIterator::SwitchToForward() { for (auto& child : children_) { if (&child != current_) { child.Seek(target); + if (child.status().IsTryAgain()) { + continue; + } if (child.Valid() && comparator_->Equal(target, child.key())) { assert(child.status().ok()); child.Next(); @@ -366,6 +381,18 @@ void MergingIterator::SwitchToForward() { } AddToMinHeapOrCheckStatus(&child); } + + for (auto& child : children_) { + if (child.status().IsTryAgain()) { + child.Seek(target); + if (child.Valid() && comparator_->Equal(target, child.key())) { + assert(child.status().ok()); + child.Next(); + } + AddToMinHeapOrCheckStatus(&child); + } + } + direction_ = kForward; }