ForwardIterator: reset incomplete iterators on Seek()
Summary: When reading from kBlockCacheTier, ForwardIterator's internal child iterators may end up in the incomplete state (read was unable to complete without doing disk I/O). `ForwardIterator::status()` will correctly report that; however, the iterator may be stuck in that state until all sub-iterators are rebuilt: * `NeedToSeekImmutable()` may return false even if some sub-iterators are incomplete * one of the child iterators may be an empty iterator without any state other that the kIncomplete status (created using `NewErrorIterator()`); seeking on any such iterator has no effect -- we need to construct it again Akin to rebuilding iterators after a superversion bump, this diff makes forward iterator reset all incomplete child iterators when `Seek()` or `Next()` are called. Test Plan: TEST_TMPDIR=/dev/shm/rocksdbtest ROCKSDB_TESTS=TailingIterator ./db_test Reviewers: igor, sdong, ljin Reviewed By: ljin Subscribers: lovro, march, leveldb Differential Revision: https://reviews.facebook.net/D22575
This commit is contained in:
parent
722d80c374
commit
0f9c43ea36
@ -6,9 +6,10 @@
|
||||
#ifndef ROCKSDB_LITE
|
||||
#include "db/forward_iterator.h"
|
||||
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <limits>
|
||||
|
||||
#include "db/db_impl.h"
|
||||
#include "db/db_iter.h"
|
||||
#include "db/column_family.h"
|
||||
@ -37,12 +38,16 @@ class LevelIterator : public Iterator {
|
||||
assert(file_index < files_.size());
|
||||
if (file_index != file_index_) {
|
||||
file_index_ = file_index;
|
||||
file_iter_.reset(cfd_->table_cache()->NewIterator(
|
||||
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
|
||||
files_[file_index_]->fd, nullptr /* table_reader_ptr */, false));
|
||||
Reset();
|
||||
}
|
||||
valid_ = false;
|
||||
}
|
||||
void Reset() {
|
||||
assert(file_index_ < files_.size());
|
||||
file_iter_.reset(cfd_->table_cache()->NewIterator(
|
||||
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
|
||||
files_[file_index_]->fd, nullptr /* table_reader_ptr */, false));
|
||||
}
|
||||
void SeekToLast() override {
|
||||
status_ = Status::NotSupported("LevelIterator::SeekToLast()");
|
||||
valid_ = false;
|
||||
@ -63,12 +68,15 @@ class LevelIterator : public Iterator {
|
||||
assert(file_iter_ != nullptr);
|
||||
file_iter_->Seek(internal_key);
|
||||
valid_ = file_iter_->Valid();
|
||||
assert(valid_);
|
||||
}
|
||||
void Next() override {
|
||||
assert(valid_);
|
||||
file_iter_->Next();
|
||||
while (!file_iter_->Valid()) {
|
||||
for (;;) {
|
||||
if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) {
|
||||
valid_ = !file_iter_->status().IsIncomplete();
|
||||
return;
|
||||
}
|
||||
if (file_index_ + 1 >= files_.size()) {
|
||||
valid_ = false;
|
||||
return;
|
||||
@ -76,7 +84,6 @@ class LevelIterator : public Iterator {
|
||||
SetFileIndex(file_index_ + 1);
|
||||
file_iter_->SeekToFirst();
|
||||
}
|
||||
valid_ = file_iter_->Valid();
|
||||
}
|
||||
Slice key() const override {
|
||||
assert(valid_);
|
||||
@ -160,6 +167,8 @@ void ForwardIterator::SeekToFirst() {
|
||||
if (sv_ == nullptr ||
|
||||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
|
||||
RebuildIterators();
|
||||
} else if (status_.IsIncomplete()) {
|
||||
ResetIncompleteIterators();
|
||||
}
|
||||
SeekInternal(Slice(), true);
|
||||
}
|
||||
@ -168,6 +177,8 @@ void ForwardIterator::Seek(const Slice& internal_key) {
|
||||
if (sv_ == nullptr ||
|
||||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
|
||||
RebuildIterators();
|
||||
} else if (status_.IsIncomplete()) {
|
||||
ResetIncompleteIterators();
|
||||
}
|
||||
SeekInternal(internal_key, false);
|
||||
}
|
||||
@ -211,7 +222,15 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
|
||||
}
|
||||
l0_iters_[i]->Seek(internal_key);
|
||||
}
|
||||
if (l0_iters_[i]->Valid()) {
|
||||
|
||||
if (l0_iters_[i]->status().IsIncomplete()) {
|
||||
// if any of the immutable iterators is incomplete (no-io option was
|
||||
// used), we are unable to reliably find the smallest key
|
||||
assert(read_options_.read_tier == kBlockCacheTier);
|
||||
status_ = l0_iters_[i]->status();
|
||||
valid_ = false;
|
||||
return;
|
||||
} else if (l0_iters_[i]->Valid()) {
|
||||
immutable_min_heap_.push(l0_iters_[i]);
|
||||
}
|
||||
}
|
||||
@ -280,7 +299,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
|
||||
level_iters_[level - 1]->SetFileIndex(f_idx);
|
||||
seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
|
||||
level_iters_[level - 1]->Seek(internal_key);
|
||||
if (level_iters_[level - 1]->Valid()) {
|
||||
|
||||
if (level_iters_[level - 1]->status().IsIncomplete()) {
|
||||
// see above
|
||||
assert(read_options_.read_tier == kBlockCacheTier);
|
||||
status_ = level_iters_[level - 1]->status();
|
||||
valid_ = false;
|
||||
return;
|
||||
} else if (level_iters_[level - 1]->Valid()) {
|
||||
immutable_min_heap_.push(level_iters_[level - 1]);
|
||||
}
|
||||
}
|
||||
@ -304,7 +330,7 @@ void ForwardIterator::Next() {
|
||||
assert(valid_);
|
||||
|
||||
if (sv_ == nullptr ||
|
||||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
|
||||
sv_->version_number != cfd_->GetSuperVersionNumber()) {
|
||||
std::string current_key = key().ToString();
|
||||
Slice old_key(current_key.data(), current_key.size());
|
||||
|
||||
@ -320,9 +346,17 @@ void ForwardIterator::Next() {
|
||||
}
|
||||
|
||||
current_->Next();
|
||||
if (current_->Valid() && current_ != mutable_iter_) {
|
||||
immutable_min_heap_.push(current_);
|
||||
if (current_ != mutable_iter_) {
|
||||
if (current_->status().IsIncomplete()) {
|
||||
assert(read_options_.read_tier == kBlockCacheTier);
|
||||
status_ = current_->status();
|
||||
valid_ = false;
|
||||
return;
|
||||
} else if (current_->Valid()) {
|
||||
immutable_min_heap_.push(current_);
|
||||
}
|
||||
}
|
||||
|
||||
UpdateCurrent();
|
||||
}
|
||||
|
||||
@ -389,6 +423,29 @@ void ForwardIterator::RebuildIterators() {
|
||||
is_prev_set_ = false;
|
||||
}
|
||||
|
||||
void ForwardIterator::ResetIncompleteIterators() {
|
||||
const auto& l0_files = sv_->current->files_[0];
|
||||
for (uint32_t i = 0; i < l0_iters_.size(); ++i) {
|
||||
assert(i < l0_files.size());
|
||||
if (!l0_iters_[i]->status().IsIncomplete()) {
|
||||
continue;
|
||||
}
|
||||
delete l0_iters_[i];
|
||||
l0_iters_[i] = cfd_->table_cache()->NewIterator(
|
||||
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
|
||||
l0_files[i]->fd);
|
||||
}
|
||||
|
||||
for (auto* level_iter : level_iters_) {
|
||||
if (level_iter && level_iter->status().IsIncomplete()) {
|
||||
level_iter->Reset();
|
||||
}
|
||||
}
|
||||
|
||||
current_ = nullptr;
|
||||
is_prev_set_ = false;
|
||||
}
|
||||
|
||||
void ForwardIterator::UpdateCurrent() {
|
||||
if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
|
||||
current_ = nullptr;
|
||||
@ -417,7 +474,7 @@ void ForwardIterator::UpdateCurrent() {
|
||||
}
|
||||
|
||||
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
|
||||
if (!is_prev_set_) {
|
||||
if (!valid_ || !is_prev_set_) {
|
||||
return true;
|
||||
}
|
||||
Slice prev_key = prev_key_.GetKey();
|
||||
|
@ -73,6 +73,7 @@ class ForwardIterator : public Iterator {
|
||||
private:
|
||||
void Cleanup();
|
||||
void RebuildIterators();
|
||||
void ResetIncompleteIterators();
|
||||
void SeekInternal(const Slice& internal_key, bool seek_to_first);
|
||||
void UpdateCurrent();
|
||||
bool NeedToSeekImmutable(const Slice& internal_key);
|
||||
|
@ -172,8 +172,9 @@ void TwoLevelIterator::InitDataBlock() {
|
||||
SetSecondLevelIterator(nullptr);
|
||||
} else {
|
||||
Slice handle = first_level_iter_.value();
|
||||
if (second_level_iter_.iter() != nullptr
|
||||
&& handle.compare(data_block_handle_) == 0) {
|
||||
if (second_level_iter_.iter() != nullptr &&
|
||||
!second_level_iter_.status().IsIncomplete() &&
|
||||
handle.compare(data_block_handle_) == 0) {
|
||||
// second_level_iter is already constructed with this iterator, so
|
||||
// no need to change anything
|
||||
} else {
|
||||
|
Loading…
Reference in New Issue
Block a user