From 8bf555f487d1de84a4fb19cb97b9ae1a8dbebc60 Mon Sep 17 00:00:00 2001
From: Mike Kolupaev
Date: Thu, 17 May 2018 02:44:14 -0700
Subject: [PATCH] Change and clarify the relationship between Valid(), status()
 and Seek*() for all iterators. Also fix some bugs

Summary:
Before this PR, Iterator/InternalIterator may simultaneously have non-ok status() and Valid() = true. That state means that the last operation failed, but the iterator is nevertheless positioned on some unspecified record. Likely intended uses of that are:
 * If some sst files are corrupted, a normal iterator can be used to read the data from files that are not corrupted.
 * When using read_tier = kBlockCacheTier, read the data that's in block cache, skipping over the data that is not.

However, this behavior wasn't documented well (and until recently the wiki on github had misleading, incorrect information). In the code there's a lot of confusion about the relationship between status() and Valid(), and about whether Seek()/SeekToLast()/etc reset the status or not. There were a number of bugs caused by this confusion, both inside rocksdb and in the code that uses rocksdb (including ours).

This PR changes the convention to:
 * If status() is not ok, Valid() always returns false.
 * Any seek operation resets status. (Before the PR, it depended on iterator type and on particular error.)

This does sacrifice the two use cases listed above, but siying said it's ok.

Overview of the changes:
 * A commit that adds missing status checks in MergingIterator. This fixes a bug that actually affects us, and we need it fixed. `DBIteratorTest.NonBlockingIterationBugRepro` explains the scenario.
 * Changes to lots of iterator types to make all of them conform to the new convention. Some bug fixes along the way. By far the biggest changes are in DBIter, which is a big messy piece of code; I tried to make it less big and messy but mostly failed.
 * A stress-test for DBIter, to gain some confidence that I didn't break it.
It does a few million random operations on the iterator, while occasionally modifying the underlying data (like ForwardIterator does) and occasionally returning non-ok status from internal iterator.

To find the iterator types that needed changes I searched for "public .*Iterator" in the code. Here's an overview of all 27 iterator types:

Iterators that didn't need changes:
 * status() is always ok(), or Valid() is always false: MemTableIterator, ModelIter, TestIterator, KVIter (2 classes with this name in anonymous namespaces), LoggingForwardVectorIterator, VectorIterator, MockTableIterator, EmptyIterator, EmptyInternalIterator.
 * Thin wrappers that always pass through Valid() and status(): ArenaWrappedDBIter, TtlIterator, InternalIteratorFromIterator.

Iterators with changes (see inline comments for details):
 * DBIter - an overhaul:
    - It used to silently skip corrupted keys (`FindParseableKey()`), which seems dangerous. This PR makes it just stop immediately after encountering a corrupted key, just like it would for other kinds of corruption. Let me know if there was actually some deeper meaning in this behavior and I should put it back.
    - It had a few code paths silently discarding subiterator's status. The stress test caught a few.
    - The backwards iteration code path was expecting the internal iterator's set of keys to be immutable. It's probably always true in practice at the moment, since ForwardIterator doesn't support backwards iteration, but this PR fixes it anyway. See added DBIteratorTest.ReverseToForwardBug for an example.
    - Some parts of backwards iteration code path even did things like `assert(iter_->Valid())` after a seek, which is never a safe assumption.
    - It used to not reset status on seek for some types of errors.
    - Some simplifications and better comments.
    - Some things got more complicated from the added error handling. I'm open to ideas for how to make it nicer.
 * MergingIterator - check status after every operation on every subiterator, and in some places assert that valid subiterators have ok status.
 * ForwardIterator - changed to the new convention, also slightly simplified.
 * ForwardLevelIterator - fixed some bugs and simplified.
 * LevelIterator - simplified.
 * TwoLevelIterator - changed to the new convention. Also fixed a bug that would make SeekForPrev() sometimes silently ignore errors from first_level_iter_.
 * BlockBasedTableIterator - minor changes.
 * BlockIter - replaced `SetStatus()` with `Invalidate()` to make sure non-ok BlockIter is always invalid.
 * PlainTableIterator - some seeks used to not reset status.
 * CuckooTableIterator - tiny code cleanup.
 * ManagedIterator - fixed some bugs.
 * BaseDeltaIterator - changed to the new convention and fixed a bug.
 * BlobDBIterator - seeks used to not reset status.
 * KeyConvertingIterator - some small change.

Closes https://github.com/facebook/rocksdb/pull/3810

Differential Revision: D7888019

Pulled By: al13n321

fbshipit-source-id: 4aaf6d3421c545d16722a815b2fa2e7912bc851d
---
 CMakeLists.txt | 1 +
 HISTORY.md | 2 +
 Makefile | 4 +
 TARGETS | 5 +
 db/corruption_test.cc | 4 +-
 db/db_iter.cc | 614 ++++++++++-------
 db/db_iter_stress_test.cc | 652 ++++++++++++++++++
 db/db_iter_test.cc | 95 ++-
 db/db_iterator_test.cc | 40 ++
 db/forward_iterator.cc | 46 +-
 db/managed_iterator.cc | 56 +-
 db/managed_iterator.h | 2 +-
 db/version_set.cc | 29 +-
 include/rocksdb/iterator.h | 6 +-
 src.mk | 1 +
 table/block.cc | 5 +-
 table/block.h | 15 +-
 table/block_based_table_reader.cc | 12 +-
 table/block_based_table_reader.h | 7 +-
 table/cuckoo_table_reader.cc | 3 +-
 table/internal_iterator.h | 6 +-
 table/iterator_wrapper.h | 1 +
 table/merging_iterator.cc | 71 +-
 table/plain_table_reader.cc | 4 +
 table/table_test.cc | 3 +-
 table/two_level_iterator.cc | 13 +-
 utilities/blob_db/blob_db_iterator.h | 1 +
 .../write_batch_with_index.cc | 29 +
 28 files changed, 1345 insertions(+), 382 deletions(-)
 create mode 100644 db/db_iter_stress_test.cc

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 160a53641..7a5df4213 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -845,6 +845,7 @@ if(WITH_TESTS)
         db/db_inplace_update_test.cc
         db/db_io_failure_test.cc
         db/db_iter_test.cc
+        db/db_iter_stress_test.cc
         db/db_iterator_test.cc
         db/db_log_iter_test.cc
         db/db_memtable_test.cc
diff --git a/HISTORY.md b/HISTORY.md
index 7e32e73f7..1c482dd71 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,8 @@
 * Touch-up to write-related counters in PerfContext. New counters added: write_scheduling_flushes_compactions_time, write_thread_wait_nanos. Counters whose behavior was fixed or modified: write_memtable_time, write_pre_and_post_process_time, write_delay_time.
 * Posix Env's NewRandomRWFile() will fail if the file doesn't exist.
 * Now, `DBOptions::use_direct_io_for_flush_and_compaction` only applies to background writes, and `DBOptions::use_direct_reads` applies to both user reads and background reads. This conforms with Linux's `open(2)` manpage, which advises against simultaneously reading a file in buffered and direct modes, due to possibly undefined behavior and degraded performance.
+* Iterator::Valid() always returns false if !status().ok(). So, now when doing a Seek() followed by some Next()s, there's no need to check status() after every operation.
+* Iterator::Seek()/SeekForPrev()/SeekToFirst()/SeekToLast() always resets status().

 ### New Features
 * Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data.
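The two HISTORY.md entries above define the new user-facing contract. As a self-contained sketch (toy `Status` and `ToyIterator` types, not the real RocksDB classes), the idiom they enable looks like this: status() is checked once after the scan loop, because Valid() is guaranteed to turn false whenever an error occurs, and a subsequent seek resets the error:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Toy stand-ins used only to illustrate the contract; not RocksDB code.
struct Status {
  bool ok() const { return ok_; }
  static Status OK() { return Status(true); }
  static Status IOError() { return Status(false); }
 private:
  explicit Status(bool ok) : ok_(ok) {}
  bool ok_;
};

class ToyIterator {
 public:
  // fail_at: index at which a simulated read error occurs (-1 = never).
  ToyIterator(std::vector<std::string> keys, int fail_at = -1)
      : keys_(std::move(keys)), fail_at_(fail_at) {}

  // Seeks always reset status under the new convention.
  void SeekToFirst() {
    status_ = Status::OK();
    pos_ = 0;
    MaybeFail();
  }
  // Valid() is false whenever status() is non-ok.
  bool Valid() const {
    return status_.ok() && pos_ < static_cast<int>(keys_.size());
  }
  void Next() {
    assert(Valid());
    ++pos_;
    MaybeFail();
  }
  const std::string& key() const { assert(Valid()); return keys_[pos_]; }
  Status status() const { return status_; }

 private:
  void MaybeFail() {
    if (pos_ == fail_at_) status_ = Status::IOError();  // simulated I/O error
  }
  std::vector<std::string> keys_;
  int fail_at_;
  int pos_ = 0;
  Status status_ = Status::OK();
};

// The idiom the new contract enables: one status check after the loop.
int CountKeys(ToyIterator* it, Status* out) {
  int n = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ++n;
  }
  *out = it->status();  // non-ok iff the scan stopped early due to an error
  return n;
}
```

Under the old convention the loop body would have needed a status check on every step, because a failed Next() could leave Valid() = true on an unspecified record.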
diff --git a/Makefile b/Makefile
index 9702fe278..5d0092d11 100644
--- a/Makefile
+++ b/Makefile
@@ -402,6 +402,7 @@ TESTS = \
	db_blob_index_test \
	db_bloom_filter_test \
	db_iter_test \
+	db_iter_stress_test \
	db_log_iter_test \
	db_compaction_filter_test \
	db_compaction_test \
@@ -1195,6 +1196,9 @@ db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS)
 db_iter_test: db/db_iter_test.o $(LIBOBJECTS) $(TESTHARNESS)
	$(AM_LINK)

+db_iter_stress_test: db/db_iter_stress_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 db_universal_compaction_test: db/db_universal_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
	$(AM_LINK)

diff --git a/TARGETS b/TARGETS
index 20390f073..7bd3a45dc 100644
--- a/TARGETS
+++ b/TARGETS
@@ -560,6 +560,11 @@ ROCKS_TESTS = [
         "db/db_iter_test.cc",
         "serial",
     ],
+    [
+        "db_iter_stress_test",
+        "db/db_iter_stress_test.cc",
+        "serial",
+    ],
     [
         "db_iterator_test",
         "db/db_iterator_test.cc",
diff --git a/db/corruption_test.cc b/db/corruption_test.cc
index cbd125b98..dbc09e753 100644
--- a/db/corruption_test.cc
+++ b/db/corruption_test.cc
@@ -333,9 +333,9 @@ TEST_F(CorruptionTest, TableFileIndexData) {
   Corrupt(kTableFile, -2000, 500);
   Reopen();
   dbi = reinterpret_cast<DBImpl*>(db_);
-  // one full file should be readable, since only one was corrupted
+  // one full file may be readable, since only one was corrupted
   // the other file should be fully non-readable, since index was corrupted
-  Check(5000, 5000);
+  Check(0, 5000);
   ASSERT_NOK(dbi->VerifyChecksum());
 }

diff --git a/db/db_iter.cc b/db/db_iter.cc
index 29b538de9..d2d1d0451 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -52,8 +52,13 @@ class DBIter final: public Iterator {
 public:
  // The following is grossly complicated. TODO: clean it up
  // Which direction is the iterator currently moving?
-  // (1) When moving forward, the internal iterator is positioned at
-  //     the exact entry that yields this->key(), this->value()
+  // (1) When moving forward:
+  //   (1a) if current_entry_is_merged_ = false, the internal iterator is
+  //        positioned at the exact entry that yields this->key(), this->value()
+  //   (1b) if current_entry_is_merged_ = true, the internal iterator is
+  //        positioned immediately after the last entry that contributed to the
+  //        current this->value(). That entry may or may not have key equal to
+  //        this->key().
   // (2) When moving backwards, the internal iterator is positioned
   //     just before all entries whose user key == this->key().
   enum Direction {
@@ -194,6 +199,7 @@ class DBIter final: public Iterator {
     if (status_.ok()) {
       return iter_->status();
     } else {
+      assert(!valid_);
       return status_;
     }
   }
@@ -235,18 +241,21 @@ class DBIter final: public Iterator {
   void set_valid(bool v) { valid_ = v; }

  private:
-  void ReverseToForward();
-  void ReverseToBackward();
-  void PrevInternal();
-  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
+  // For all methods in this block:
+  //   PRE: iter_->Valid() && status_.ok()
+  // Return false if there was an error, and status() is non-ok, valid_ = false;
+  // in this case callers would usually stop what they were doing and return.
+  bool ReverseToForward();
+  bool ReverseToBackward();
   bool FindValueForCurrentKey();
   bool FindValueForCurrentKeyUsingSeek();
-  void FindPrevUserKey();
-  void FindNextUserKey();
-  inline void FindNextUserEntry(bool skipping, bool prefix_check);
-  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
+  bool FindUserKeyBeforeSavedKey();
+  inline bool FindNextUserEntry(bool skipping, bool prefix_check);
+  bool FindNextUserEntryInternal(bool skipping, bool prefix_check);
   bool ParseKey(ParsedInternalKey* key);
-  void MergeValuesNewToOld();
+  bool MergeValuesNewToOld();
+
+  void PrevInternal();

   bool TooManyInternalKeysSkipped(bool increment = true);
   bool IsVisible(SequenceNumber sequence);
@@ -336,6 +345,7 @@ class DBIter final: public Iterator {
 inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
   if (!ParseInternalKey(iter_->key(), ikey)) {
     status_ = Status::Corruption("corrupted internal key in DBIter");
+    valid_ = false;
     ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
                     iter_->key().ToString(true).c_str());
     return false;
@@ -346,12 +356,16 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {

 void DBIter::Next() {
   assert(valid_);
+  assert(status_.ok());

   // Release temporarily pinned blocks from last operation
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
+  bool ok = true;
   if (direction_ == kReverse) {
-    ReverseToForward();
+    if (!ReverseToForward()) {
+      ok = false;
+    }
   } else if (iter_->Valid() && !current_entry_is_merged_) {
     // If the current value is not a merge, the iter position is the
     // current key, which is already returned. We can safely issue a
@@ -365,13 +379,12 @@ void DBIter::Next() {
   if (statistics_ != nullptr) {
     local_stats_.next_count_++;
   }
-  // Now we point to the next internal position, for both of merge and
-  // not merge cases.
-  if (!iter_->Valid()) {
+  if (ok && iter_->Valid()) {
+    FindNextUserEntry(true /* skipping the current user key */,
+                      prefix_same_as_start_);
+  } else {
     valid_ = false;
-    return;
   }
-  FindNextUserEntry(true /* skipping the current user key */,
-                    prefix_same_as_start_);
   if (statistics_ != nullptr && valid_) {
     local_stats_.next_found_count_++;
     local_stats_.bytes_read_ += (key().size() + value().size());
@@ -392,15 +405,16 @@ void DBIter::Next() {
 // keys against the prefix of the seeked key. Set to false when
 // performing a seek without a key (e.g. SeekToFirst). Set to
 // prefix_same_as_start_ for other iterations.
-inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
+inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
   PERF_TIMER_GUARD(find_next_user_entry_time);
-  FindNextUserEntryInternal(skipping, prefix_check);
+  return FindNextUserEntryInternal(skipping, prefix_check);
 }

 // Actual implementation of DBIter::FindNextUserEntry()
-void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
+bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
   // Loop until we hit an acceptable entry to yield
   assert(iter_->Valid());
+  assert(status_.ok());
   assert(direction_ == kForward);
   current_entry_is_merged_ = false;

@@ -420,9 +434,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {

   do {
     if (!ParseKey(&ikey_)) {
-      // Skip corrupted keys.
-      iter_->Next();
-      continue;
+      return false;
     }

     if (iterate_upper_bound_ != nullptr &&
@@ -437,7 +449,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
     }

     if (TooManyInternalKeysSkipped()) {
-      return;
+      return false;
     }

     if (IsVisible(ikey_.sequence)) {
@@ -455,18 +467,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
           // if iterartor specified start_seqnum we
           // 1) return internal key, including the type
           // 2) return ikey only if ikey.seqnum >= start_seqnum_
-          // not that if deletion seqnum is < start_seqnum_ we
+          // note that if deletion seqnum is < start_seqnum_ we
           // just skip it like in normal iterator.
           if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
             saved_key_.SetInternalKey(ikey_);
-            valid_=true;
-            return;
+            valid_ = true;
+            return true;
           } else {
             saved_key_.SetUserKey(
                 ikey_.user_key,
                 !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
-           skipping = true;
-           PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+            skipping = true;
+            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
           }
           break;
         case kTypeValue:
@@ -480,7 +492,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
           if (ikey_.sequence >= start_seqnum_) {
             saved_key_.SetInternalKey(ikey_);
             valid_ = true;
-            return;
+            return true;
           } else {
             // this key and all previous versions shouldn't be included,
             // skipping
@@ -507,14 +519,15 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
                 "Encounter unexpected blob index. Please open DB with "
                 "rocksdb::blob_db::BlobDB instead.");
             valid_ = false;
-          } else {
-            is_blob_ = true;
-            valid_ = true;
+            return false;
           }
-          return;
+
+          is_blob_ = true;
+          valid_ = true;
+          return true;
         } else {
           valid_ = true;
-          return;
+          return true;
         }
       }
       break;
@@ -535,8 +548,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
         // value
         current_entry_is_merged_ = true;
         valid_ = true;
-        MergeValuesNewToOld();  // Go to a different state machine
-        return;
+        return MergeValuesNewToOld();  // Go to a different state machine
       }
       break;
     default:
@@ -545,13 +557,14 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
        }
      }
    } else {
-      // This key was inserted after our snapshot was taken.
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

-      // Here saved_key_ may contain some old key, or the default empty key, or
-      // key assigned by some random other method. We don't care.
-      if (user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
-          0) {
+      // This key was inserted after our snapshot was taken.
+      // If this happens too many times in a row for the same user key, we want
+      // to seek to the target sequence number.
+      int cmp =
+          user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey());
+      if (cmp == 0 || (skipping && cmp <= 0)) {
        num_skipped++;
      } else {
        saved_key_.SetUserKey(
@@ -591,7 +604,9 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
       iter_->Next();
     }
   } while (iter_->Valid());
+
   valid_ = false;
+  return iter_->status().ok();
 }

 // Merge values of the same user key starting from the current iter_ position
@@ -600,12 +615,12 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
 // saved_key_ stores the user key
 // POST: saved_value_ has the merged value for the user key
 //       iter_ points to the next entry (or invalid)
-void DBIter::MergeValuesNewToOld() {
+bool DBIter::MergeValuesNewToOld() {
   if (!merge_operator_) {
     ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
     status_ = Status::InvalidArgument("merge_operator_ must be set.");
     valid_ = false;
-    return;
+    return false;
   }

   // Temporarily pin the blocks that hold merge operands
@@ -621,8 +636,7 @@ void DBIter::MergeValuesNewToOld() {
   for (iter_->Next(); iter_->Valid(); iter_->Next()) {
     TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
     if (!ParseKey(&ikey)) {
-      // skip corrupted key
-      continue;
+      return false;
     }

     if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
@@ -639,7 +653,6 @@ void DBIter::MergeValuesNewToOld() {
     } else if (kTypeValue == ikey.type) {
       // hit a put, merge the put value with operands and store the
       // final result in saved_value_. We are done!
-      // ignore corruption if there is any.
       const Slice val = iter_->value();
       s = MergeHelper::TimedFullMerge(
           merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
@@ -647,10 +660,15 @@ void DBIter::MergeValuesNewToOld() {
       if (!s.ok()) {
         valid_ = false;
         status_ = s;
+        return false;
       }
       // iter_ is positioned after put
       iter_->Next();
-      return;
+      if (!iter_->status().ok()) {
+        valid_ = false;
+        return false;
+      }
+      return true;
     } else if (kTypeMerge == ikey.type) {
       // hit a merge, add the value as an operand and run associative merge.
       // when complete, add result to operands and continue.
@@ -668,12 +686,17 @@ void DBIter::MergeValuesNewToOld() {
             Status::NotSupported("Blob DB does not support merge operator.");
       }
       valid_ = false;
-      return;
+      return false;
     } else {
       assert(false);
     }
   }

+  if (!iter_->status().ok()) {
+    valid_ = false;
+    return false;
+  }
+
   // we either exhausted all internal keys under this user key, or hit
   // a deletion marker.
   // feed null as the existing value to the merge operator, such that
@@ -685,17 +708,27 @@ void DBIter::MergeValuesNewToOld() {
   if (!s.ok()) {
     valid_ = false;
     status_ = s;
+    return false;
   }
+
+  assert(status_.ok());
+  return true;
 }

 void DBIter::Prev() {
   assert(valid_);
+  assert(status_.ok());
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
+  bool ok = true;
   if (direction_ == kForward) {
-    ReverseToBackward();
+    if (!ReverseToBackward()) {
+      ok = false;
+    }
+  }
+  if (ok) {
+    PrevInternal();
   }
-  PrevInternal();
   if (statistics_ != nullptr) {
     local_stats_.prev_count_++;
     if (valid_) {
@@ -705,71 +738,76 @@ void DBIter::Prev() {
   }
 }

-void DBIter::ReverseToForward() {
-  if (prefix_extractor_ != nullptr && !total_order_seek_) {
+bool DBIter::ReverseToForward() {
+  assert(iter_->status().ok());
+
+  // When moving backwards, iter_ is positioned on _previous_ key, which may
+  // not exist or may have different prefix than the current key().
+  // If that's the case, seek iter_ to current key.
+  if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
     IterKey last_key;
     last_key.SetInternalKey(ParsedInternalKey(
         saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
     iter_->Seek(last_key.GetInternalKey());
   }
-  FindNextUserKey();
+
   direction_ = kForward;
-  if (!iter_->Valid()) {
-    iter_->SeekToFirst();
-    range_del_agg_.InvalidateTombstoneMapPositions();
+  // Skip keys less than the current key() (a.k.a. saved_key_).
+  while (iter_->Valid()) {
+    ParsedInternalKey ikey;
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >=
+        0) {
+      return true;
+    }
+    iter_->Next();
   }
+
+  if (!iter_->status().ok()) {
+    valid_ = false;
+    return false;
+  }
+
+  return true;
 }

-void DBIter::ReverseToBackward() {
-  if (prefix_extractor_ != nullptr && !total_order_seek_) {
-    IterKey last_key;
-    last_key.SetInternalKey(ParsedInternalKey(saved_key_.GetUserKey(), 0,
-                                              kValueTypeForSeekForPrev));
-    iter_->SeekForPrev(last_key.GetInternalKey());
-  }
-  if (current_entry_is_merged_) {
-    // Not placed in the same key. Need to call Prev() until finding the
-    // previous key.
-    if (!iter_->Valid()) {
-      iter_->SeekToLast();
-      range_del_agg_.InvalidateTombstoneMapPositions();
-    }
-    ParsedInternalKey ikey;
-    FindParseableKey(&ikey, kReverse);
-    while (iter_->Valid() &&
-           user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >
-               0) {
-      assert(ikey.sequence != kMaxSequenceNumber);
-      if (!IsVisible(ikey.sequence)) {
-        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
-      } else {
-        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
-      }
-      iter_->Prev();
-      FindParseableKey(&ikey, kReverse);
-    }
-  }
-#ifndef NDEBUG
-  if (iter_->Valid()) {
-    ParsedInternalKey ikey;
-    assert(ParseKey(&ikey));
-    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
-           0);
-  }
-#endif
+// Move iter_ to the key before saved_key_.
+bool DBIter::ReverseToBackward() {
+  assert(iter_->status().ok());
+
+  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
+  // key, which may not exist or may have prefix different from current.
+  // If that's the case, seek to saved_key_.
+  if (current_entry_is_merged_ &&
+      ((prefix_extractor_ != nullptr && !total_order_seek_) ||
+       !iter_->Valid())) {
+    IterKey last_key;
+    // Using kMaxSequenceNumber and kValueTypeForSeek
+    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
+    // than saved_key_.
+    last_key.SetInternalKey(ParsedInternalKey(
+        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+    if (prefix_extractor_ != nullptr && !total_order_seek_) {
+      iter_->SeekForPrev(last_key.GetInternalKey());
+    } else {
+      // Some iterators may not support SeekForPrev(), so we avoid using it
+      // when prefix seek mode is disabled. This is somewhat expensive
+      // (an extra Prev(), as well as an extra change of direction of iter_),
+      // so we may need to reconsider it later.
+      iter_->Seek(last_key.GetInternalKey());
+      if (!iter_->Valid() && iter_->status().ok()) {
+        iter_->SeekToLast();
+      }
+    }
+  }

-  FindPrevUserKey();
   direction_ = kReverse;
+  return FindUserKeyBeforeSavedKey();
 }

 void DBIter::PrevInternal() {
-  if (!iter_->Valid()) {
-    valid_ = false;
-    return;
-  }
-
-  ParsedInternalKey ikey;
-
   while (iter_->Valid()) {
     saved_key_.SetUserKey(
         ExtractUserKey(iter_->key()),
@@ -791,38 +829,41 @@ void DBIter::PrevInternal() {
       return;
     }

-    if (FindValueForCurrentKey()) {
-      if (!iter_->Valid()) {
-        return;
-      }
-      FindParseableKey(&ikey, kReverse);
-      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
-        FindPrevUserKey();
-      }
+    if (!FindValueForCurrentKey()) {  // assigns valid_
+      return;
+    }
+
+    // Whether or not we found a value for current key, we need iter_ to end up
+    // on a smaller key.
+    if (!FindUserKeyBeforeSavedKey()) {
+      return;
+    }
+
+    if (valid_) {
+      // Found the value.
       return;
     }

     if (TooManyInternalKeysSkipped(false)) {
       return;
     }
-
-    if (!iter_->Valid()) {
-      break;
-    }
-    FindParseableKey(&ikey, kReverse);
-    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
-      FindPrevUserKey();
-    }
   }
+
   // We haven't found any key - iterator is not valid
-  // Or the prefix is different than start prefix
-  assert(!iter_->Valid());
   valid_ = false;
 }

-// This function checks, if the entry with biggest sequence_number <= sequence_
-// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
-// saved_value_
+// Used for backwards iteration.
+// Looks at the entries with user key saved_key_ and finds the most up-to-date
+// value for it, or executes a merge, or determines that the value was deleted.
+// Sets valid_ to true if the value is found and is ready to be presented to
+// the user through value().
+// Sets valid_ to false if the value was deleted, and we should try another key.
+// Returns false if an error occurred, and !status().ok() and !valid_.
+//
+// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
+// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
+//       the entry just before them, or on the entry just after them.
 bool DBIter::FindValueForCurrentKey() {
   assert(iter_->Valid());
   merge_context_.Clear();
@@ -832,20 +873,27 @@ bool DBIter::FindValueForCurrentKey() {
   ValueType last_not_merge_type = kTypeDeletion;
   ValueType last_key_entry_type = kTypeDeletion;

-  ParsedInternalKey ikey;
-  FindParseableKey(&ikey, kReverse);
-
   // Temporarily pin blocks that hold (merge operands / the value)
   ReleaseTempPinnedData();
   TempPinData();
   size_t num_skipped = 0;
-  while (iter_->Valid() && IsVisible(ikey.sequence) &&
-         user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+  while (iter_->Valid()) {
+    ParsedInternalKey ikey;
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+
+    if (!IsVisible(ikey.sequence) ||
+        !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+      break;
+    }
     if (TooManyInternalKeysSkipped()) {
       return false;
     }

-    // We iterate too much: let's use Seek() to avoid too much key comparisons
+    // This user key has lots of entries.
+    // We're going from old to new, and it's taking too long. Let's do a Seek()
+    // and go from new to old. This helps when a key was overwritten many times.
     if (num_skipped >= max_skip_) {
       return FindValueForCurrentKeyUsingSeek();
     }
@@ -892,10 +940,13 @@ bool DBIter::FindValueForCurrentKey() {
     }

     PERF_COUNTER_ADD(internal_key_skipped_count, 1);
-    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
     iter_->Prev();
     ++num_skipped;
-    FindParseableKey(&ikey, kReverse);
+  }
+
+  if (!iter_->status().ok()) {
+    valid_ = false;
+    return false;
   }

   Status s;
@@ -905,7 +956,7 @@ bool DBIter::FindValueForCurrentKey() {
     case kTypeSingleDeletion:
     case kTypeRangeDeletion:
       valid_ = false;
-      return false;
+      return true;
     case kTypeMerge:
       current_entry_is_merged_ = true;
       if (last_not_merge_type == kTypeDeletion ||
@@ -926,7 +977,7 @@ bool DBIter::FindValueForCurrentKey() {
               Status::NotSupported("Blob DB does not support merge operator.");
         }
         valid_ = false;
-        return true;
+        return false;
       } else {
         assert(last_not_merge_type == kTypeValue);
         s = MergeHelper::TimedFullMerge(
@@ -936,7 +987,7 @@ bool DBIter::FindValueForCurrentKey() {
       }
       break;
     case kTypeValue:
-      // do nothing - we've already has value in saved_value_
+      // do nothing - we've already has value in pinned_value_
       break;
     case kTypeBlobIndex:
       if (!allow_blob_) {
@@ -945,7 +996,7 @@ bool DBIter::FindValueForCurrentKey() {
             "Encounter unexpected blob index. Please open DB with "
             "rocksdb::blob_db::BlobDB instead.");
         valid_ = false;
-        return true;
+        return false;
       }
       is_blob_ = true;
       break;
@@ -953,17 +1004,19 @@ bool DBIter::FindValueForCurrentKey() {
       assert(false);
       break;
   }
-  if (s.ok()) {
-    valid_ = true;
-  } else {
+  if (!s.ok()) {
     valid_ = false;
     status_ = s;
+    return false;
   }
+  valid_ = true;
   return true;
 }

 // This function is used in FindValueForCurrentKey.
 // We use Seek() function instead of Prev() to find necessary value
+// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
+// Would be nice to reuse some code.
 bool DBIter::FindValueForCurrentKeyUsingSeek() {
   // FindValueForCurrentKey will enable pinning before calling
   // FindValueForCurrentKeyUsingSeek()
@@ -974,26 +1027,38 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   iter_->Seek(last_key);
   RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

-  // assume there is at least one parseable key for this user key
-  ParsedInternalKey ikey;
-  FindParseableKey(&ikey, kForward);
-  assert(iter_->Valid());
-  assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
-
   // In case read_callback presents, the value we seek to may not be visible.
-  // Seek for the next value that's visible.
-  while (!IsVisible(ikey.sequence)) {
+  // Find the next value that's visible.
+  ParsedInternalKey ikey;
+  while (true) {
+    if (!iter_->Valid()) {
+      valid_ = false;
+      return iter_->status().ok();
+    }
+
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+      // No visible values for this key, even though FindValueForCurrentKey()
+      // has seen some. This is possible if we're using a tailing iterator, and
+      // the entries were discarded in a compaction.
+      valid_ = false;
+      return true;
+    }
+
+    if (IsVisible(ikey.sequence)) {
+      break;
+    }
+
     iter_->Next();
-    FindParseableKey(&ikey, kForward);
-    assert(iter_->Valid());
-    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
   }

   if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
       range_del_agg_.ShouldDelete(
           ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
     valid_ = false;
-    return false;
+    return true;
   }
   if (ikey.type == kTypeBlobIndex && !allow_blob_) {
     ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
@@ -1001,7 +1066,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
         "Encounter unexpected blob index. Please open DB with "
         "rocksdb::blob_db::BlobDB instead.");
     valid_ = false;
-    return true;
+    return false;
   }
   if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
     assert(iter_->IsValuePinned());
@@ -1012,115 +1077,147 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {

   // kTypeMerge. We need to collect all kTypeMerge values and save them
   // in operands
+  assert(ikey.type == kTypeMerge);
   current_entry_is_merged_ = true;
   merge_context_.Clear();
-  while (
-      iter_->Valid() &&
-      user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) &&
-      ikey.type == kTypeMerge &&
-      !range_del_agg_.ShouldDelete(
-          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
-    merge_context_.PushOperand(iter_->value(),
-                               iter_->IsValuePinned() /* operand_pinned */);
-    PERF_COUNTER_ADD(internal_merge_count, 1);
+  merge_context_.PushOperand(iter_->value(),
+                             iter_->IsValuePinned() /* operand_pinned */);
+  while (true) {
     iter_->Next();
-    FindParseableKey(&ikey, kForward);
-  }
-
-  Status s;
-  if (!iter_->Valid() ||
-      !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) ||
-      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
-      range_del_agg_.ShouldDelete(
-          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
-    s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
-                                    nullptr, merge_context_.GetOperands(),
-                                    &saved_value_, logger_, statistics_, env_,
-                                    &pinned_value_, true);
-    // Make iter_ valid and point to saved_key_
-    if (!iter_->Valid() ||
-        !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
-      iter_->Seek(last_key);
-      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
+    if (!iter_->Valid()) {
+      if (!iter_->status().ok()) {
+        valid_ = false;
+        return false;
+      }
+      break;
     }
-    if (s.ok()) {
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+      break;
+    }
+
+    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
+        range_del_agg_.ShouldDelete(
+            ikey,
+            RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+      break;
+    } else if (ikey.type == kTypeValue) {
+      const Slice val = iter_->value();
+      Status s = MergeHelper::TimedFullMerge(
+          merge_operator_, saved_key_.GetUserKey(), &val,
+          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
+          env_, &pinned_value_, true);
+      if (!s.ok()) {
+        valid_ = false;
+        status_ = s;
+        return false;
+      }
       valid_ = true;
-    } else {
+      return true;
+    } else if (ikey.type == kTypeMerge) {
+      merge_context_.PushOperand(iter_->value(),
+                                 iter_->IsValuePinned() /* operand_pinned */);
+      PERF_COUNTER_ADD(internal_merge_count, 1);
+    } else if (ikey.type == kTypeBlobIndex) {
+      if (!allow_blob_) {
+        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
+        status_ = Status::NotSupported(
+            "Encounter unexpected blob index. Please open DB with "
+            "rocksdb::blob_db::BlobDB instead.");
+      } else {
+        status_ =
+            Status::NotSupported("Blob DB does not support merge operator.");
+      }
       valid_ = false;
-      status_ = s;
+      return false;
+    } else {
+      assert(false);
     }
-    return true;
   }

-  const Slice& val = iter_->value();
-  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
-                                  &val, merge_context_.GetOperands(),
-                                  &saved_value_, logger_, statistics_, env_,
-                                  &pinned_value_, true);
-  if (s.ok()) {
-    valid_ = true;
-  } else {
+  Status s = MergeHelper::TimedFullMerge(
+      merge_operator_, saved_key_.GetUserKey(), nullptr,
+      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
+      &pinned_value_, true);
+  if (!s.ok()) {
     valid_ = false;
     status_ = s;
+    return false;
   }
+
+  // Make sure we leave iter_ in a good state. If it's valid and we don't care
+  // about prefixes, that's already good enough. Otherwise it needs to be
+  // seeked to the current key.
+ if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) { + if (prefix_extractor_ != nullptr && !total_order_seek_) { + iter_->SeekForPrev(last_key); + } else { + iter_->Seek(last_key); + if (!iter_->Valid() && iter_->status().ok()) { + iter_->SeekToLast(); + } + } + RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); + } + + valid_ = true; return true; } -// Used in Next to change directions -// Go to next user key -// Don't use Seek(), -// because next user key will be very close -void DBIter::FindNextUserKey() { - if (!iter_->Valid()) { - return; - } - ParsedInternalKey ikey; - FindParseableKey(&ikey, kForward); - while (iter_->Valid() && - !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { - iter_->Next(); - FindParseableKey(&ikey, kForward); - } -} - -// Go to previous user_key -void DBIter::FindPrevUserKey() { - if (!iter_->Valid()) { - return; - } +// Move backwards until the key smaller than saved_key_. +// Changes valid_ only if return value is false. 
+bool DBIter::FindUserKeyBeforeSavedKey() {
+  assert(status_.ok());
   size_t num_skipped = 0;
-  ParsedInternalKey ikey;
-  FindParseableKey(&ikey, kReverse);
-  int cmp;
-  while (iter_->Valid() &&
-         ((cmp = user_comparator_->Compare(ikey.user_key,
-                                           saved_key_.GetUserKey())) == 0 ||
-          (cmp > 0 && !IsVisible(ikey.sequence)))) {
-    if (TooManyInternalKeysSkipped()) {
-      return;
+  while (iter_->Valid()) {
+    ParsedInternalKey ikey;
+    if (!ParseKey(&ikey)) {
+      return false;
     }
-    if (cmp == 0) {
-      if (num_skipped >= max_skip_) {
-        num_skipped = 0;
-        IterKey last_key;
-        last_key.SetInternalKey(ParsedInternalKey(
-            saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
-        iter_->Seek(last_key.GetInternalKey());
-        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
-      } else {
-        ++num_skipped;
-      }
+    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
+      return true;
     }
+
+    if (TooManyInternalKeysSkipped()) {
+      return false;
+    }
+    assert(ikey.sequence != kMaxSequenceNumber);
     if (!IsVisible(ikey.sequence)) {
       PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
     } else {
       PERF_COUNTER_ADD(internal_key_skipped_count, 1);
     }
+
+    if (num_skipped >= max_skip_) {
+      num_skipped = 0;
+      IterKey last_key;
+      last_key.SetInternalKey(ParsedInternalKey(
+          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+      // It would be more efficient to use SeekForPrev() here, but some
+      // iterators may not support it.
+ iter_->Seek(last_key.GetInternalKey()); + RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); + if (!iter_->Valid()) { + break; + } + } else { + ++num_skipped; + } + iter_->Prev(); - FindParseableKey(&ikey, kReverse); } + + if (!iter_->status().ok()) { + valid_ = false; + return false; + } + + return true; } bool DBIter::TooManyInternalKeysSkipped(bool increment) { @@ -1140,19 +1237,9 @@ bool DBIter::IsVisible(SequenceNumber sequence) { (read_callback_ == nullptr || read_callback_->IsCommitted(sequence)); } -// Skip all unparseable keys -void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) { - while (iter_->Valid() && !ParseKey(ikey)) { - if (direction == kReverse) { - iter_->Prev(); - } else { - iter_->Next(); - } - } -} - void DBIter::Seek(const Slice& target) { StopWatch sw(env_, statistics_, DB_SEEK); + status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); saved_key_.Clear(); @@ -1201,6 +1288,7 @@ void DBIter::Seek(const Slice& target) { void DBIter::SeekForPrev(const Slice& target) { StopWatch sw(env_, statistics_, DB_SEEK); + status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); saved_key_.Clear(); @@ -1249,15 +1337,16 @@ void DBIter::SeekForPrev(const Slice& target) { } void DBIter::SeekToFirst() { - // Don't use iter_::Seek() if we set a prefix extractor - // because prefix seek will be used. - if (prefix_extractor_ != nullptr) { - max_skip_ = std::numeric_limits::max(); - } if (iterate_lower_bound_ != nullptr) { Seek(*iterate_lower_bound_); return; } + // Don't use iter_::Seek() if we set a prefix extractor + // because prefix seek will be used. 
+ if (prefix_extractor_ != nullptr && !total_order_seek_) { + max_skip_ = std::numeric_limits::max(); + } + status_ = Status::OK(); direction_ = kForward; ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); @@ -1293,11 +1382,22 @@ void DBIter::SeekToFirst() { } void DBIter::SeekToLast() { + if (iterate_upper_bound_ != nullptr) { + // Seek to last key strictly less than ReadOptions.iterate_upper_bound. + SeekForPrev(*iterate_upper_bound_); + if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) { + ReleaseTempPinnedData(); + PrevInternal(); + } + return; + } + // Don't use iter_::Seek() if we set a prefix extractor // because prefix seek will be used. - if (prefix_extractor_ != nullptr) { + if (prefix_extractor_ != nullptr && !total_order_seek_) { max_skip_ = std::numeric_limits::max(); } + status_ = Status::OK(); direction_ = kReverse; ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); @@ -1308,21 +1408,7 @@ void DBIter::SeekToLast() { iter_->SeekToLast(); range_del_agg_.InvalidateTombstoneMapPositions(); } - // When the iterate_upper_bound is set to a value, - // it will seek to the last key before the - // ReadOptions.iterate_upper_bound - if (iter_->Valid() && iterate_upper_bound_ != nullptr) { - SeekForPrev(*iterate_upper_bound_); - range_del_agg_.InvalidateTombstoneMapPositions(); - if (!Valid()) { - return; - } else if (user_comparator_->Equal(*iterate_upper_bound_, key())) { - ReleaseTempPinnedData(); - PrevInternal(); - } - } else { - PrevInternal(); - } + PrevInternal(); if (statistics_ != nullptr) { RecordTick(statistics_, NUMBER_DB_SEEK); if (valid_) { diff --git a/db/db_iter_stress_test.cc b/db/db_iter_stress_test.cc new file mode 100644 index 000000000..f964bbad1 --- /dev/null +++ b/db/db_iter_stress_test.cc @@ -0,0 +1,652 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "rocksdb/comparator.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/testharness.h" +#include "utilities/merge_operators.h" + +#ifdef GFLAGS + +#include "util/gflags_compat.h" + +using GFLAGS_NAMESPACE::ParseCommandLineFlags; + +DEFINE_bool(verbose, false, + "Print huge, detailed trace. Intended for debugging failures."); + +#else + +void ParseCommandLineFlags(int*, char***, bool) {} +bool FLAGS_verbose = false; + +#endif + +namespace rocksdb { + +class DBIteratorStressTest : public testing::Test { + public: + Env* env_; + + DBIteratorStressTest() : env_(Env::Default()) {} +}; + +namespace { + +struct Entry { + std::string key; + ValueType type; // kTypeValue, kTypeDeletion, kTypeMerge + uint64_t sequence; + std::string ikey; // internal key, made from `key`, `sequence` and `type` + std::string value; + // If false, we'll pretend that this entry doesn't exist. + bool visible = true; + + bool operator<(const Entry& e) const { + if (key != e.key) return key < e.key; + return std::tie(sequence, type) > std::tie(e.sequence, e.type); + } +}; + +struct Data { + std::vector entries; + + // Indices in `entries` with `visible` = false. + std::vector hidden; + // Keys of entries whose `visible` changed since the last seek of iterators. + std::set recently_touched_keys; +}; + +struct StressTestIterator : public InternalIterator { + Data* data; + Random64* rnd; + InternalKeyComparator cmp; + + // Each operation will return error with this probability... + double error_probability = 0; + // ... and add/remove entries with this probability. 
+ double mutation_probability = 0; + // The probability of adding vs removing entries will be chosen so that the + // amount of removed entries stays somewhat close to this number. + double target_hidden_fraction = 0; + // If true, print all mutations to stdout for debugging. + bool trace = false; + + int iter = -1; + Status status_; + + StressTestIterator(Data* _data, Random64* _rnd, const Comparator* _cmp) + : data(_data), rnd(_rnd), cmp(_cmp) {} + + bool Valid() const override { + if (iter >= 0 && iter < (int)data->entries.size()) { + assert(status_.ok()); + return true; + } + return false; + } + + Status status() const override { return status_; } + + bool MaybeFail() { + if (rnd->Next() >= + std::numeric_limits::max() * error_probability) { + return false; + } + if (rnd->Next() % 2) { + status_ = Status::Incomplete("test"); + } else { + status_ = Status::IOError("test"); + } + if (trace) { + std::cout << "injecting " << status_.ToString() << std::endl; + } + iter = -1; + return true; + } + + void MaybeMutate() { + if (rnd->Next() >= + std::numeric_limits::max() * mutation_probability) { + return; + } + do { + // If too many entries are hidden, hide less, otherwise hide more. + double hide_probability = + data->hidden.size() > data->entries.size() * target_hidden_fraction + ? 1. / 3 + : 2. / 3; + if (data->hidden.empty()) { + hide_probability = 1; + } + bool do_hide = + rnd->Next() < std::numeric_limits::max() * hide_probability; + if (do_hide) { + // Hide a random entry. + size_t idx = rnd->Next() % data->entries.size(); + Entry& e = data->entries[idx]; + if (e.visible) { + if (trace) { + std::cout << "hiding idx " << idx << std::endl; + } + e.visible = false; + data->hidden.push_back(idx); + data->recently_touched_keys.insert(e.key); + } else { + // Already hidden. Let's go unhide something instead, just because + // it's easy and it doesn't really matter what we do. + do_hide = false; + } + } + if (!do_hide) { + // Unhide a random entry. 
+ size_t hi = rnd->Next() % data->hidden.size(); + size_t idx = data->hidden[hi]; + if (trace) { + std::cout << "unhiding idx " << idx << std::endl; + } + Entry& e = data->entries[idx]; + assert(!e.visible); + e.visible = true; + data->hidden[hi] = data->hidden.back(); + data->hidden.pop_back(); + data->recently_touched_keys.insert(e.key); + } + } while (rnd->Next() % 3 != 0); // do 3 mutations on average + } + + void SkipForward() { + while (iter < (int)data->entries.size() && !data->entries[iter].visible) { + ++iter; + } + } + void SkipBackward() { + while (iter >= 0 && !data->entries[iter].visible) { + --iter; + } + } + + void SeekToFirst() { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + iter = 0; + SkipForward(); + } + void SeekToLast() { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + iter = (int)data->entries.size() - 1; + SkipBackward(); + } + + void Seek(const Slice& target) { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + // Binary search. + auto it = std::partition_point( + data->entries.begin(), data->entries.end(), + [&](const Entry& e) { return cmp.Compare(e.ikey, target) < 0; }); + iter = (int)(it - data->entries.begin()); + SkipForward(); + } + void SeekForPrev(const Slice& target) { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + // Binary search. 
+ auto it = std::partition_point( + data->entries.begin(), data->entries.end(), + [&](const Entry& e) { return cmp.Compare(e.ikey, target) <= 0; }); + iter = (int)(it - data->entries.begin()); + --iter; + SkipBackward(); + } + + void Next() { + assert(Valid()); + if (MaybeFail()) return; + MaybeMutate(); + ++iter; + SkipForward(); + } + void Prev() { + assert(Valid()); + if (MaybeFail()) return; + MaybeMutate(); + --iter; + SkipBackward(); + } + + Slice key() const { + assert(Valid()); + return data->entries[iter].ikey; + } + Slice value() const { + assert(Valid()); + return data->entries[iter].value; + } + + bool IsKeyPinned() const override { return true; } + bool IsValuePinned() const override { return true; } +}; + +// A small reimplementation of DBIter, supporting only some of the features, +// and doing everything in O(log n). +// Skips all keys that are in recently_touched_keys. +struct ReferenceIterator { + Data* data; + uint64_t sequence; // ignore entries with sequence number below this + + bool valid = false; + std::string key; + std::string value; + + ReferenceIterator(Data* _data, uint64_t _sequence) + : data(_data), sequence(_sequence) {} + + bool Valid() const { return valid; } + + // Finds the first entry with key + // greater/less/greater-or-equal/less-or-equal than `key`, depending on + // arguments: if `skip`, inequality is strict; if `forward`, it's + // greater/greater-or-equal, otherwise less/less-or-equal. + // Sets `key` to the result. + // If no such key exists, returns false. Doesn't check `visible`. 
+ bool FindNextKey(bool skip, bool forward) { + valid = false; + auto it = std::partition_point(data->entries.begin(), data->entries.end(), + [&](const Entry& e) { + if (forward != skip) { + return e.key < key; + } else { + return e.key <= key; + } + }); + if (forward) { + if (it != data->entries.end()) { + key = it->key; + return true; + } + } else { + if (it != data->entries.begin()) { + --it; + key = it->key; + return true; + } + } + return false; + } + + bool FindValueForCurrentKey() { + if (data->recently_touched_keys.count(key)) { + return false; + } + + // Find the first entry for the key. The caller promises that it exists. + auto it = std::partition_point(data->entries.begin(), data->entries.end(), + [&](const Entry& e) { + if (e.key != key) { + return e.key < key; + } + return e.sequence > sequence; + }); + + // Find the first visible entry. + for (;; ++it) { + if (it == data->entries.end()) { + return false; + } + Entry& e = *it; + if (e.key != key) { + return false; + } + assert(e.sequence <= sequence); + if (!e.visible) continue; + if (e.type == kTypeDeletion) { + return false; + } + if (e.type == kTypeValue) { + value = e.value; + valid = true; + return true; + } + assert(e.type == kTypeMerge); + break; + } + + // Collect merge operands. + std::vector operands; + for (; it != data->entries.end(); ++it) { + Entry& e = *it; + if (e.key != key) { + break; + } + assert(e.sequence <= sequence); + if (!e.visible) continue; + if (e.type == kTypeDeletion) { + break; + } + operands.push_back(e.value); + if (e.type == kTypeValue) { + break; + } + } + + // Do a merge. + value = operands.back().ToString(); + for (int i = (int)operands.size() - 2; i >= 0; --i) { + value.append(","); + value.append(operands[i].data(), operands[i].size()); + } + + valid = true; + return true; + } + + // Start at `key` and move until we encounter a valid value. + // `forward` defines the direction of movement. + // If `skip` is true, we're looking for key not equal to `key`. 
+ void DoTheThing(bool skip, bool forward) { + while (FindNextKey(skip, forward) && !FindValueForCurrentKey()) { + skip = true; + } + } + + void Seek(const Slice& target) { + key = target.ToString(); + DoTheThing(false, true); + } + void SeekForPrev(const Slice& target) { + key = target.ToString(); + DoTheThing(false, false); + } + void SeekToFirst() { Seek(""); } + void SeekToLast() { + key = data->entries.back().key; + DoTheThing(false, false); + } + void Next() { + assert(Valid()); + DoTheThing(true, true); + } + void Prev() { + assert(Valid()); + DoTheThing(true, false); + } +}; + +} // namespace + +// Use an internal iterator that sometimes returns errors and sometimes +// adds/removes entries on the fly. Do random operations on a DBIter and +// check results. +// TODO: can be improved for more coverage: +// * Override IsKeyPinned() and IsValuePinned() to actually use +// PinnedIteratorManager and check that there's no use-after free. +// * Try different combinations of prefix_extractor, total_order_seek, +// prefix_same_as_start, iterate_lower_bound, iterate_upper_bound. +TEST_F(DBIteratorStressTest, StressTest) { + // We use a deterministic RNG, and everything happens in a single thread. + Random64 rnd(826909345792864532ll); + + auto gen_key = [&](int max_key) { + int len = 0; + int a = max_key; + while (a) { + a /= 10; + ++len; + } + std::string s = ToString(rnd.Next() % (uint64_t)max_key); + s.insert(0, len - (int)s.size(), '0'); + return s; + }; + + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + ReadOptions ropt; + + size_t num_matching = 0; + size_t num_at_end = 0; + size_t num_not_ok = 0; + size_t num_recently_removed = 0; + + // Number of iterations for each combination of parameters + // (there are ~250 of those). + // Tweak this to change the test run time. + // As of the time of writing, the test takes ~4 seconds for value of 5000. 
+ const int num_iterations = 5000; + // Enable this to print all the operations for debugging. + bool trace = FLAGS_verbose; + + for (int num_entries : {5, 10, 100}) { + for (double key_space : {0.1, 1.0, 3.0}) { + for (ValueType prevalent_entry_type : + {kTypeValue, kTypeDeletion, kTypeMerge}) { + for (double error_probability : {0.01, 0.1}) { + for (double mutation_probability : {0.01, 0.5}) { + for (double target_hidden_fraction : {0.1, 0.5}) { + std::string trace_str = + "entries: " + ToString(num_entries) + + ", key_space: " + ToString(key_space) + + ", error_probability: " + ToString(error_probability) + + ", mutation_probability: " + ToString(mutation_probability) + + ", target_hidden_fraction: " + + ToString(target_hidden_fraction); + SCOPED_TRACE(trace_str); + if (trace) { + std::cout << trace_str << std::endl; + } + + // Generate data. + Data data; + int max_key = (int)(num_entries * key_space) + 1; + for (int i = 0; i < num_entries; ++i) { + Entry e; + e.key = gen_key(max_key); + if (rnd.Next() % 10 != 0) { + e.type = prevalent_entry_type; + } else { + const ValueType types[] = {kTypeValue, kTypeDeletion, + kTypeMerge}; + e.type = + types[rnd.Next() % (sizeof(types) / sizeof(types[0]))]; + } + e.sequence = i; + e.value = "v" + ToString(i); + ParsedInternalKey internal_key(e.key, e.sequence, e.type); + AppendInternalKey(&e.ikey, internal_key); + + data.entries.push_back(e); + } + std::sort(data.entries.begin(), data.entries.end()); + if (trace) { + std::cout << "entries:"; + for (size_t i = 0; i < data.entries.size(); ++i) { + Entry& e = data.entries[i]; + std::cout + << "\n idx " << i << ": \"" << e.key << "\": \"" + << e.value << "\" seq: " << e.sequence << " type: " + << (e.type == kTypeValue + ? "val" + : e.type == kTypeDeletion ? 
"del" : "merge"); + } + std::cout << std::endl; + } + + std::unique_ptr db_iter; + std::unique_ptr ref_iter; + for (int iteration = 0; iteration < num_iterations; ++iteration) { + SCOPED_TRACE(iteration); + // Create a new iterator every ~30 operations. + if (db_iter == nullptr || rnd.Next() % 30 == 0) { + uint64_t sequence = rnd.Next() % (data.entries.size() + 2); + ref_iter.reset(new ReferenceIterator(&data, sequence)); + if (trace) { + std::cout << "new iterator, seq: " << sequence << std::endl; + } + + auto internal_iter = + new StressTestIterator(&data, &rnd, BytewiseComparator()); + internal_iter->error_probability = error_probability; + internal_iter->mutation_probability = mutation_probability; + internal_iter->target_hidden_fraction = + target_hidden_fraction; + internal_iter->trace = trace; + db_iter.reset(NewDBIterator( + env_, ropt, ImmutableCFOptions(options), + BytewiseComparator(), internal_iter, sequence, + options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); + } + + // Do a random operation. It's important to do it on ref_it + // later than on db_iter to make sure ref_it sees the correct + // recently_touched_keys. + std::string old_key; + bool forward = rnd.Next() % 2 > 0; + // Do Next()/Prev() ~90% of the time. + bool seek = !ref_iter->Valid() || rnd.Next() % 10 == 0; + if (trace) { + std::cout << iteration << ": "; + } + + if (!seek) { + assert(db_iter->Valid()); + old_key = ref_iter->key; + if (trace) { + std::cout << (forward ? "Next" : "Prev") << std::endl; + } + + if (forward) { + db_iter->Next(); + ref_iter->Next(); + } else { + db_iter->Prev(); + ref_iter->Prev(); + } + } else { + data.recently_touched_keys.clear(); + // Do SeekToFirst less often than Seek. + if (rnd.Next() % 4 == 0) { + if (trace) { + std::cout << (forward ? 
"SeekToFirst" : "SeekToLast") + << std::endl; + } + + if (forward) { + old_key = ""; + db_iter->SeekToFirst(); + ref_iter->SeekToFirst(); + } else { + old_key = data.entries.back().key; + db_iter->SeekToLast(); + ref_iter->SeekToLast(); + } + } else { + old_key = gen_key(max_key); + if (trace) { + std::cout << (forward ? "Seek" : "SeekForPrev") << " \"" + << old_key << '"' << std::endl; + } + if (forward) { + db_iter->Seek(old_key); + ref_iter->Seek(old_key); + } else { + db_iter->SeekForPrev(old_key); + ref_iter->SeekForPrev(old_key); + } + } + } + + // Check the result. + if (db_iter->Valid()) { + ASSERT_TRUE(db_iter->status().ok()); + if (data.recently_touched_keys.count( + db_iter->key().ToString())) { + // Ended on a key that may have been mutated during the + // operation. Reference iterator skips such keys, so we + // can't check the exact result. + + // Check that the key moved in the right direction. + if (forward) { + if (seek) + ASSERT_GE(db_iter->key().ToString(), old_key); + else + ASSERT_GT(db_iter->key().ToString(), old_key); + } else { + if (seek) + ASSERT_LE(db_iter->key().ToString(), old_key); + else + ASSERT_LT(db_iter->key().ToString(), old_key); + } + + if (ref_iter->Valid()) { + // Check that DBIter didn't miss any non-mutated key. + if (forward) { + ASSERT_LT(db_iter->key().ToString(), ref_iter->key); + } else { + ASSERT_GT(db_iter->key().ToString(), ref_iter->key); + } + } + // Tell the next iteration of the loop to reseek the + // iterators. + ref_iter->valid = false; + + ++num_recently_removed; + } else { + ASSERT_TRUE(ref_iter->Valid()); + ASSERT_EQ(ref_iter->key, db_iter->key().ToString()); + ASSERT_EQ(ref_iter->value, db_iter->value()); + ++num_matching; + } + } else if (db_iter->status().ok()) { + ASSERT_FALSE(ref_iter->Valid()); + ++num_at_end; + } else { + // Non-ok status. Nothing to check here. + // Tell the next iteration of the loop to reseek the + // iterators. 
+ ref_iter->valid = false; + ++num_not_ok; + } + } + } + } + } + } + } + } + + // Check that all cases were hit many times. + EXPECT_GT(num_matching, 10000); + EXPECT_GT(num_at_end, 10000); + EXPECT_GT(num_not_ok, 10000); + EXPECT_GT(num_recently_removed, 10000); + + std::cout << "stats:\n exact matches: " << num_matching + << "\n end reached: " << num_at_end + << "\n non-ok status: " << num_not_ok + << "\n mutated on the fly: " << num_recently_removed << std::endl; +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +} diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 0143482bf..4af678b4a 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -84,6 +84,36 @@ class TestIterator : public InternalIterator { }); } + // Removes the key from the set of keys over which this iterator iterates. + // Not to be confused with AddDeletion(). + // If the iterator is currently positioned on this key, the deletion will + // apply next time the iterator moves. + // Used for simulating ForwardIterator updating to a new version that doesn't + // have some of the keys (e.g. after compaction with a filter). + void Vanish(std::string _key) { + if (valid_ && data_[iter_].first == _key) { + delete_current_ = true; + return; + } + for (auto it = data_.begin(); it != data_.end(); ++it) { + ParsedInternalKey ikey; + bool ok __attribute__((__unused__)) = ParseInternalKey(it->first, &ikey); + assert(ok); + if (ikey.user_key != _key) { + continue; + } + if (valid_ && data_.begin() + iter_ > it) { + --iter_; + } + data_.erase(it); + return; + } + assert(false); + } + + // Number of operations done on this iterator since construction. 
+ size_t steps() const { return steps_; } + virtual bool Valid() const override { assert(initialized_); return valid_; @@ -91,12 +121,16 @@ class TestIterator : public InternalIterator { virtual void SeekToFirst() override { assert(initialized_); + ++steps_; + DeleteCurrentIfNeeded(); valid_ = (data_.size() > 0); iter_ = 0; } virtual void SeekToLast() override { assert(initialized_); + ++steps_; + DeleteCurrentIfNeeded(); valid_ = (data_.size() > 0); iter_ = data_.size() - 1; } @@ -104,6 +138,7 @@ class TestIterator : public InternalIterator { virtual void Seek(const Slice& target) override { assert(initialized_); SeekToFirst(); + ++steps_; if (!valid_) { return; } @@ -119,20 +154,31 @@ class TestIterator : public InternalIterator { virtual void SeekForPrev(const Slice& target) override { assert(initialized_); + DeleteCurrentIfNeeded(); SeekForPrevImpl(target, &cmp); } virtual void Next() override { assert(initialized_); - if (data_.empty() || (iter_ == data_.size() - 1)) { - valid_ = false; + assert(valid_); + assert(iter_ < data_.size()); + + ++steps_; + if (delete_current_) { + DeleteCurrentIfNeeded(); } else { ++iter_; } + valid_ = iter_ < data_.size(); } virtual void Prev() override { assert(initialized_); + assert(valid_); + assert(iter_ < data_.size()); + + ++steps_; + DeleteCurrentIfNeeded(); if (iter_ == 0) { valid_ = false; } else { @@ -163,9 +209,19 @@ class TestIterator : public InternalIterator { bool valid_; size_t sequence_number_; size_t iter_; + size_t steps_ = 0; InternalKeyComparator cmp; std::vector> data_; + bool delete_current_ = false; + + void DeleteCurrentIfNeeded() { + if (!delete_current_) { + return; + } + data_.erase(data_.begin() + iter_); + delete_current_ = false; + } }; class DBIteratorTest : public testing::Test { @@ -3018,6 +3074,41 @@ TEST_F(DBIteratorTest, SeekLessLowerBound) { ASSERT_EQ(lower_bound_str, db_iter->key().ToString()); } +TEST_F(DBIteratorTest, ReverseToForwardWithDisappearingKeys) { + Options options; + 
options.prefix_extractor.reset(NewCappedPrefixTransform(0)); + + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "A"); + internal_iter->AddPut("b", "B"); + for (int i = 0; i < 100; ++i) { + internal_iter->AddPut("c" + ToString(i), ""); + } + internal_iter->Finish(); + + std::unique_ptr db_iter(NewDBIterator( + env_, ReadOptions(), ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 10, options.max_sequential_skip_in_iterations, + nullptr /*read_callback*/)); + + db_iter->SeekForPrev("a"); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_OK(db_iter->status()); + ASSERT_EQ("a", db_iter->key().ToString()); + + internal_iter->Vanish("a"); + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_OK(db_iter->status()); + ASSERT_EQ("b", db_iter->key().ToString()); + + // A (sort of) bug used to cause DBIter to pointlessly drag the internal + // iterator all the way to the end. But this doesn't really matter at the time + // of writing because the only iterator that can see disappearing keys is + // ForwardIterator, which doesn't support SeekForPrev(). + EXPECT_LT(internal_iter->steps(), 20); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 24dbac41b..dd2d1ce49 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -2232,6 +2232,46 @@ TEST_P(DBIteratorTest, SeekAfterHittingManyInternalKeys) { ASSERT_EQ(iter2->value().ToString(), "val_6"); } +// Reproduces a former bug where iterator would skip some records when DBIter +// re-seeks subiterator with Incomplete status. +TEST_P(DBIteratorTest, NonBlockingIterationBugRepro) { + Options options = CurrentOptions(); + BlockBasedTableOptions table_options; + // Make sure the sst file has more than one block. 
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+
+  // Two records in sst file, each in its own block.
+  Put("b", "");
+  Put("d", "");
+  Flush();
+
+  // Create a nonblocking iterator before writing to memtable.
+  ReadOptions ropt;
+  ropt.read_tier = kBlockCacheTier;
+  unique_ptr<Iterator> iter(NewIterator(ropt));
+
+  // Overwrite a key in memtable many times to hit
+  // max_sequential_skip_in_iterations (which is 8 by default).
+  for (int i = 0; i < 20; ++i) {
+    Put("c", "");
+  }
+
+  // Load the second block in sst file into the block cache.
+  {
+    unique_ptr<Iterator> iter2(NewIterator(ReadOptions()));
+    iter2->Seek("d");
+  }
+
+  // Finally seek the nonblocking iterator.
+  iter->Seek("a");
+  // With the bug, the status used to be OK, and the iterator used to point to
+  // "d".
+  EXPECT_TRUE(iter->status().IsIncomplete());
+}
+
 INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest,
                         testing::Values(true, false));

diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc
index 471d7c0b7..1401bb0ab 100644
--- a/db/forward_iterator.cc
+++ b/db/forward_iterator.cc
@@ -27,7 +27,7 @@ namespace rocksdb {
 // Usage:
 //     ForwardLevelIterator iter;
 //     iter.SetFileIndex(file_index);
-//     iter.Seek(target);
+//     iter.Seek(target); // or iter.SeekToFirst();
 //     iter.Next()
 class ForwardLevelIterator : public InternalIterator {
  public:
@@ -53,11 +53,11 @@ class ForwardLevelIterator : public InternalIterator {
   void SetFileIndex(uint32_t file_index) {
     assert(file_index < files_.size());
+    status_ = Status::OK();
     if (file_index != file_index_) {
       file_index_ = file_index;
       Reset();
     }
-    valid_ = false;
   }
   void Reset() {
     assert(file_index_ < files_.size());
@@ -77,10 +77,10 @@ class ForwardLevelIterator : public InternalIterator {
         read_options_.ignore_range_deletions ?
nullptr : &range_del_agg, nullptr /* table_reader_ptr */, nullptr, false); file_iter_->SetPinnedItersMgr(pinned_iters_mgr_); + valid_ = false; if (!range_del_agg.IsEmpty()) { status_ = Status::NotSupported( "Range tombstones unsupported with ForwardIterator"); - valid_ = false; } } void SeekToLast() override { @@ -95,12 +95,27 @@ class ForwardLevelIterator : public InternalIterator { return valid_; } void SeekToFirst() override { - SetFileIndex(0); + assert(file_iter_ != nullptr); + if (!status_.ok()) { + assert(!valid_); + return; + } file_iter_->SeekToFirst(); valid_ = file_iter_->Valid(); } void Seek(const Slice& internal_key) override { assert(file_iter_ != nullptr); + + // This deviates from the usual convention for InternalIterator::Seek() in + // that it doesn't discard pre-existing error status. That's because this + // Seek() is only supposed to be called immediately after SetFileIndex() + // (which discards pre-existing error status), and SetFileIndex() may set + // an error status, which we shouldn't discard. 
+    if (!status_.ok()) {
+      assert(!valid_);
+      return;
+    }
+
     file_iter_->Seek(internal_key);
     valid_ = file_iter_->Valid();
   }
@@ -112,8 +127,12 @@ class ForwardLevelIterator : public InternalIterator {
     assert(valid_);
     file_iter_->Next();
     for (;;) {
-      if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) {
-        valid_ = !file_iter_->status().IsIncomplete();
+      valid_ = file_iter_->Valid();
+      if (!file_iter_->status().ok()) {
+        assert(!valid_);
+        return;
+      }
+      if (valid_) {
         return;
       }
       if (file_index_ + 1 >= files_.size()) {
@@ -121,6 +140,10 @@ class ForwardLevelIterator : public InternalIterator {
         return;
       }
       SetFileIndex(file_index_ + 1);
+      if (!status_.ok()) {
+        assert(!valid_);
+        return;
+      }
       file_iter_->SeekToFirst();
     }
   }
@@ -135,7 +158,7 @@ class ForwardLevelIterator : public InternalIterator {
   Status status() const override {
     if (!status_.ok()) {
       return status_;
-    } else if (file_iter_ && !file_iter_->status().ok()) {
+    } else if (file_iter_) {
       return file_iter_->status();
     }
     return Status::OK();
@@ -299,9 +322,6 @@ bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
 }
 
 void ForwardIterator::Seek(const Slice& internal_key) {
-  if (IsOverUpperBound(internal_key)) {
-    valid_ = false;
-  }
   if (sv_ == nullptr) {
     RebuildIterators(true);
   } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
@@ -605,7 +625,9 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
     if ((read_options_.iterate_upper_bound != nullptr) &&
         cfd_->internal_comparator().user_comparator()->Compare(
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
-      has_iter_trimmed_for_upper_bound_ = true;
+      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
+      // will never be interested in files with smallest key above
+      // iterate_upper_bound, since iterate_upper_bound can't be changed.
       l0_iters_.push_back(nullptr);
       continue;
     }
@@ -773,7 +795,7 @@ void ForwardIterator::UpdateCurrent() {
       current_ = mutable_iter_;
     }
   }
-  valid_ = (current_ != nullptr);
+  valid_ = current_ != nullptr && immutable_status_.ok();
   if (!status_.ok()) {
     status_ = Status::OK();
   }
diff --git a/db/managed_iterator.cc b/db/managed_iterator.cc
index c393eb5a6..9831eb80a 100644
--- a/db/managed_iterator.cc
+++ b/db/managed_iterator.cc
@@ -101,9 +101,7 @@ void ManagedIterator::SeekToLast() {
   }
   assert(mutable_iter_ != nullptr);
   mutable_iter_->SeekToLast();
-  if (mutable_iter_->status().ok()) {
-    UpdateCurrent();
-  }
+  UpdateCurrent();
 }
 
 void ManagedIterator::SeekToFirst() {
@@ -146,27 +144,13 @@ void ManagedIterator::Prev() {
   }
   MILock l(&in_use_, this);
   if (NeedToRebuild()) {
-    std::string current_key = key().ToString();
-    Slice old_key(current_key);
-    RebuildIterator();
-    SeekInternal(old_key, false);
-    UpdateCurrent();
+    RebuildIterator(true);
     if (!valid_) {
       return;
     }
-    if (key().compare(old_key) != 0) {
-      valid_ = false;
-      status_ = Status::Incomplete("Cannot do Prev now");
-      return;
-    }
   }
   mutable_iter_->Prev();
-  if (mutable_iter_->status().ok()) {
-    UpdateCurrent();
-    status_ = Status::OK();
-  } else {
-    status_ = mutable_iter_->status();
-  }
+  UpdateCurrent();
 }
 
 void ManagedIterator::Next() {
@@ -176,19 +160,10 @@ void ManagedIterator::Next() {
   }
   MILock l(&in_use_, this);
   if (NeedToRebuild()) {
-    std::string current_key = key().ToString();
-    Slice old_key(current_key.data(), cached_key_.Size());
-    RebuildIterator();
-    SeekInternal(old_key, false);
-    UpdateCurrent();
+    RebuildIterator(true);
     if (!valid_) {
       return;
     }
-    if (key().compare(old_key) != 0) {
-      valid_ = false;
-      status_ = Status::Incomplete("Cannot do Next now");
-      return;
-    }
   }
   mutable_iter_->Next();
   UpdateCurrent();
@@ -206,21 +181,38 @@ Slice ManagedIterator::value() const {
 
 Status ManagedIterator::status() const { return status_; }
 
-void ManagedIterator::RebuildIterator() {
+void ManagedIterator::RebuildIterator(bool reseek) {
+  std::string current_key;
+  if (reseek) {
+    current_key = key().ToString();
+  }
+
   svnum_ = cfd_->GetSuperVersionNumber();
   mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
+
+  if (reseek) {
+    Slice old_key(current_key.data(), current_key.size());
+    SeekInternal(old_key, false);
+    UpdateCurrent();
+    if (!valid_ || key().compare(old_key) != 0) {
+      valid_ = false;
+      status_ = Status::Incomplete(
+          "Next/Prev failed because current key has "
+          "been removed");
+    }
+  }
 }
 
 void ManagedIterator::UpdateCurrent() {
   assert(mutable_iter_ != nullptr);
 
   valid_ = mutable_iter_->Valid();
+  status_ = mutable_iter_->status();
+
   if (!valid_) {
-    status_ = mutable_iter_->status();
     return;
   }
-  status_ = Status::OK();
   cached_key_.SetUserKey(mutable_iter_->key());
   cached_value_.SetUserKey(mutable_iter_->value());
 }
diff --git a/db/managed_iterator.h b/db/managed_iterator.h
index 8e962f781..bd125fa32 100644
--- a/db/managed_iterator.h
+++ b/db/managed_iterator.h
@@ -54,7 +54,7 @@ class ManagedIterator : public Iterator {
   }
 
  private:
-  void RebuildIterator();
+  void RebuildIterator(bool reseek = false);
   void UpdateCurrent();
   void SeekInternal(const Slice& user_key, bool seek_to_first);
   bool NeedToRebuild();
diff --git a/db/version_set.cc b/db/version_set.cc
index 7cceb5860..d1a63bce6 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -502,13 +502,7 @@ class LevelIterator final : public InternalIterator {
     return file_iter_.value();
   }
   virtual Status status() const override {
-    // It'd be nice if status() returned a const Status& instead of a Status
-    if (!status_.ok()) {
-      return status_;
-    } else if (file_iter_.iter() != nullptr) {
-      return file_iter_.status();
-    }
-    return Status::OK();
+    return file_iter_.iter() ? file_iter_.status() : Status::OK();
   }
   virtual void SetPinnedItersMgr(
       PinnedIteratorsManager* pinned_iters_mgr) override {
@@ -573,7 +567,6 @@ class LevelIterator final : public InternalIterator {
   RangeDelAggregator* range_del_agg_;
   IteratorWrapper file_iter_;  // May be nullptr
   PinnedIteratorsManager* pinned_iters_mgr_;
-  Status status_;
 };
 
 void LevelIterator::Seek(const Slice& target) {
@@ -628,16 +621,9 @@ void LevelIterator::Prev() {
 }
 
 void LevelIterator::SkipEmptyFileForward() {
-  // For an error (IO error, checksum mismatch, etc), we skip the file
-  // and move to the next one and continue reading data.
-  // TODO this behavior is from LevelDB. We should revisit it.
   while (file_iter_.iter() == nullptr ||
-         (!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
-    if (file_iter_.iter() != nullptr && !file_iter_.Valid() &&
-        file_iter_.iter()->IsOutOfBound()) {
-      return;
-    }
-
+         (!file_iter_.Valid() && file_iter_.status().ok() &&
+          !file_iter_.iter()->IsOutOfBound())) {
     // Move to next file
     if (file_index_ >= flevel_->num_files - 1) {
       // Already at the last file
@@ -657,7 +643,7 @@ void LevelIterator::SkipEmptyFileForward() {
 
 void LevelIterator::SkipEmptyFileBackward() {
   while (file_iter_.iter() == nullptr ||
-         (!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
+         (!file_iter_.Valid() && file_iter_.status().ok())) {
     // Move to previous file
     if (file_index_ == 0) {
       // Already the first file
@@ -672,13 +658,6 @@ void LevelIterator::SkipEmptyFileBackward() {
 }
 
 void LevelIterator::SetFileIterator(InternalIterator* iter) {
-  if (file_iter_.iter() != nullptr && status_.ok()) {
-    // TODO right now we don't invalidate the iterator even if the status is
-    // not OK. We should consider to do that so that it is harder for users to
-    // skip errors.
-    status_ = file_iter_.status();
-  }
-
   if (pinned_iters_mgr_ && iter) {
     iter->SetPinnedItersMgr(pinned_iters_mgr_);
   }
diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h
index 4be77e962..4475eb396 100644
--- a/include/rocksdb/iterator.h
+++ b/include/rocksdb/iterator.h
@@ -33,6 +33,7 @@ class Iterator : public Cleanable {
 
   // An iterator is either positioned at a key/value pair, or
   // not valid.  This method returns true iff the iterator is valid.
+  // Always returns false if !status().ok().
   virtual bool Valid() const = 0;
 
   // Position at the first key in the source.  The iterator is Valid()
@@ -46,6 +47,9 @@ class Iterator : public Cleanable {
   // Position at the first key in the source that at or past target.
   // The iterator is Valid() after this call iff the source contains
   // an entry that comes at or past target.
+  // All Seek*() methods clear any error status() that the iterator had prior to
+  // the call; after the seek, status() indicates only the error (if any) that
+  // happened during the seek, not any past errors.
   virtual void Seek(const Slice& target) = 0;
 
   // Position at the last key in the source that at or before target.
@@ -72,7 +76,7 @@ class Iterator : public Cleanable {
   // Return the value for the current entry.  The underlying storage for
   // the returned slice is valid only until the next modification of
   // the iterator.
-  // REQUIRES: !AtEnd() && !AtStart()
+  // REQUIRES: Valid()
   virtual Slice value() const = 0;
 
   // If an error has occurred, return it.  Else return an ok status.
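The two comments added to iterator.h above pin down the new contract. A toy illustration of what it means for callers follows; everything in it (`ToyIter`, `CountKeys`, the injected error) is invented for this sketch and is not part of the patch or of RocksDB:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Minimal stand-in for rocksdb::Status: an empty message means ok.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Toy iterator obeying the patched convention:
//  * Valid() always returns false once status() is not ok.
//  * Every Seek*() call discards any pre-existing error status.
class ToyIter {
 public:
  explicit ToyIter(std::vector<std::string> keys) : keys_(std::move(keys)) {}

  bool Valid() const { return status_.ok() && pos_ < keys_.size(); }
  Status status() const { return status_; }

  void SeekToFirst() {
    status_ = Status{};  // Seek*() clears previous errors.
    pos_ = 0;
  }

  void Next() {
    assert(Valid());
    ++pos_;
    if (fail_at_ >= 0 && pos_ == static_cast<size_t>(fail_at_)) {
      status_ = Status{"injected IO error"};  // Valid() is now false too.
    }
  }

  const std::string& key() const {
    assert(Valid());  // REQUIRES: Valid(), per the new contract
    return keys_[pos_];
  }

  void InjectErrorAt(int i) { fail_at_ = i; }

 private:
  std::vector<std::string> keys_;
  size_t pos_ = 0;
  int fail_at_ = -1;  // -1 means no injected error
  Status status_;
};

// Caller-side pattern: loop on Valid() alone, check status() once afterwards.
int CountKeys(ToyIter* it) {
  int n = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) ++n;
  return n;  // caller must still inspect it->status()
}
```

The point of the convention is the loop shape: since `Valid()` can no longer be true alongside a non-ok `status()`, callers never observe a half-broken position, and a later re-seek starts from a clean slate.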
diff --git a/src.mk b/src.mk
index 1a1a7b58c..592ad94ca 100644
--- a/src.mk
+++ b/src.mk
@@ -267,6 +267,7 @@ MAIN_SOURCES = \
   db/db_inplace_update_test.cc \
   db/db_io_failure_test.cc \
   db/db_iter_test.cc \
+  db/db_iter_stress_test.cc \
   db/db_iterator_test.cc \
   db/db_log_iter_test.cc \
   db/db_memtable_test.cc \
diff --git a/table/block.cc b/table/block.cc
index ae82ae337..ce45f4316 100644
--- a/table/block.cc
+++ b/table/block.cc
@@ -429,12 +429,13 @@ BlockIter* Block::NewIterator(const Comparator* cmp, BlockIter* iter,
     ret_iter = new BlockIter;
   }
   if (size_ < 2*sizeof(uint32_t)) {
-    ret_iter->SetStatus(Status::Corruption("bad block contents"));
+    ret_iter->Invalidate(Status::Corruption("bad block contents"));
     return ret_iter;
   }
   const uint32_t num_restarts = NumRestarts();
   if (num_restarts == 0) {
-    ret_iter->SetStatus(Status::OK());
+    // Empty block.
+    ret_iter->Invalidate(Status::OK());
     return ret_iter;
   } else {
     BlockPrefixIndex* prefix_index_ptr =
diff --git a/table/block.h b/table/block.h
index 120bcbdcc..6cc304cdc 100644
--- a/table/block.h
+++ b/table/block.h
@@ -241,8 +241,21 @@ class BlockIter final : public InternalIterator {
     last_bitmap_offset_ = current_ + 1;
   }
 
-  void SetStatus(Status s) {
+  // Makes Valid() return false, status() return `s`, and Seek()/Prev()/etc do
+  // nothing.
+  void Invalidate(Status s) {
+    // Assert that the BlockIter is never deleted while Pinning is Enabled.
+    assert(!pinned_iters_mgr_ ||
+           (pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
+
+    data_ = nullptr;
+    current_ = restarts_;
     status_ = s;
+
+    // Clear prev entries cache.
+    prev_entries_keys_buff_.clear();
+    prev_entries_.clear();
+    prev_entries_idx_ = -1;
   }
 
   virtual bool Valid() const override { return current_ < restarts_; }
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index dacff6a73..e3c218474 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -1391,7 +1391,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
   if (cache_handle == nullptr && no_io) {
     if (input_iter != nullptr) {
-      input_iter->SetStatus(Status::Incomplete("no blocking io"));
+      input_iter->Invalidate(Status::Incomplete("no blocking io"));
       return input_iter;
     } else {
       return NewErrorInternalIterator(Status::Incomplete("no blocking io"));
@@ -1438,7 +1438,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
       RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
       // make sure if something goes wrong, index_reader shall remain intact.
       if (input_iter != nullptr) {
-        input_iter->SetStatus(s);
+        input_iter->Invalidate(s);
         return input_iter;
       } else {
         return NewErrorInternalIterator(s);
@@ -1506,7 +1506,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
     if (s.ok() && block.value == nullptr) {
       if (no_io) {
         // Could not read from block_cache and can't do IO
-        iter->SetStatus(Status::Incomplete("no blocking io"));
+        iter->Invalidate(Status::Incomplete("no blocking io"));
         return iter;
       }
       std::unique_ptr<Block> block_value;
@@ -1566,7 +1566,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
     }
   } else {
     assert(block.value == nullptr);
-    iter->SetStatus(s);
+    iter->Invalidate(s);
   }
   return iter;
 }
@@ -1876,7 +1876,9 @@ void BlockBasedTableIterator::InitDataBlock() {
   BlockHandle data_block_handle;
   Slice handle_slice = index_iter_->value();
   if (!block_iter_points_to_real_block_ ||
-      handle_slice.compare(prev_index_value_) != 0) {
+      handle_slice.compare(prev_index_value_) != 0 ||
+      // if previous attempt of reading the block missed cache, try again
+      data_block_iter_.status().IsIncomplete()) {
     if (block_iter_points_to_real_block_) {
       ResetDataIter();
     }
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index bb1f79774..3b4e49d76 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -528,11 +528,9 @@ class BlockBasedTableIterator : public InternalIterator {
     return data_block_iter_.value();
   }
   Status status() const override {
-    // It'd be nice if status() returned a const Status& instead of a Status
     if (!index_iter_->status().ok()) {
       return index_iter_->status();
-    } else if (block_iter_points_to_real_block_ &&
-               !data_block_iter_.status().ok()) {
+    } else if (block_iter_points_to_real_block_) {
       return data_block_iter_.status();
     } else {
       return Status::OK();
@@ -571,8 +569,7 @@ class BlockBasedTableIterator : public InternalIterator {
       if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
         data_block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
       }
-      data_block_iter_.~BlockIter();
-      new (&data_block_iter_) BlockIter();
+      data_block_iter_.Invalidate(Status::OK());
       block_iter_points_to_real_block_ = false;
     }
   }
diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc
index da93af458..d611352b2 100644
--- a/table/cuckoo_table_reader.cc
+++ b/table/cuckoo_table_reader.cc
@@ -206,7 +206,7 @@ class CuckooTableIterator : public InternalIterator {
   void Prev() override;
   Slice key() const override;
   Slice value() const override;
-  Status status() const override { return status_; }
+  Status status() const override { return Status::OK(); }
   void InitIfNeeded();
 
  private:
@@ -241,7 +241,6 @@ class CuckooTableIterator : public InternalIterator {
   void PrepareKVAtCurrIdx();
   CuckooTableReader* reader_;
   bool initialized_;
-  Status status_;
   // Contains a map of keys to bucket_id sorted in key order.
   std::vector<uint32_t> sorted_bucket_ids_;
   // We assume that the number of items can be stored in uint32 (4 Billion).
diff --git a/table/internal_iterator.h b/table/internal_iterator.h
index ff7b4d2cb..2fdb14c7f 100644
--- a/table/internal_iterator.h
+++ b/table/internal_iterator.h
@@ -22,6 +22,7 @@ class InternalIterator : public Cleanable {
 
   // An iterator is either positioned at a key/value pair, or
   // not valid.  This method returns true iff the iterator is valid.
+  // Always returns false if !status().ok().
   virtual bool Valid() const = 0;
 
   // Position at the first key in the source.  The iterator is Valid()
@@ -35,6 +36,9 @@ class InternalIterator : public Cleanable {
   // Position at the first key in the source that at or past target
   // The iterator is Valid() after this call iff the source contains
   // an entry that comes at or past target.
+  // All Seek*() methods clear any error status() that the iterator had prior to
+  // the call; after the seek, status() indicates only the error (if any) that
+  // happened during the seek, not any past errors.
   virtual void Seek(const Slice& target) = 0;
 
   // Position at the first key in the source that at or before target
@@ -61,7 +65,7 @@ class InternalIterator : public Cleanable {
   // Return the value for the current entry.  The underlying storage for
   // the returned slice is valid only until the next modification of
   // the iterator.
-  // REQUIRES: !AtEnd() && !AtStart()
+  // REQUIRES: Valid()
   virtual Slice value() const = 0;
 
   // If an error has occurred, return it.  Else return an ok status.
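The MergingIterator changes below route every child iterator's error through a small `considerStatus()` helper whose rule is: keep the first non-ok status seen, and never overwrite it with a later error or with ok. That aggregation rule can be sketched standalone; the simplified `Status` type and `StatusAccumulator` class here are invented for the illustration and are not the RocksDB implementation:

```cpp
#include <string>

// Minimal stand-in for rocksdb::Status: an empty message means ok.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Mirrors the shape of MergingIterator::considerStatus(): remember the first
// non-ok status encountered across many children; ignore later ones.
class StatusAccumulator {
 public:
  void Consider(const Status& s) {
    // Only record an error if we don't already have one.
    if (!s.ok() && status_.ok()) {
      status_ = s;
    }
  }

  // What each Seek*() does before re-collecting child statuses: start clean.
  void Reset() { status_ = Status{}; }

  const Status& Get() const { return status_; }

 private:
  Status status_;
};
```

Resetting the accumulator at the top of every `Seek*()` and then re-considering each child's status is exactly what the new comment in internal_iterator.h requires: after a seek, `status()` reflects only errors from that seek, never stale ones.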
diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h
index f14acdb9b..5ddea2470 100644
--- a/table/iterator_wrapper.h
+++ b/table/iterator_wrapper.h
@@ -87,6 +87,7 @@ class IteratorWrapper {
     valid_ = iter_->Valid();
     if (valid_) {
       key_ = iter_->key();
+      assert(iter_->status().ok());
     }
   }
 
diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc
index 99a867fe7..c2f486c2a 100644
--- a/table/merging_iterator.cc
+++ b/table/merging_iterator.cc
@@ -52,12 +52,21 @@ class MergingIterator : public InternalIterator {
     }
     for (auto& child : children_) {
       if (child.Valid()) {
+        assert(child.status().ok());
         minHeap_.push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     current_ = CurrentForward();
   }
 
+  void considerStatus(Status s) {
+    if (!s.ok() && status_.ok()) {
+      status_ = s;
+    }
+  }
+
   virtual void AddIterator(InternalIterator* iter) {
     assert(direction_ == kForward);
     children_.emplace_back(iter);
@@ -66,8 +75,11 @@ class MergingIterator : public InternalIterator {
     }
     auto new_wrapper = children_.back();
     if (new_wrapper.Valid()) {
+      assert(new_wrapper.status().ok());
       minHeap_.push(&new_wrapper);
       current_ = CurrentForward();
+    } else {
+      considerStatus(new_wrapper.status());
     }
   }
 
@@ -77,14 +89,22 @@ class MergingIterator : public InternalIterator {
     }
   }
 
-  virtual bool Valid() const override { return (current_ != nullptr); }
+  virtual bool Valid() const override {
+    return current_ != nullptr && status_.ok();
+  }
+
+  virtual Status status() const override { return status_; }
 
   virtual void SeekToFirst() override {
     ClearHeaps();
+    status_ = Status::OK();
     for (auto& child : children_) {
       child.SeekToFirst();
       if (child.Valid()) {
+        assert(child.status().ok());
         minHeap_.push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kForward;
@@ -94,10 +114,14 @@ class MergingIterator : public InternalIterator {
   virtual void SeekToLast() override {
     ClearHeaps();
     InitMaxHeap();
+    status_ = Status::OK();
     for (auto& child : children_) {
       child.SeekToLast();
       if (child.Valid()) {
+        assert(child.status().ok());
         maxHeap_->push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kReverse;
@@ -106,6 +130,7 @@ class MergingIterator : public InternalIterator {
 
   virtual void Seek(const Slice& target) override {
     ClearHeaps();
+    status_ = Status::OK();
     for (auto& child : children_) {
       {
         PERF_TIMER_GUARD(seek_child_seek_time);
@@ -114,8 +139,11 @@ class MergingIterator : public InternalIterator {
       PERF_COUNTER_ADD(seek_child_seek_count, 1);
 
       if (child.Valid()) {
+        assert(child.status().ok());
         PERF_TIMER_GUARD(seek_min_heap_time);
         minHeap_.push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kForward;
@@ -128,6 +156,7 @@ class MergingIterator : public InternalIterator {
   virtual void SeekForPrev(const Slice& target) override {
     ClearHeaps();
     InitMaxHeap();
+    status_ = Status::OK();
 
     for (auto& child : children_) {
       {
@@ -137,8 +166,11 @@ class MergingIterator : public InternalIterator {
       PERF_COUNTER_ADD(seek_child_seek_count, 1);
 
       if (child.Valid()) {
+        assert(child.status().ok());
         PERF_TIMER_GUARD(seek_max_heap_time);
         maxHeap_->push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kReverse;
@@ -172,9 +204,11 @@ class MergingIterator : public InternalIterator {
       // current is still valid after the Next() call above.  Call
       // replace_top() to restore the heap property.  When the same child
       // iterator yields a sequence of keys, this is cheap.
+      assert(current_->status().ok());
       minHeap_.replace_top(current_);
     } else {
       // current stopped being valid, remove it from the heap.
+      considerStatus(current_->status());
       minHeap_.pop();
     }
     current_ = CurrentForward();
@@ -191,28 +225,35 @@ class MergingIterator : public InternalIterator {
       // just after the if-block.
       ClearHeaps();
       InitMaxHeap();
+      Slice target = key();
       for (auto& child : children_) {
         if (&child != current_) {
           if (!prefix_seek_mode_) {
-            child.Seek(key());
+            child.Seek(target);
             if (child.Valid()) {
               // Child is at first entry >= key().  Step back one to be < key()
               TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev",
                                        &child);
+              assert(child.status().ok());
               child.Prev();
             } else {
               // Child has no entries >= key().  Position at last entry.
               TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
+              considerStatus(child.status());
               child.SeekToLast();
             }
+            considerStatus(child.status());
           } else {
-            child.SeekForPrev(key());
-            if (child.Valid() && comparator_->Equal(key(), child.key())) {
+            child.SeekForPrev(target);
+            considerStatus(child.status());
+            if (child.Valid() && comparator_->Equal(target, child.key())) {
               child.Prev();
+              considerStatus(child.status());
             }
           }
         }
         if (child.Valid()) {
+          assert(child.status().ok());
           maxHeap_->push(&child);
         }
       }
@@ -238,9 +279,11 @@ class MergingIterator : public InternalIterator {
       // current is still valid after the Prev() call above.  Call
       // replace_top() to restore the heap property.  When the same child
       // iterator yields a sequence of keys, this is cheap.
+      assert(current_->status().ok());
       maxHeap_->replace_top(current_);
     } else {
       // current stopped being valid, remove it from the heap.
+      considerStatus(current_->status());
       maxHeap_->pop();
     }
     current_ = CurrentReverse();
@@ -256,17 +299,6 @@ class MergingIterator : public InternalIterator {
     return current_->value();
   }
 
-  virtual Status status() const override {
-    Status s;
-    for (auto& child : children_) {
-      s = child.status();
-      if (!s.ok()) {
-        break;
-      }
-    }
-    return s;
-  }
-
   virtual void SetPinnedItersMgr(
       PinnedIteratorsManager* pinned_iters_mgr) override {
     pinned_iters_mgr_ = pinned_iters_mgr;
@@ -302,6 +334,8 @@ class MergingIterator : public InternalIterator {
   // child iterators are valid.  This is the top of minHeap_ or maxHeap_
   // depending on the direction.
   IteratorWrapper* current_;
+  // If any of the children have non-ok status, this is one of them.
+  Status status_;
   // Which direction is the iterator moving?
   enum Direction {
     kForward,
@@ -334,11 +368,14 @@ void MergingIterator::SwitchToForward() {
   // Otherwise, advance the non-current children.  We advance current_
   // just after the if-block.
   ClearHeaps();
+  Slice target = key();
   for (auto& child : children_) {
     if (&child != current_) {
-      child.Seek(key());
-      if (child.Valid() && comparator_->Equal(key(), child.key())) {
+      child.Seek(target);
+      considerStatus(child.status());
+      if (child.Valid() && comparator_->Equal(target, child.key())) {
         child.Next();
+        considerStatus(child.status());
       }
     }
     if (child.Valid()) {
diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc
index 476381cac..8408abfad 100644
--- a/table/plain_table_reader.cc
+++ b/table/plain_table_reader.cc
@@ -625,6 +625,7 @@ bool PlainTableIterator::Valid() const {
 }
 
 void PlainTableIterator::SeekToFirst() {
+  status_ = Status::OK();
   next_offset_ = table_->data_start_offset_;
   if (next_offset_ >= table_->file_info_.data_end_offset) {
     next_offset_ = offset_ = table_->file_info_.data_end_offset;
@@ -636,6 +637,7 @@ void PlainTableIterator::SeekToFirst() {
 void PlainTableIterator::SeekToLast() {
   assert(false);
   status_ = Status::NotSupported("SeekToLast() is not supported in PlainTable");
+  next_offset_ = offset_ = table_->file_info_.data_end_offset;
 }
 
 void PlainTableIterator::Seek(const Slice& target) {
@@ -676,6 +678,7 @@ void PlainTableIterator::Seek(const Slice& target) {
   if (!table_->IsTotalOrderMode()) {
     prefix_hash = GetSliceHash(prefix_slice);
     if (!table_->MatchBloom(prefix_hash)) {
+      status_ = Status::OK();
      offset_ = next_offset_ = table_->file_info_.data_end_offset;
       return;
     }
@@ -711,6 +714,7 @@ void PlainTableIterator::SeekForPrev(const Slice& /*target*/) {
   assert(false);
   status_ =
       Status::NotSupported("SeekForPrev() is not supported in PlainTable");
+  offset_ = next_offset_ = table_->file_info_.data_end_offset;
 }
 
 void PlainTableIterator::Next() {
diff --git a/table/table_test.cc b/table/table_test.cc
index fd2866283..19bd8043d 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -256,7 +256,7 @@ class KeyConvertingIterator : public InternalIterator {
       delete iter_;
     }
   }
-  virtual bool Valid() const override { return iter_->Valid(); }
+  virtual bool Valid() const override { return iter_->Valid() && status_.ok(); }
   virtual void Seek(const Slice& target) override {
     ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue);
     std::string encoded;
@@ -2368,6 +2368,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
     iter->Next();
   }
   ASSERT_OK(iter->status());
+  iter.reset();
 
   const ImmutableCFOptions ioptions1(opt);
   ASSERT_OK(c.Reopen(ioptions1));
diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc
index 97734b146..09e0e1ef1 100644
--- a/table/two_level_iterator.cc
+++ b/table/two_level_iterator.cc
@@ -47,8 +47,8 @@ class TwoLevelIterator : public InternalIterator {
     return second_level_iter_.value();
   }
   virtual Status status() const override {
-    // It'd be nice if status() returned a const Status& instead of a Status
     if (!first_level_iter_.status().ok()) {
+      assert(second_level_iter_.iter() == nullptr);
       return first_level_iter_.status();
     } else if (second_level_iter_.iter() != nullptr &&
                !second_level_iter_.status().ok()) {
@@ -101,7 +101,7 @@ void TwoLevelIterator::SeekForPrev(const Slice& target) {
     second_level_iter_.SeekForPrev(target);
   }
   if (!Valid()) {
-    if (!first_level_iter_.Valid()) {
+    if (!first_level_iter_.Valid() && first_level_iter_.status().ok()) {
       first_level_iter_.SeekToLast();
       InitDataBlock();
       if (second_level_iter_.iter() != nullptr) {
@@ -144,8 +144,7 @@ void TwoLevelIterator::Prev() {
 
 void TwoLevelIterator::SkipEmptyDataBlocksForward() {
   while (second_level_iter_.iter() == nullptr ||
-         (!second_level_iter_.Valid() &&
-          !second_level_iter_.status().IsIncomplete())) {
+         (!second_level_iter_.Valid() && second_level_iter_.status().ok())) {
     // Move to next block
     if (!first_level_iter_.Valid()) {
       SetSecondLevelIterator(nullptr);
@@ -161,8 +160,7 @@ void TwoLevelIterator::SkipEmptyDataBlocksForward() {
 
 void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
   while (second_level_iter_.iter() == nullptr ||
-         (!second_level_iter_.Valid() &&
-          !second_level_iter_.status().IsIncomplete())) {
+         (!second_level_iter_.Valid() && second_level_iter_.status().ok())) {
     // Move to next block
     if (!first_level_iter_.Valid()) {
       SetSecondLevelIterator(nullptr);
@@ -177,9 +175,6 @@ void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
 }
 
 void TwoLevelIterator::SetSecondLevelIterator(InternalIterator* iter) {
-  if (second_level_iter_.iter() != nullptr) {
-    SaveError(second_level_iter_.status());
-  }
   InternalIterator* old_iter = second_level_iter_.Set(iter);
   delete old_iter;
 }
diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h
index 5ead75dd7..1565c670b 100644
--- a/utilities/blob_db/blob_db_iterator.h
+++ b/utilities/blob_db/blob_db_iterator.h
@@ -119,6 +119,7 @@ class BlobDBIterator : public Iterator {
     TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
     TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
     value_.Reset();
+    status_ = Status::OK();
     if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) {
       Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
       if (s.IsNotFound()) {
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index 31f7b3993..2849e4bbf 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -79,6 +79,7 @@ class BaseDeltaIterator : public Iterator {
   void Next() override {
     if (!Valid()) {
       status_ = Status::NotSupported("Next() on invalid iterator");
+      return;
     }
 
     if (!forward_) {
@@ -114,6 +115,7 @@ class BaseDeltaIterator : public Iterator {
   void Prev() override {
     if (!Valid()) {
       status_ = Status::NotSupported("Prev() on invalid iterator");
+      return;
     }
 
     if (forward_) {
@@ -170,6 +172,21 @@ class BaseDeltaIterator : public Iterator {
  private:
   void AssertInvariants() {
 #ifndef NDEBUG
+    bool not_ok = false;
+    if (!base_iterator_->status().ok()) {
+      assert(!base_iterator_->Valid());
+      not_ok = true;
+    }
+    if (!delta_iterator_->status().ok()) {
+      assert(!delta_iterator_->Valid());
+      not_ok = true;
+    }
+    if (not_ok) {
+      assert(!Valid());
+      assert(!status().ok());
+      return;
+    }
+
     if (!Valid()) {
       return;
     }
@@ -238,13 +255,25 @@ class BaseDeltaIterator : public Iterator {
   void UpdateCurrent() {
 // Suppress false positive clang analyzer warnings.
 #ifndef __clang_analyzer__
+    status_ = Status::OK();
     while (true) {
       WriteEntry delta_entry;
       if (DeltaValid()) {
+        assert(delta_iterator_->status().ok());
         delta_entry = delta_iterator_->Entry();
+      } else if (!delta_iterator_->status().ok()) {
+        // Expose the error status and stop.
+        current_at_base_ = false;
+        return;
       }
       equal_keys_ = false;
       if (!BaseValid()) {
+        if (!base_iterator_->status().ok()) {
+          // Expose the error status and stop.
+          current_at_base_ = true;
+          return;
+        }
+
+        // Base has finished.
         if (!DeltaValid()) {
           // Finished