diff --git a/CMakeLists.txt b/CMakeLists.txt index 160a53641..7a5df4213 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -845,6 +845,7 @@ if(WITH_TESTS) db/db_inplace_update_test.cc db/db_io_failure_test.cc db/db_iter_test.cc + db/db_iter_stress_test.cc db/db_iterator_test.cc db/db_log_iter_test.cc db/db_memtable_test.cc diff --git a/HISTORY.md b/HISTORY.md index 7e32e73f7..1c482dd71 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,8 @@ * Touch-up to write-related counters in PerfContext. New counters added: write_scheduling_flushes_compactions_time, write_thread_wait_nanos. Counters whose behavior was fixed or modified: write_memtable_time, write_pre_and_post_process_time, write_delay_time. * Posix Env's NewRandomRWFile() will fail if the file doesn't exist. * Now, `DBOptions::use_direct_io_for_flush_and_compaction` only applies to background writes, and `DBOptions::use_direct_reads` applies to both user reads and background reads. This conforms with Linux's `open(2)` manpage, which advises against simultaneously reading a file in buffered and direct modes, due to possibly undefined behavior and degraded performance. +* Iterator::Valid() always returns false if !status().ok(). So, now when doing a Seek() followed by some Next()s, there's no need to check status() after every operation. +* Iterator::Seek()/SeekForPrev()/SeekToFirst()/SeekToLast() always resets status(). ### New Features * Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data. diff --git a/Makefile b/Makefile index 9702fe278..5d0092d11 100644 --- a/Makefile +++ b/Makefile @@ -402,6 +402,7 @@ TESTS = \ db_blob_index_test \ db_bloom_filter_test \ db_iter_test \ + db_iter_stress_test \ db_log_iter_test \ db_compaction_filter_test \ db_compaction_test \ @@ -1195,6 +1196,9 @@ db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) db_iter_test: db/db_iter_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_iter_stress_test: db/db_iter_stress_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_universal_compaction_test: db/db_universal_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 20390f073..7bd3a45dc 100644 --- a/TARGETS +++ b/TARGETS @@ -560,6 +560,11 @@ ROCKS_TESTS = [ "db/db_iter_test.cc", "serial", ], + [ + "db_iter_stress_test", + "db/db_iter_stress_test.cc", + "serial", + ], [ "db_iterator_test", "db/db_iterator_test.cc", diff --git a/db/corruption_test.cc b/db/corruption_test.cc index cbd125b98..dbc09e753 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -333,9 +333,9 @@ TEST_F(CorruptionTest, TableFileIndexData) { Corrupt(kTableFile, -2000, 500); Reopen(); dbi = reinterpret_cast(db_); - // one full file should be readable, since only one was corrupted + // one full file may be readable, since only one was corrupted // the other file should be fully non-readable, since index was corrupted - Check(5000, 5000); + Check(0, 5000); ASSERT_NOK(dbi->VerifyChecksum()); } diff --git a/db/db_iter.cc b/db/db_iter.cc index 29b538de9..d2d1d0451 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -52,8 +52,13 @@ class DBIter final: public Iterator { public: // The following is grossly complicated. TODO: clean it up // Which direction is the iterator currently moving? 
- // (1) When moving forward, the internal iterator is positioned at - // the exact entry that yields this->key(), this->value() + // (1) When moving forward: + // (1a) if current_entry_is_merged_ = false, the internal iterator is + // positioned at the exact entry that yields this->key(), this->value() + // (1b) if current_entry_is_merged_ = true, the internal iterator is + // positioned immediately after the last entry that contributed to the + // current this->value(). That entry may or may not have key equal to + // this->key(). // (2) When moving backwards, the internal iterator is positioned // just before all entries whose user key == this->key(). enum Direction { @@ -194,6 +199,7 @@ class DBIter final: public Iterator { if (status_.ok()) { return iter_->status(); } else { + assert(!valid_); return status_; } } @@ -235,18 +241,21 @@ class DBIter final: public Iterator { void set_valid(bool v) { valid_ = v; } private: - void ReverseToForward(); - void ReverseToBackward(); - void PrevInternal(); - void FindParseableKey(ParsedInternalKey* ikey, Direction direction); + // For all methods in this block: + // PRE: iter_->Valid() && status_.ok() + // Return false if there was an error, and status() is non-ok, valid_ = false; + // in this case callers would usually stop what they were doing and return. + bool ReverseToForward(); + bool ReverseToBackward(); bool FindValueForCurrentKey(); bool FindValueForCurrentKeyUsingSeek(); - void FindPrevUserKey(); - void FindNextUserKey(); - inline void FindNextUserEntry(bool skipping, bool prefix_check); - void FindNextUserEntryInternal(bool skipping, bool prefix_check); + bool FindUserKeyBeforeSavedKey(); + inline bool FindNextUserEntry(bool skipping, bool prefix_check); + bool FindNextUserEntryInternal(bool skipping, bool prefix_check); bool ParseKey(ParsedInternalKey* key); - void MergeValuesNewToOld(); + bool MergeValuesNewToOld(); + + void PrevInternal(); bool TooManyInternalKeysSkipped(bool increment = true); bool IsVisible(SequenceNumber sequence); @@ -336,6 +345,7 @@ class DBIter final: public Iterator { inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { if (!ParseInternalKey(iter_->key(), ikey)) { status_ = Status::Corruption("corrupted internal key in DBIter"); + valid_ = false; ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s", iter_->key().ToString(true).c_str()); return false; @@ -346,12 +356,16 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { void DBIter::Next() { assert(valid_); + assert(status_.ok()); // Release temporarily pinned blocks from last operation ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); + bool ok = true; if (direction_ == kReverse) { - ReverseToForward(); + if (!ReverseToForward()) { + ok = false; + } } else if (iter_->Valid() && !current_entry_is_merged_) { // If the current value is not a merge, the iter position is the // current key, which is already returned. We can safely issue a @@ -365,13 +379,12 @@ void DBIter::Next() { if (statistics_ != nullptr) { local_stats_.next_count_++; } - // Now we point to the next internal position, for both of merge and - // not merge cases. 
- if (!iter_->Valid()) { + if (ok && iter_->Valid()) { + FindNextUserEntry(true /* skipping the current user key */, + prefix_same_as_start_); + } else { valid_ = false; - return; } - FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_); if (statistics_ != nullptr && valid_) { local_stats_.next_found_count_++; local_stats_.bytes_read_ += (key().size() + value().size()); @@ -392,15 +405,16 @@ void DBIter::Next() { // keys against the prefix of the seeked key. Set to false when // performing a seek without a key (e.g. SeekToFirst). Set to // prefix_same_as_start_ for other iterations. -inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) { +inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) { PERF_TIMER_GUARD(find_next_user_entry_time); - FindNextUserEntryInternal(skipping, prefix_check); + return FindNextUserEntryInternal(skipping, prefix_check); } // Actual implementation of DBIter::FindNextUserEntry() -void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { +bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // Loop until we hit an acceptable entry to yield assert(iter_->Valid()); + assert(status_.ok()); assert(direction_ == kForward); current_entry_is_merged_ = false; @@ -420,9 +434,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { do { if (!ParseKey(&ikey_)) { - // Skip corrupted keys. - iter_->Next(); - continue; + return false; } if (iterate_upper_bound_ != nullptr && @@ -437,7 +449,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { } if (TooManyInternalKeysSkipped()) { - return; + return false; } if (IsVisible(ikey_.sequence)) { @@ -455,18 +467,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // if iterartor specified start_seqnum we // 1) return internal key, including the type // 2) return ikey only if ikey.seqnum >= start_seqnum_ - // not that if deletion seqnum is < start_seqnum_ we + // note that if deletion seqnum is < start_seqnum_ we // just skip it like in normal iterator. if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) { saved_key_.SetInternalKey(ikey_); - valid_=true; - return; + valid_ = true; + return true; } else { saved_key_.SetUserKey( ikey_.user_key, !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); - skipping = true; - PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + skipping = true; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); } break; case kTypeValue: @@ -480,7 +492,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { if (ikey_.sequence >= start_seqnum_) { saved_key_.SetInternalKey(ikey_); valid_ = true; - return; + return true; } else { // this key and all previous versions shouldn't be included, // skipping @@ -507,14 +519,15 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { "Encounter unexpected blob index. 
Please open DB with " "rocksdb::blob_db::BlobDB instead."); valid_ = false; - } else { - is_blob_ = true; - valid_ = true; + return false; } - return; + + is_blob_ = true; + valid_ = true; + return true; } else { valid_ = true; - return; + return true; } } break; @@ -535,8 +548,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // value current_entry_is_merged_ = true; valid_ = true; - MergeValuesNewToOld(); // Go to a different state machine - return; + return MergeValuesNewToOld(); // Go to a different state machine } break; default: @@ -545,13 +557,14 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { } } } else { - // This key was inserted after our snapshot was taken. PERF_COUNTER_ADD(internal_recent_skipped_count, 1); - // Here saved_key_ may contain some old key, or the default empty key, or - // key assigned by some random other method. We don't care. - if (user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey()) <= - 0) { + // This key was inserted after our snapshot was taken. + // If this happens too many times in a row for the same user key, we want + // to seek to the target sequence number. + int cmp = + user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey()); + if (cmp == 0 || (skipping && cmp <= 0)) { num_skipped++; } else { saved_key_.SetUserKey( @@ -591,7 +604,9 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { iter_->Next(); } } while (iter_->Valid()); + valid_ = false; + return iter_->status().ok(); } // Merge values of the same user key starting from the current iter_ position @@ -600,12 +615,12 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // saved_key_ stores the user key // POST: saved_value_ has the merged value for the user key // iter_ points to the next entry (or invalid) -void DBIter::MergeValuesNewToOld() { +bool DBIter::MergeValuesNewToOld() { if (!merge_operator_) { ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null."); status_ = Status::InvalidArgument("merge_operator_ must be set."); valid_ = false; - return; + return false; } // Temporarily pin the blocks that hold merge operands @@ -621,8 +636,7 @@ void DBIter::MergeValuesNewToOld() { for (iter_->Next(); iter_->Valid(); iter_->Next()) { TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand"); if (!ParseKey(&ikey)) { - // skip corrupted key - continue; + return false; } if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { @@ -639,7 +653,6 @@ void DBIter::MergeValuesNewToOld() { } else if (kTypeValue == ikey.type) { // hit a put, merge the put value with operands and store the // final result in saved_value_. We are done! - // ignore corruption if there is any. const Slice val = iter_->value(); s = MergeHelper::TimedFullMerge( merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(), @@ -647,10 +660,15 @@ void DBIter::MergeValuesNewToOld() { if (!s.ok()) { valid_ = false; status_ = s; + return false; } // iter_ is positioned after put iter_->Next(); - return; + if (!iter_->status().ok()) { + valid_ = false; + return false; + } + return true; } else if (kTypeMerge == ikey.type) { // hit a merge, add the value as an operand and run associative merge. // when complete, add result to operands and continue. 
@@ -668,12 +686,17 @@ void DBIter::MergeValuesNewToOld() { Status::NotSupported("Blob DB does not support merge operator."); } valid_ = false; - return; + return false; } else { assert(false); } } + if (!iter_->status().ok()) { + valid_ = false; + return false; + } + // we either exhausted all internal keys under this user key, or hit // a deletion marker. // feed null as the existing value to the merge operator, such that @@ -685,17 +708,27 @@ void DBIter::MergeValuesNewToOld() { if (!s.ok()) { valid_ = false; status_ = s; + return false; } + + assert(status_.ok()); + return true; } void DBIter::Prev() { assert(valid_); + assert(status_.ok()); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); + bool ok = true; if (direction_ == kForward) { - ReverseToBackward(); + if (!ReverseToBackward()) { + ok = false; + } + } + if (ok) { + PrevInternal(); } - PrevInternal(); if (statistics_ != nullptr) { local_stats_.prev_count_++; if (valid_) { @@ -705,71 +738,76 @@ void DBIter::Prev() { } } -void DBIter::ReverseToForward() { - if (prefix_extractor_ != nullptr && !total_order_seek_) { +bool DBIter::ReverseToForward() { + assert(iter_->status().ok()); + + // When moving backwards, iter_ is positioned on _previous_ key, which may + // not exist or may have different prefix than the current key(). + // If that's the case, seek iter_ to current key. + if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) { IterKey last_key; last_key.SetInternalKey(ParsedInternalKey( saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); iter_->Seek(last_key.GetInternalKey()); } - FindNextUserKey(); + direction_ = kForward; - if (!iter_->Valid()) { - iter_->SeekToFirst(); - range_del_agg_.InvalidateTombstoneMapPositions(); + // Skip keys less than the current key() (a.k.a. saved_key_). + while (iter_->Valid()) { + ParsedInternalKey ikey; + if (!ParseKey(&ikey)) { + return false; + } + if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >= + 0) { + return true; + } + iter_->Next(); } + + if (!iter_->status().ok()) { + valid_ = false; + return false; + } + + return true; } -void DBIter::ReverseToBackward() { - if (prefix_extractor_ != nullptr && !total_order_seek_) { - IterKey last_key; - last_key.SetInternalKey(ParsedInternalKey(saved_key_.GetUserKey(), 0, - kValueTypeForSeekForPrev)); - iter_->SeekForPrev(last_key.GetInternalKey()); - } - if (current_entry_is_merged_) { - // Not placed in the same key. Need to call Prev() until finding the - // previous key. - if (!iter_->Valid()) { - iter_->SeekToLast(); - range_del_agg_.InvalidateTombstoneMapPositions(); - } - ParsedInternalKey ikey; - FindParseableKey(&ikey, kReverse); - while (iter_->Valid() && - user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) > - 0) { - assert(ikey.sequence != kMaxSequenceNumber); - if (!IsVisible(ikey.sequence)) { - PERF_COUNTER_ADD(internal_recent_skipped_count, 1); - } else { - PERF_COUNTER_ADD(internal_key_skipped_count, 1); - } - iter_->Prev(); - FindParseableKey(&ikey, kReverse); - } - } -#ifndef NDEBUG - if (iter_->Valid()) { - ParsedInternalKey ikey; - assert(ParseKey(&ikey)); - assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <= - 0); - } -#endif +// Move iter_ to the key before saved_key_. +bool DBIter::ReverseToBackward() { + assert(iter_->status().ok()); + + // When current_entry_is_merged_ is true, iter_ may be positioned on the next + // key, which may not exist or may have prefix different from current. 
+ // If that's the case, seek to saved_key_. + if (current_entry_is_merged_ && + ((prefix_extractor_ != nullptr && !total_order_seek_) || + !iter_->Valid())) { + IterKey last_key; + // Using kMaxSequenceNumber and kValueTypeForSeek + // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller + // than saved_key_. + last_key.SetInternalKey(ParsedInternalKey( + saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); + if (prefix_extractor_ != nullptr && !total_order_seek_) { + iter_->SeekForPrev(last_key.GetInternalKey()); + } else { + // Some iterators may not support SeekForPrev(), so we avoid using it + // when prefix seek mode is disabled. This is somewhat expensive + // (an extra Prev(), as well as an extra change of direction of iter_), + // so we may need to reconsider it later. + iter_->Seek(last_key.GetInternalKey()); + if (!iter_->Valid() && iter_->status().ok()) { + iter_->SeekToLast(); + } + } + } - FindPrevUserKey(); direction_ = kReverse; + return FindUserKeyBeforeSavedKey(); } void DBIter::PrevInternal() { - if (!iter_->Valid()) { - valid_ = false; - return; - } - - ParsedInternalKey ikey; - while (iter_->Valid()) { saved_key_.SetUserKey( ExtractUserKey(iter_->key()), @@ -791,38 +829,41 @@ void DBIter::PrevInternal() { return; } - if (FindValueForCurrentKey()) { - if (!iter_->Valid()) { - return; - } - FindParseableKey(&ikey, kReverse); - if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { - FindPrevUserKey(); - } + if (!FindValueForCurrentKey()) { // assigns valid_ + return; + } + + // Whether or not we found a value for current key, we need iter_ to end up + // on a smaller key. + if (!FindUserKeyBeforeSavedKey()) { + return; + } + + if (valid_) { + // Found the value. return; } if (TooManyInternalKeysSkipped(false)) { return; } - - if (!iter_->Valid()) { - break; - } - FindParseableKey(&ikey, kReverse); - if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { - FindPrevUserKey(); - } } + // We haven't found any key - iterator is not valid - // Or the prefix is different than start prefix - assert(!iter_->Valid()); valid_ = false; } -// This function checks, if the entry with biggest sequence_number <= sequence_ -// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in -// saved_value_ +// Used for backwards iteration. +// Looks at the entries with user key saved_key_ and finds the most up-to-date +// value for it, or executes a merge, or determines that the value was deleted. +// Sets valid_ to true if the value is found and is ready to be presented to +// the user through value(). +// Sets valid_ to false if the value was deleted, and we should try another key. +// Returns false if an error occurred, and !status().ok() and !valid_. +// +// PRE: iter_ is positioned on the last entry with user key equal to saved_key_. +// POST: iter_ is positioned on one of the entries equal to saved_key_, or on +// the entry just before them, or on the entry just after them. 
bool DBIter::FindValueForCurrentKey() { assert(iter_->Valid()); merge_context_.Clear(); @@ -832,20 +873,27 @@ bool DBIter::FindValueForCurrentKey() { ValueType last_not_merge_type = kTypeDeletion; ValueType last_key_entry_type = kTypeDeletion; - ParsedInternalKey ikey; - FindParseableKey(&ikey, kReverse); - // Temporarily pin blocks that hold (merge operands / the value) ReleaseTempPinnedData(); TempPinData(); size_t num_skipped = 0; - while (iter_->Valid() && IsVisible(ikey.sequence) && - user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { + while (iter_->Valid()) { + ParsedInternalKey ikey; + if (!ParseKey(&ikey)) { + return false; + } + + if (!IsVisible(ikey.sequence) || + !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { + break; + } if (TooManyInternalKeysSkipped()) { return false; } - // We iterate too much: let's use Seek() to avoid too much key comparisons + // This user key has lots of entries. + // We're going from old to new, and it's taking too long. Let's do a Seek() + // and go from new to old. This helps when a key was overwritten many times. if (num_skipped >= max_skip_) { return FindValueForCurrentKeyUsingSeek(); } @@ -892,10 +940,13 @@ bool DBIter::FindValueForCurrentKey() { } PERF_COUNTER_ADD(internal_key_skipped_count, 1); - assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())); iter_->Prev(); ++num_skipped; - FindParseableKey(&ikey, kReverse); + } + + if (!iter_->status().ok()) { + valid_ = false; + return false; } Status s; @@ -905,7 +956,7 @@ bool DBIter::FindValueForCurrentKey() { case kTypeSingleDeletion: case kTypeRangeDeletion: valid_ = false; - return false; + return true; case kTypeMerge: current_entry_is_merged_ = true; if (last_not_merge_type == kTypeDeletion || @@ -926,7 +977,7 @@ bool DBIter::FindValueForCurrentKey() { Status::NotSupported("Blob DB does not support merge operator."); } valid_ = false; - return true; + return false; } else { assert(last_not_merge_type == kTypeValue); s = MergeHelper::TimedFullMerge( @@ -936,7 +987,7 @@ bool DBIter::FindValueForCurrentKey() { } break; case kTypeValue: - // do nothing - we've already has value in saved_value_ + // do nothing - we've already has value in pinned_value_ break; case kTypeBlobIndex: if (!allow_blob_) { @@ -945,7 +996,7 @@ bool DBIter::FindValueForCurrentKey() { "Encounter unexpected blob index. Please open DB with " "rocksdb::blob_db::BlobDB instead."); valid_ = false; - return true; + return false; } is_blob_ = true; break; @@ -953,17 +1004,19 @@ bool DBIter::FindValueForCurrentKey() { assert(false); break; } - if (s.ok()) { - valid_ = true; - } else { + if (!s.ok()) { valid_ = false; status_ = s; + return false; } + valid_ = true; return true; } // This function is used in FindValueForCurrentKey. // We use Seek() function instead of Prev() to find necessary value +// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld(). +// Would be nice to reuse some code. 
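+// Returns false only if an error occurred; in that case valid_ is false and
+// status() is non-ok. May also set valid_ to false and return true when the
+// key turns out to be deleted or to have no visible entries.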
bool DBIter::FindValueForCurrentKeyUsingSeek() { // FindValueForCurrentKey will enable pinning before calling // FindValueForCurrentKeyUsingSeek() @@ -974,26 +1027,38 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { iter_->Seek(last_key); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); - // assume there is at least one parseable key for this user key - ParsedInternalKey ikey; - FindParseableKey(&ikey, kForward); - assert(iter_->Valid()); - assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())); - // In case read_callback presents, the value we seek to may not be visible. - // Seek for the next value that's visible. - while (!IsVisible(ikey.sequence)) { + // Find the next value that's visible. + ParsedInternalKey ikey; + while (true) { + if (!iter_->Valid()) { + valid_ = false; + return iter_->status().ok(); + } + + if (!ParseKey(&ikey)) { + return false; + } + if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { + // No visible values for this key, even though FindValueForCurrentKey() + // has seen some. This is possible if we're using a tailing iterator, and + // the entries were discarded in a compaction. + valid_ = false; + return true; + } + + if (IsVisible(ikey.sequence)) { + break; + } + iter_->Next(); - FindParseableKey(&ikey, kForward); - assert(iter_->Valid()); - assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())); } if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || range_del_agg_.ShouldDelete( ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { valid_ = false; - return false; + return true; } if (ikey.type == kTypeBlobIndex && !allow_blob_) { ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); @@ -1001,7 +1066,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { "Encounter unexpected blob index. Please open DB with " "rocksdb::blob_db::BlobDB instead."); valid_ = false; - return true; + return false; } if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { assert(iter_->IsValuePinned()); @@ -1012,115 +1077,147 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { // kTypeMerge. 
We need to collect all kTypeMerge values and save them // in operands + assert(ikey.type == kTypeMerge); current_entry_is_merged_ = true; merge_context_.Clear(); - while ( - iter_->Valid() && - user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) && - ikey.type == kTypeMerge && - !range_del_agg_.ShouldDelete( - ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { - merge_context_.PushOperand(iter_->value(), - iter_->IsValuePinned() /* operand_pinned */); - PERF_COUNTER_ADD(internal_merge_count, 1); + merge_context_.PushOperand(iter_->value(), + iter_->IsValuePinned() /* operand_pinned */); + while (true) { iter_->Next(); - FindParseableKey(&ikey, kForward); - } - Status s; - if (!iter_->Valid() || - !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) || - ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || - range_del_agg_.ShouldDelete( - ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { - s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(), - nullptr, merge_context_.GetOperands(), - &saved_value_, logger_, statistics_, env_, - &pinned_value_, true); - // Make iter_ valid and point to saved_key_ - if (!iter_->Valid() || - !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { - iter_->Seek(last_key); - RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); + if (!iter_->Valid()) { + if (!iter_->status().ok()) { + valid_ = false; + return false; + } + break; } - if (s.ok()) { + if (!ParseKey(&ikey)) { + return false; + } + if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { + break; + } + + if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || + range_del_agg_.ShouldDelete( + ikey, + RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { + break; + } else if (ikey.type == kTypeValue) { + const Slice val = iter_->value(); + Status s = MergeHelper::TimedFullMerge( + merge_operator_, saved_key_.GetUserKey(), &val, + merge_context_.GetOperands(), &saved_value_, logger_, statistics_, + env_, &pinned_value_, true); + if (!s.ok()) { + valid_ = false; + status_ = s; + return false; + } valid_ = true; - } else { + return true; + } else if (ikey.type == kTypeMerge) { + merge_context_.PushOperand(iter_->value(), + iter_->IsValuePinned() /* operand_pinned */); + PERF_COUNTER_ADD(internal_merge_count, 1); + } else if (ikey.type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else { + status_ = + Status::NotSupported("Blob DB does not support merge operator."); + } valid_ = false; - status_ = s; + return false; + } else { + assert(false); } - return true; } - const Slice& val = iter_->value(); - s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(), - &val, merge_context_.GetOperands(), - &saved_value_, logger_, statistics_, env_, - &pinned_value_, true); - if (s.ok()) { - valid_ = true; - } else { + Status s = MergeHelper::TimedFullMerge( + merge_operator_, saved_key_.GetUserKey(), nullptr, + merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, + &pinned_value_, true); + if (!s.ok()) { valid_ = false; status_ = s; + return false; } + + // Make sure we leave iter_ in a good state. If it's valid and we don't care + // about prefixes, that's already good enough. Otherwise it needs to be + // seeked to the current key. 
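+  // (This is the same SeekForPrev()-vs-Seek()-plus-SeekToLast() fallback that
+  // ReverseToBackward() uses.)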
+ if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) { + if (prefix_extractor_ != nullptr && !total_order_seek_) { + iter_->SeekForPrev(last_key); + } else { + iter_->Seek(last_key); + if (!iter_->Valid() && iter_->status().ok()) { + iter_->SeekToLast(); + } + } + RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); + } + + valid_ = true; return true; } -// Used in Next to change directions -// Go to next user key -// Don't use Seek(), -// because next user key will be very close -void DBIter::FindNextUserKey() { - if (!iter_->Valid()) { - return; - } - ParsedInternalKey ikey; - FindParseableKey(&ikey, kForward); - while (iter_->Valid() && - !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { - iter_->Next(); - FindParseableKey(&ikey, kForward); - } -} - -// Go to previous user_key -void DBIter::FindPrevUserKey() { - if (!iter_->Valid()) { - return; - } +// Move backwards until the key smaller than saved_key_. +// Changes valid_ only if return value is false. +bool DBIter::FindUserKeyBeforeSavedKey() { + assert(status_.ok()); size_t num_skipped = 0; - ParsedInternalKey ikey; - FindParseableKey(&ikey, kReverse); - int cmp; - while (iter_->Valid() && - ((cmp = user_comparator_->Compare(ikey.user_key, - saved_key_.GetUserKey())) == 0 || - (cmp > 0 && !IsVisible(ikey.sequence)))) { - if (TooManyInternalKeysSkipped()) { - return; + while (iter_->Valid()) { + ParsedInternalKey ikey; + if (!ParseKey(&ikey)) { + return false; } - if (cmp == 0) { - if (num_skipped >= max_skip_) { - num_skipped = 0; - IterKey last_key; - last_key.SetInternalKey(ParsedInternalKey( - saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); - iter_->Seek(last_key.GetInternalKey()); - RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); - } else { - ++num_skipped; - } + if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) { + return true; } + + if (TooManyInternalKeysSkipped()) { + return false; + } + assert(ikey.sequence != kMaxSequenceNumber); if (!IsVisible(ikey.sequence)) { PERF_COUNTER_ADD(internal_recent_skipped_count, 1); } else { PERF_COUNTER_ADD(internal_key_skipped_count, 1); } + + if (num_skipped >= max_skip_) { + num_skipped = 0; + IterKey last_key; + last_key.SetInternalKey(ParsedInternalKey( + saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); + // It would be more efficient to use SeekForPrev() here, but some + // iterators may not support it. 
+ iter_->Seek(last_key.GetInternalKey()); + RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); + if (!iter_->Valid()) { + break; + } + } else { + ++num_skipped; + } + iter_->Prev(); - FindParseableKey(&ikey, kReverse); } + + if (!iter_->status().ok()) { + valid_ = false; + return false; + } + + return true; } bool DBIter::TooManyInternalKeysSkipped(bool increment) { @@ -1140,19 +1237,9 @@ bool DBIter::IsVisible(SequenceNumber sequence) { (read_callback_ == nullptr || read_callback_->IsCommitted(sequence)); } -// Skip all unparseable keys -void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) { - while (iter_->Valid() && !ParseKey(ikey)) { - if (direction == kReverse) { - iter_->Prev(); - } else { - iter_->Next(); - } - } -} - void DBIter::Seek(const Slice& target) { StopWatch sw(env_, statistics_, DB_SEEK); + status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); saved_key_.Clear(); @@ -1201,6 +1288,7 @@ void DBIter::Seek(const Slice& target) { void DBIter::SeekForPrev(const Slice& target) { StopWatch sw(env_, statistics_, DB_SEEK); + status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); saved_key_.Clear(); @@ -1249,15 +1337,16 @@ void DBIter::SeekForPrev(const Slice& target) { } void DBIter::SeekToFirst() { - // Don't use iter_::Seek() if we set a prefix extractor - // because prefix seek will be used. - if (prefix_extractor_ != nullptr) { - max_skip_ = std::numeric_limits::max(); - } if (iterate_lower_bound_ != nullptr) { Seek(*iterate_lower_bound_); return; } + // Don't use iter_::Seek() if we set a prefix extractor + // because prefix seek will be used. + if (prefix_extractor_ != nullptr && !total_order_seek_) { + max_skip_ = std::numeric_limits::max(); + } + status_ = Status::OK(); direction_ = kForward; ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); @@ -1293,11 +1382,22 @@ void DBIter::SeekToFirst() { } void DBIter::SeekToLast() { + if (iterate_upper_bound_ != nullptr) { + // Seek to last key strictly less than ReadOptions.iterate_upper_bound. + SeekForPrev(*iterate_upper_bound_); + if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) { + ReleaseTempPinnedData(); + PrevInternal(); + } + return; + } + // Don't use iter_::Seek() if we set a prefix extractor // because prefix seek will be used. - if (prefix_extractor_ != nullptr) { + if (prefix_extractor_ != nullptr && !total_order_seek_) { max_skip_ = std::numeric_limits::max(); } + status_ = Status::OK(); direction_ = kReverse; ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); @@ -1308,21 +1408,7 @@ void DBIter::SeekToLast() { iter_->SeekToLast(); range_del_agg_.InvalidateTombstoneMapPositions(); } - // When the iterate_upper_bound is set to a value, - // it will seek to the last key before the - // ReadOptions.iterate_upper_bound - if (iter_->Valid() && iterate_upper_bound_ != nullptr) { - SeekForPrev(*iterate_upper_bound_); - range_del_agg_.InvalidateTombstoneMapPositions(); - if (!Valid()) { - return; - } else if (user_comparator_->Equal(*iterate_upper_bound_, key())) { - ReleaseTempPinnedData(); - PrevInternal(); - } - } else { - PrevInternal(); - } + PrevInternal(); if (statistics_ != nullptr) { RecordTick(statistics_, NUMBER_DB_SEEK); if (valid_) { diff --git a/db/db_iter_stress_test.cc b/db/db_iter_stress_test.cc new file mode 100644 index 000000000..f964bbad1 --- /dev/null +++ b/db/db_iter_stress_test.cc @@ -0,0 +1,652 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/db_iter.h"
+#include "db/dbformat.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/testharness.h"
+#include "utilities/merge_operators.h"
+
+#ifdef GFLAGS
+
+#include "util/gflags_compat.h"
+
+using GFLAGS_NAMESPACE::ParseCommandLineFlags;
+
+DEFINE_bool(verbose, false,
+            "Print huge, detailed trace. Intended for debugging failures.");
+
+#else
+
+void ParseCommandLineFlags(int*, char***, bool) {}
+bool FLAGS_verbose = false;
+
+#endif
+
+namespace rocksdb {
+
+class DBIteratorStressTest : public testing::Test {
+ public:
+  Env* env_;
+
+  DBIteratorStressTest() : env_(Env::Default()) {}
+};
+
+namespace {
+
+struct Entry {
+  std::string key;
+  ValueType type;  // kTypeValue, kTypeDeletion, kTypeMerge
+  uint64_t sequence;
+  std::string ikey;  // internal key, made from `key`, `sequence` and `type`
+  std::string value;
+  // If false, we'll pretend that this entry doesn't exist.
+  bool visible = true;
+
+  bool operator<(const Entry& e) const {
+    if (key != e.key) return key < e.key;
+    return std::tie(sequence, type) > std::tie(e.sequence, e.type);
+  }
+};
+
+struct Data {
+  std::vector<Entry> entries;
+
+  // Indices in `entries` with `visible` = false.
+  std::vector<size_t> hidden;
+  // Keys of entries whose `visible` changed since the last seek of iterators.
+  std::set<std::string> recently_touched_keys;
+};
+
+struct StressTestIterator : public InternalIterator {
+  Data* data;
+  Random64* rnd;
+  InternalKeyComparator cmp;
+
+  // Each operation will return error with this probability...
+  double error_probability = 0;
+  // ... and add/remove entries with this probability.
+  double mutation_probability = 0;
+  // The probability of adding vs removing entries will be chosen so that the
+  // amount of removed entries stays somewhat close to this number.
+  double target_hidden_fraction = 0;
+  // If true, print all mutations to stdout for debugging.
+  bool trace = false;
+
+  int iter = -1;
+  Status status_;
+
+  StressTestIterator(Data* _data, Random64* _rnd, const Comparator* _cmp)
+      : data(_data), rnd(_rnd), cmp(_cmp) {}
+
+  bool Valid() const override {
+    if (iter >= 0 && iter < (int)data->entries.size()) {
+      assert(status_.ok());
+      return true;
+    }
+    return false;
+  }
+
+  Status status() const override { return status_; }
+
+  bool MaybeFail() {
+    if (rnd->Next() >=
+        std::numeric_limits<uint64_t>::max() * error_probability) {
+      return false;
+    }
+    if (rnd->Next() % 2) {
+      status_ = Status::Incomplete("test");
+    } else {
+      status_ = Status::IOError("test");
+    }
+    if (trace) {
+      std::cout << "injecting " << status_.ToString() << std::endl;
+    }
+    iter = -1;
+    return true;
+  }
+
+  void MaybeMutate() {
+    if (rnd->Next() >=
+        std::numeric_limits<uint64_t>::max() * mutation_probability) {
+      return;
+    }
+    do {
+      // If too many entries are hidden, hide less, otherwise hide more.
+      double hide_probability =
+          data->hidden.size() > data->entries.size() * target_hidden_fraction
+              ? 1. / 3
+              : 2. / 3;
+      if (data->hidden.empty()) {
+        hide_probability = 1;
+      }
+      bool do_hide =
+          rnd->Next() < std::numeric_limits<uint64_t>::max() * hide_probability;
+      if (do_hide) {
+        // Hide a random entry.
+ size_t idx = rnd->Next() % data->entries.size(); + Entry& e = data->entries[idx]; + if (e.visible) { + if (trace) { + std::cout << "hiding idx " << idx << std::endl; + } + e.visible = false; + data->hidden.push_back(idx); + data->recently_touched_keys.insert(e.key); + } else { + // Already hidden. Let's go unhide something instead, just because + // it's easy and it doesn't really matter what we do. + do_hide = false; + } + } + if (!do_hide) { + // Unhide a random entry. + size_t hi = rnd->Next() % data->hidden.size(); + size_t idx = data->hidden[hi]; + if (trace) { + std::cout << "unhiding idx " << idx << std::endl; + } + Entry& e = data->entries[idx]; + assert(!e.visible); + e.visible = true; + data->hidden[hi] = data->hidden.back(); + data->hidden.pop_back(); + data->recently_touched_keys.insert(e.key); + } + } while (rnd->Next() % 3 != 0); // do 3 mutations on average + } + + void SkipForward() { + while (iter < (int)data->entries.size() && !data->entries[iter].visible) { + ++iter; + } + } + void SkipBackward() { + while (iter >= 0 && !data->entries[iter].visible) { + --iter; + } + } + + void SeekToFirst() { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + iter = 0; + SkipForward(); + } + void SeekToLast() { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + iter = (int)data->entries.size() - 1; + SkipBackward(); + } + + void Seek(const Slice& target) { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + // Binary search. + auto it = std::partition_point( + data->entries.begin(), data->entries.end(), + [&](const Entry& e) { return cmp.Compare(e.ikey, target) < 0; }); + iter = (int)(it - data->entries.begin()); + SkipForward(); + } + void SeekForPrev(const Slice& target) { + if (MaybeFail()) return; + MaybeMutate(); + + status_ = Status::OK(); + // Binary search. + auto it = std::partition_point( + data->entries.begin(), data->entries.end(), + [&](const Entry& e) { return cmp.Compare(e.ikey, target) <= 0; }); + iter = (int)(it - data->entries.begin()); + --iter; + SkipBackward(); + } + + void Next() { + assert(Valid()); + if (MaybeFail()) return; + MaybeMutate(); + ++iter; + SkipForward(); + } + void Prev() { + assert(Valid()); + if (MaybeFail()) return; + MaybeMutate(); + --iter; + SkipBackward(); + } + + Slice key() const { + assert(Valid()); + return data->entries[iter].ikey; + } + Slice value() const { + assert(Valid()); + return data->entries[iter].value; + } + + bool IsKeyPinned() const override { return true; } + bool IsValuePinned() const override { return true; } +}; + +// A small reimplementation of DBIter, supporting only some of the features, +// and doing everything in O(log n). +// Skips all keys that are in recently_touched_keys. +struct ReferenceIterator { + Data* data; + uint64_t sequence; // ignore entries with sequence number below this + + bool valid = false; + std::string key; + std::string value; + + ReferenceIterator(Data* _data, uint64_t _sequence) + : data(_data), sequence(_sequence) {} + + bool Valid() const { return valid; } + + // Finds the first entry with key + // greater/less/greater-or-equal/less-or-equal than `key`, depending on + // arguments: if `skip`, inequality is strict; if `forward`, it's + // greater/greater-or-equal, otherwise less/less-or-equal. + // Sets `key` to the result. + // If no such key exists, returns false. Doesn't check `visible`. 
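+  // For example, forward = true with skip = true finds the first entry whose
+  // key is strictly greater than `key`.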
+  bool FindNextKey(bool skip, bool forward) {
+    valid = false;
+    auto it = std::partition_point(data->entries.begin(), data->entries.end(),
+                                   [&](const Entry& e) {
+                                     if (forward != skip) {
+                                       return e.key < key;
+                                     } else {
+                                       return e.key <= key;
+                                     }
+                                   });
+    if (forward) {
+      if (it != data->entries.end()) {
+        key = it->key;
+        return true;
+      }
+    } else {
+      if (it != data->entries.begin()) {
+        --it;
+        key = it->key;
+        return true;
+      }
+    }
+    return false;
+  }
+
+  bool FindValueForCurrentKey() {
+    if (data->recently_touched_keys.count(key)) {
+      return false;
+    }
+
+    // Find the first entry for the key. The caller promises that it exists.
+    auto it = std::partition_point(data->entries.begin(), data->entries.end(),
+                                   [&](const Entry& e) {
+                                     if (e.key != key) {
+                                       return e.key < key;
+                                     }
+                                     return e.sequence > sequence;
+                                   });
+
+    // Find the first visible entry.
+    for (;; ++it) {
+      if (it == data->entries.end()) {
+        return false;
+      }
+      Entry& e = *it;
+      if (e.key != key) {
+        return false;
+      }
+      assert(e.sequence <= sequence);
+      if (!e.visible) continue;
+      if (e.type == kTypeDeletion) {
+        return false;
+      }
+      if (e.type == kTypeValue) {
+        value = e.value;
+        valid = true;
+        return true;
+      }
+      assert(e.type == kTypeMerge);
+      break;
+    }
+
+    // Collect merge operands.
+    std::vector<Slice> operands;
+    for (; it != data->entries.end(); ++it) {
+      Entry& e = *it;
+      if (e.key != key) {
+        break;
+      }
+      assert(e.sequence <= sequence);
+      if (!e.visible) continue;
+      if (e.type == kTypeDeletion) {
+        break;
+      }
+      operands.push_back(e.value);
+      if (e.type == kTypeValue) {
+        break;
+      }
+    }
+
+    // Do a merge.
+    value = operands.back().ToString();
+    for (int i = (int)operands.size() - 2; i >= 0; --i) {
+      value.append(",");
+      value.append(operands[i].data(), operands[i].size());
+    }
+
+    valid = true;
+    return true;
+  }
+
+  // Start at `key` and move until we encounter a valid value.
+  // `forward` defines the direction of movement.
+  // If `skip` is true, we're looking for key not equal to `key`.
+  void DoTheThing(bool skip, bool forward) {
+    while (FindNextKey(skip, forward) && !FindValueForCurrentKey()) {
+      skip = true;
+    }
+  }
+
+  void Seek(const Slice& target) {
+    key = target.ToString();
+    DoTheThing(false, true);
+  }
+  void SeekForPrev(const Slice& target) {
+    key = target.ToString();
+    DoTheThing(false, false);
+  }
+  void SeekToFirst() { Seek(""); }
+  void SeekToLast() {
+    key = data->entries.back().key;
+    DoTheThing(false, false);
+  }
+  void Next() {
+    assert(Valid());
+    DoTheThing(true, true);
+  }
+  void Prev() {
+    assert(Valid());
+    DoTheThing(true, false);
+  }
+};
+
+} // namespace
+
+// Use an internal iterator that sometimes returns errors and sometimes
+// adds/removes entries on the fly. Do random operations on a DBIter and
+// check results.
+// TODO: can be improved for more coverage:
+//  * Override IsKeyPinned() and IsValuePinned() to actually use
+//    PinnedIteratorManager and check that there's no use-after free.
+//  * Try different combinations of prefix_extractor, total_order_seek,
+//    prefix_same_as_start, iterate_lower_bound, iterate_upper_bound.
+TEST_F(DBIteratorStressTest, StressTest) {
+  // We use a deterministic RNG, and everything happens in a single thread.
+  Random64 rnd(826909345792864532ll);
+
+  auto gen_key = [&](int max_key) {
+    int len = 0;
+    int a = max_key;
+    while (a) {
+      a /= 10;
+      ++len;
+    }
+    std::string s = ToString(rnd.Next() % (uint64_t)max_key);
+    s.insert(0, len - (int)s.size(), '0');
+    return s;
+  };
+
+  Options options;
+  options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
+  ReadOptions ropt;
+
+  size_t num_matching = 0;
+  size_t num_at_end = 0;
+  size_t num_not_ok = 0;
+  size_t num_recently_removed = 0;
+
+  // Number of iterations for each combination of parameters
+  // (there are ~250 of those).
+  // Tweak this to change the test run time.
+  // As of the time of writing, the test takes ~4 seconds for value of 5000.
+  const int num_iterations = 5000;
+  // Enable this to print all the operations for debugging.
+  bool trace = FLAGS_verbose;
+
+  for (int num_entries : {5, 10, 100}) {
+    for (double key_space : {0.1, 1.0, 3.0}) {
+      for (ValueType prevalent_entry_type :
+           {kTypeValue, kTypeDeletion, kTypeMerge}) {
+        for (double error_probability : {0.01, 0.1}) {
+          for (double mutation_probability : {0.01, 0.5}) {
+            for (double target_hidden_fraction : {0.1, 0.5}) {
+              std::string trace_str =
+                  "entries: " + ToString(num_entries) +
+                  ", key_space: " + ToString(key_space) +
+                  ", error_probability: " + ToString(error_probability) +
+                  ", mutation_probability: " + ToString(mutation_probability) +
+                  ", target_hidden_fraction: " +
+                  ToString(target_hidden_fraction);
+              SCOPED_TRACE(trace_str);
+              if (trace) {
+                std::cout << trace_str << std::endl;
+              }
+
+              // Generate data.
+              Data data;
+              int max_key = (int)(num_entries * key_space) + 1;
+              for (int i = 0; i < num_entries; ++i) {
+                Entry e;
+                e.key = gen_key(max_key);
+                if (rnd.Next() % 10 != 0) {
+                  e.type = prevalent_entry_type;
+                } else {
+                  const ValueType types[] = {kTypeValue, kTypeDeletion,
+                                             kTypeMerge};
+                  e.type =
+                      types[rnd.Next() % (sizeof(types) / sizeof(types[0]))];
+                }
+                e.sequence = i;
+                e.value = "v" + ToString(i);
+                ParsedInternalKey internal_key(e.key, e.sequence, e.type);
+                AppendInternalKey(&e.ikey, internal_key);
+
+                data.entries.push_back(e);
+              }
+              std::sort(data.entries.begin(), data.entries.end());
+              if (trace) {
+                std::cout << "entries:";
+                for (size_t i = 0; i < data.entries.size(); ++i) {
+                  Entry& e = data.entries[i];
+                  std::cout
+                      << "\n idx " << i << ": \"" << e.key << "\": \""
+                      << e.value << "\" seq: " << e.sequence << " type: "
+                      << (e.type == kTypeValue
+                              ? "val"
+                              : e.type == kTypeDeletion ? "del" : "merge");
+                }
+                std::cout << std::endl;
+              }
+
+              std::unique_ptr<Iterator> db_iter;
+              std::unique_ptr<ReferenceIterator> ref_iter;
+              for (int iteration = 0; iteration < num_iterations; ++iteration) {
+                SCOPED_TRACE(iteration);
+                // Create a new iterator every ~30 operations.
+                if (db_iter == nullptr || rnd.Next() % 30 == 0) {
+                  uint64_t sequence = rnd.Next() % (data.entries.size() + 2);
+                  ref_iter.reset(new ReferenceIterator(&data, sequence));
+                  if (trace) {
+                    std::cout << "new iterator, seq: " << sequence << std::endl;
+                  }
+
+                  auto internal_iter =
+                      new StressTestIterator(&data, &rnd, BytewiseComparator());
+                  internal_iter->error_probability = error_probability;
+                  internal_iter->mutation_probability = mutation_probability;
+                  internal_iter->target_hidden_fraction =
+                      target_hidden_fraction;
+                  internal_iter->trace = trace;
+                  db_iter.reset(NewDBIterator(
+                      env_, ropt, ImmutableCFOptions(options),
+                      BytewiseComparator(), internal_iter, sequence,
+                      options.max_sequential_skip_in_iterations,
+                      nullptr /*read_callback*/));
+                }
+
+                // Do a random operation.
It's important to do it on ref_it + // later than on db_iter to make sure ref_it sees the correct + // recently_touched_keys. + std::string old_key; + bool forward = rnd.Next() % 2 > 0; + // Do Next()/Prev() ~90% of the time. + bool seek = !ref_iter->Valid() || rnd.Next() % 10 == 0; + if (trace) { + std::cout << iteration << ": "; + } + + if (!seek) { + assert(db_iter->Valid()); + old_key = ref_iter->key; + if (trace) { + std::cout << (forward ? "Next" : "Prev") << std::endl; + } + + if (forward) { + db_iter->Next(); + ref_iter->Next(); + } else { + db_iter->Prev(); + ref_iter->Prev(); + } + } else { + data.recently_touched_keys.clear(); + // Do SeekToFirst less often than Seek. + if (rnd.Next() % 4 == 0) { + if (trace) { + std::cout << (forward ? "SeekToFirst" : "SeekToLast") + << std::endl; + } + + if (forward) { + old_key = ""; + db_iter->SeekToFirst(); + ref_iter->SeekToFirst(); + } else { + old_key = data.entries.back().key; + db_iter->SeekToLast(); + ref_iter->SeekToLast(); + } + } else { + old_key = gen_key(max_key); + if (trace) { + std::cout << (forward ? "Seek" : "SeekForPrev") << " \"" + << old_key << '"' << std::endl; + } + if (forward) { + db_iter->Seek(old_key); + ref_iter->Seek(old_key); + } else { + db_iter->SeekForPrev(old_key); + ref_iter->SeekForPrev(old_key); + } + } + } + + // Check the result. + if (db_iter->Valid()) { + ASSERT_TRUE(db_iter->status().ok()); + if (data.recently_touched_keys.count( + db_iter->key().ToString())) { + // Ended on a key that may have been mutated during the + // operation. Reference iterator skips such keys, so we + // can't check the exact result. + + // Check that the key moved in the right direction. + if (forward) { + if (seek) + ASSERT_GE(db_iter->key().ToString(), old_key); + else + ASSERT_GT(db_iter->key().ToString(), old_key); + } else { + if (seek) + ASSERT_LE(db_iter->key().ToString(), old_key); + else + ASSERT_LT(db_iter->key().ToString(), old_key); + } + + if (ref_iter->Valid()) { + // Check that DBIter didn't miss any non-mutated key. + if (forward) { + ASSERT_LT(db_iter->key().ToString(), ref_iter->key); + } else { + ASSERT_GT(db_iter->key().ToString(), ref_iter->key); + } + } + // Tell the next iteration of the loop to reseek the + // iterators. + ref_iter->valid = false; + + ++num_recently_removed; + } else { + ASSERT_TRUE(ref_iter->Valid()); + ASSERT_EQ(ref_iter->key, db_iter->key().ToString()); + ASSERT_EQ(ref_iter->value, db_iter->value()); + ++num_matching; + } + } else if (db_iter->status().ok()) { + ASSERT_FALSE(ref_iter->Valid()); + ++num_at_end; + } else { + // Non-ok status. Nothing to check here. + // Tell the next iteration of the loop to reseek the + // iterators. + ref_iter->valid = false; + ++num_not_ok; + } + } + } + } + } + } + } + } + + // Check that all cases were hit many times. 
+  EXPECT_GT(num_matching, 10000);
+  EXPECT_GT(num_at_end, 10000);
+  EXPECT_GT(num_not_ok, 10000);
+  EXPECT_GT(num_recently_removed, 10000);
+
+  std::cout << "stats:\n exact matches: " << num_matching
+            << "\n end reached: " << num_at_end
+            << "\n non-ok status: " << num_not_ok
+            << "\n mutated on the fly: " << num_recently_removed << std::endl;
+}
+
+} // namespace rocksdb
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}
diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc
index 0143482bf..4af678b4a 100644
--- a/db/db_iter_test.cc
+++ b/db/db_iter_test.cc
@@ -84,6 +84,36 @@ class TestIterator : public InternalIterator {
     });
   }
 
+  // Removes the key from the set of keys over which this iterator iterates.
+  // Not to be confused with AddDeletion().
+  // If the iterator is currently positioned on this key, the deletion will
+  // apply next time the iterator moves.
+  // Used for simulating ForwardIterator updating to a new version that doesn't
+  // have some of the keys (e.g. after compaction with a filter).
+  void Vanish(std::string _key) {
+    if (valid_ && data_[iter_].first == _key) {
+      delete_current_ = true;
+      return;
+    }
+    for (auto it = data_.begin(); it != data_.end(); ++it) {
+      ParsedInternalKey ikey;
+      bool ok __attribute__((__unused__)) = ParseInternalKey(it->first, &ikey);
+      assert(ok);
+      if (ikey.user_key != _key) {
+        continue;
+      }
+      if (valid_ && data_.begin() + iter_ > it) {
+        --iter_;
+      }
+      data_.erase(it);
+      return;
+    }
+    assert(false);
+  }
+
+  // Number of operations done on this iterator since construction.
+  size_t steps() const { return steps_; }
+
   virtual bool Valid() const override {
     assert(initialized_);
     return valid_;
@@ -91,12 +121,16 @@ class TestIterator : public InternalIterator {
 
   virtual void SeekToFirst() override {
     assert(initialized_);
+    ++steps_;
+    DeleteCurrentIfNeeded();
     valid_ = (data_.size() > 0);
     iter_ = 0;
   }
 
   virtual void SeekToLast() override {
     assert(initialized_);
+    ++steps_;
+    DeleteCurrentIfNeeded();
     valid_ = (data_.size() > 0);
     iter_ = data_.size() - 1;
   }
@@ -104,6 +138,7 @@ class TestIterator : public InternalIterator {
   virtual void Seek(const Slice& target) override {
     assert(initialized_);
     SeekToFirst();
+    ++steps_;
    if (!valid_) {
      return;
    }
@@ -119,20 +154,31 @@ class TestIterator : public InternalIterator {
 
   virtual void SeekForPrev(const Slice& target) override {
     assert(initialized_);
+    DeleteCurrentIfNeeded();
     SeekForPrevImpl(target, &cmp);
   }
 
   virtual void Next() override {
     assert(initialized_);
-    if (data_.empty() || (iter_ == data_.size() - 1)) {
-      valid_ = false;
+    assert(valid_);
+    assert(iter_ < data_.size());
+
+    ++steps_;
+    if (delete_current_) {
+      DeleteCurrentIfNeeded();
     } else {
       ++iter_;
     }
+    valid_ = iter_ < data_.size();
   }
 
   virtual void Prev() override {
     assert(initialized_);
+    assert(valid_);
+    assert(iter_ < data_.size());
+
+    ++steps_;
+    DeleteCurrentIfNeeded();
     if (iter_ == 0) {
       valid_ = false;
     } else {
@@ -163,9 +209,19 @@ class TestIterator : public InternalIterator {
   bool valid_;
   size_t sequence_number_;
   size_t iter_;
+  size_t steps_ = 0;
 
   InternalKeyComparator cmp;
   std::vector<std::pair<std::string, std::string>> data_;
+  bool delete_current_ = false;
+
+  void DeleteCurrentIfNeeded() {
+    if (!delete_current_) {
+      return;
+    }
+    data_.erase(data_.begin() + iter_);
+    delete_current_ = false;
+  }
 };
 
 class DBIteratorTest : public testing::Test {
@@ -3018,6 +3074,41 @@ TEST_F(DBIteratorTest, SeekLessLowerBound) {
   ASSERT_EQ(lower_bound_str, db_iter->key().ToString());
 }
 
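+// Checks that changing direction from backward to forward after the current
+// key has vanished from the internal iterator (see TestIterator::Vanish())
+// doesn't drag iter_ all the way to the end.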
+TEST_F(DBIteratorTest, ReverseToForwardWithDisappearingKeys) {
+  Options options;
+  options.prefix_extractor.reset(NewCappedPrefixTransform(0));
+
+  TestIterator* internal_iter = new TestIterator(BytewiseComparator());
+  internal_iter->AddPut("a", "A");
+  internal_iter->AddPut("b", "B");
+  for (int i = 0; i < 100; ++i) {
+    internal_iter->AddPut("c" + ToString(i), "");
+  }
+  internal_iter->Finish();
+
+  std::unique_ptr<Iterator> db_iter(NewDBIterator(
+      env_, ReadOptions(), ImmutableCFOptions(options), BytewiseComparator(),
+      internal_iter, 10, options.max_sequential_skip_in_iterations,
+      nullptr /*read_callback*/));
+
+  db_iter->SeekForPrev("a");
+  ASSERT_TRUE(db_iter->Valid());
+  ASSERT_OK(db_iter->status());
+  ASSERT_EQ("a", db_iter->key().ToString());
+
+  internal_iter->Vanish("a");
+  db_iter->Next();
+  ASSERT_TRUE(db_iter->Valid());
+  ASSERT_OK(db_iter->status());
+  ASSERT_EQ("b", db_iter->key().ToString());
+
+  // A (sort of) bug used to cause DBIter to pointlessly drag the internal
+  // iterator all the way to the end. But this doesn't really matter at the time
+  // of writing because the only iterator that can see disappearing keys is
+  // ForwardIterator, which doesn't support SeekForPrev().
+  EXPECT_LT(internal_iter->steps(), 20);
+}
+
 } // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc
index 24dbac41b..dd2d1ce49 100644
--- a/db/db_iterator_test.cc
+++ b/db/db_iterator_test.cc
@@ -2232,6 +2232,46 @@ TEST_P(DBIteratorTest, SeekAfterHittingManyInternalKeys) {
   ASSERT_EQ(iter2->value().ToString(), "val_6");
 }
 
+// Reproduces a former bug where iterator would skip some records when DBIter
+// re-seeks subiterator with Incomplete status.
+TEST_P(DBIteratorTest, NonBlockingIterationBugRepro) {
+  Options options = CurrentOptions();
+  BlockBasedTableOptions table_options;
+  // Make sure the sst file has more than one block.
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+
+  // Two records in sst file, each in its own block.
+  Put("b", "");
+  Put("d", "");
+  Flush();
+
+  // Create a nonblocking iterator before writing to memtable.
+  ReadOptions ropt;
+  ropt.read_tier = kBlockCacheTier;
+  unique_ptr<Iterator> iter(NewIterator(ropt));
+
+  // Overwrite a key in memtable many times to hit
+  // max_sequential_skip_in_iterations (which is 8 by default).
+  for (int i = 0; i < 20; ++i) {
+    Put("c", "");
+  }
+
+  // Load the second block in sst file into the block cache.
+  {
+    unique_ptr<Iterator> iter2(NewIterator(ReadOptions()));
+    iter2->Seek("d");
+  }
+
+  // Finally seek the nonblocking iterator.
+  iter->Seek("a");
+  // With the bug, the status used to be OK, and the iterator used to point to
+  // "d".
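+  // After the fix, the Incomplete status from the block-cache-only read is
+  // reported instead.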
+ EXPECT_TRUE(iter->status().IsIncomplete()); +} + INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest, testing::Values(true, false)); diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 471d7c0b7..1401bb0ab 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -27,7 +27,7 @@ namespace rocksdb { // Usage: // ForwardLevelIterator iter; // iter.SetFileIndex(file_index); -// iter.Seek(target); +// iter.Seek(target); // or iter.SeekToFirst(); // iter.Next() class ForwardLevelIterator : public InternalIterator { public: @@ -53,11 +53,11 @@ class ForwardLevelIterator : public InternalIterator { void SetFileIndex(uint32_t file_index) { assert(file_index < files_.size()); + status_ = Status::OK(); if (file_index != file_index_) { file_index_ = file_index; Reset(); } - valid_ = false; } void Reset() { assert(file_index_ < files_.size()); @@ -77,10 +77,10 @@ class ForwardLevelIterator : public InternalIterator { read_options_.ignore_range_deletions ? nullptr : &range_del_agg, nullptr /* table_reader_ptr */, nullptr, false); file_iter_->SetPinnedItersMgr(pinned_iters_mgr_); + valid_ = false; if (!range_del_agg.IsEmpty()) { status_ = Status::NotSupported( "Range tombstones unsupported with ForwardIterator"); - valid_ = false; } } void SeekToLast() override { @@ -95,12 +95,27 @@ class ForwardLevelIterator : public InternalIterator { return valid_; } void SeekToFirst() override { - SetFileIndex(0); + assert(file_iter_ != nullptr); + if (!status_.ok()) { + assert(!valid_); + return; + } file_iter_->SeekToFirst(); valid_ = file_iter_->Valid(); } void Seek(const Slice& internal_key) override { assert(file_iter_ != nullptr); + + // This deviates from the usual convention for InternalIterator::Seek() in + // that it doesn't discard pre-existing error status. That's because this + // Seek() is only supposed to be called immediately after SetFileIndex() + // (which discards pre-existing error status), and SetFileIndex() may set + // an error status, which we shouldn't discard. 
+    if (!status_.ok()) {
+      assert(!valid_);
+      return;
+    }
+
     file_iter_->Seek(internal_key);
     valid_ = file_iter_->Valid();
   }
@@ -112,8 +127,12 @@ class ForwardLevelIterator : public InternalIterator {
     assert(valid_);
     file_iter_->Next();
     for (;;) {
-      if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) {
-        valid_ = !file_iter_->status().IsIncomplete();
+      valid_ = file_iter_->Valid();
+      if (!file_iter_->status().ok()) {
+        assert(!valid_);
+        return;
+      }
+      if (valid_) {
         return;
       }
       if (file_index_ + 1 >= files_.size()) {
@@ -121,6 +140,10 @@
         valid_ = false;
         return;
       }
       SetFileIndex(file_index_ + 1);
+      if (!status_.ok()) {
+        assert(!valid_);
+        return;
+      }
       file_iter_->SeekToFirst();
     }
   }
@@ -135,7 +158,7 @@
   Status status() const override {
     if (!status_.ok()) {
       return status_;
-    } else if (file_iter_ && !file_iter_->status().ok()) {
+    } else if (file_iter_) {
       return file_iter_->status();
     }
     return Status::OK();
@@ -299,9 +322,6 @@ bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
 }

 void ForwardIterator::Seek(const Slice& internal_key) {
-  if (IsOverUpperBound(internal_key)) {
-    valid_ = false;
-  }
   if (sv_ == nullptr) {
     RebuildIterators(true);
   } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
@@ -605,7 +625,9 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
     if ((read_options_.iterate_upper_bound != nullptr) &&
         cfd_->internal_comparator().user_comparator()->Compare(
             l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
-      has_iter_trimmed_for_upper_bound_ = true;
+      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
+      // will never be interested in files with smallest key above
+      // iterate_upper_bound, since iterate_upper_bound can't be changed.
       l0_iters_.push_back(nullptr);
       continue;
     }
@@ -773,7 +795,7 @@ void ForwardIterator::UpdateCurrent() {
       current_ = mutable_iter_;
     }
   }
-  valid_ = (current_ != nullptr);
+  valid_ = current_ != nullptr && immutable_status_.ok();
   if (!status_.ok()) {
     status_ = Status::OK();
   }
diff --git a/db/managed_iterator.cc b/db/managed_iterator.cc
index c393eb5a6..9831eb80a 100644
--- a/db/managed_iterator.cc
+++ b/db/managed_iterator.cc
@@ -101,9 +101,7 @@ void ManagedIterator::SeekToLast() {
   }
   assert(mutable_iter_ != nullptr);
   mutable_iter_->SeekToLast();
-  if (mutable_iter_->status().ok()) {
-    UpdateCurrent();
-  }
+  UpdateCurrent();
 }

 void ManagedIterator::SeekToFirst() {
@@ -146,27 +144,13 @@ void ManagedIterator::Prev() {
   }
   MILock l(&in_use_, this);
   if (NeedToRebuild()) {
-    std::string current_key = key().ToString();
-    Slice old_key(current_key);
-    RebuildIterator();
-    SeekInternal(old_key, false);
-    UpdateCurrent();
+    RebuildIterator(true);
     if (!valid_) {
       return;
     }
-    if (key().compare(old_key) != 0) {
-      valid_ = false;
-      status_ = Status::Incomplete("Cannot do Prev now");
-      return;
-    }
   }
   mutable_iter_->Prev();
-  if (mutable_iter_->status().ok()) {
-    UpdateCurrent();
-    status_ = Status::OK();
-  } else {
-    status_ = mutable_iter_->status();
-  }
+  UpdateCurrent();
 }

 void ManagedIterator::Next() {
@@ -176,19 +160,10 @@ void ManagedIterator::Next() {
   }
   MILock l(&in_use_, this);
   if (NeedToRebuild()) {
-    std::string current_key = key().ToString();
-    Slice old_key(current_key.data(), cached_key_.Size());
-    RebuildIterator();
-    SeekInternal(old_key, false);
-    UpdateCurrent();
+    RebuildIterator(true);
     if (!valid_) {
       return;
     }
-    if (key().compare(old_key) != 0) {
-      valid_ = false;
-      status_ = Status::Incomplete("Cannot do Next now");
-      return;
-    }
   }
   mutable_iter_->Next();
   UpdateCurrent();
@@ -206,21 +181,38 @@ Slice ManagedIterator::value() const {

 Status ManagedIterator::status() const { return status_; }

-void ManagedIterator::RebuildIterator() {
+void ManagedIterator::RebuildIterator(bool reseek) {
+  std::string current_key;
+  if (reseek) {
+    current_key = key().ToString();
+  }
+
   svnum_ = cfd_->GetSuperVersionNumber();
   mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
+
+  if (reseek) {
+    Slice old_key(current_key.data(), current_key.size());
+    SeekInternal(old_key, false);
+    UpdateCurrent();
+    if (!valid_ || key().compare(old_key) != 0) {
+      valid_ = false;
+      status_ = Status::Incomplete(
+          "Next/Prev failed because current key has "
+          "been removed");
+    }
+  }
 }

 void ManagedIterator::UpdateCurrent() {
   assert(mutable_iter_ != nullptr);

   valid_ = mutable_iter_->Valid();
+  status_ = mutable_iter_->status();
+
   if (!valid_) {
-    status_ = mutable_iter_->status();
     return;
   }
-  status_ = Status::OK();
   cached_key_.SetUserKey(mutable_iter_->key());
   cached_value_.SetUserKey(mutable_iter_->value());
 }
diff --git a/db/managed_iterator.h b/db/managed_iterator.h
index 8e962f781..bd125fa32 100644
--- a/db/managed_iterator.h
+++ b/db/managed_iterator.h
@@ -54,7 +54,7 @@ class ManagedIterator : public Iterator {
   }

  private:
-  void RebuildIterator();
+  void RebuildIterator(bool reseek = false);
   void UpdateCurrent();
   void SeekInternal(const Slice& user_key, bool seek_to_first);
   bool NeedToRebuild();
diff --git a/db/version_set.cc b/db/version_set.cc
index 7cceb5860..d1a63bce6 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -502,13 +502,7 @@ class LevelIterator final : public InternalIterator {
     return file_iter_.value();
   }
   virtual Status status() const override {
-    // It'd be nice if status() returned a const Status& instead of a Status
-    if (!status_.ok()) {
-      return status_;
-    } else if (file_iter_.iter() != nullptr) {
-      return file_iter_.status();
-    }
-    return Status::OK();
+    return file_iter_.iter() ? file_iter_.status() : Status::OK();
   }
   virtual void SetPinnedItersMgr(
       PinnedIteratorsManager* pinned_iters_mgr) override {
@@ -573,7 +567,6 @@ class LevelIterator final : public InternalIterator {
   RangeDelAggregator* range_del_agg_;
   IteratorWrapper file_iter_;  // May be nullptr
   PinnedIteratorsManager* pinned_iters_mgr_;
-  Status status_;
 };

 void LevelIterator::Seek(const Slice& target) {
@@ -628,16 +621,9 @@ void LevelIterator::Prev() {
 }

 void LevelIterator::SkipEmptyFileForward() {
-  // For an error (IO error, checksum mismatch, etc), we skip the file
-  // and move to the next one and continue reading data.
-  // TODO this behavior is from LevelDB. We should revisit it.
   while (file_iter_.iter() == nullptr ||
-         (!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
-    if (file_iter_.iter() != nullptr && !file_iter_.Valid() &&
-        file_iter_.iter()->IsOutOfBound()) {
-      return;
-    }
-
+         (!file_iter_.Valid() && file_iter_.status().ok() &&
+          !file_iter_.iter()->IsOutOfBound())) {
     // Move to next file
     if (file_index_ >= flevel_->num_files - 1) {
       // Already at the last file
@@ -657,7 +643,7 @@
 void LevelIterator::SkipEmptyFileBackward() {
   while (file_iter_.iter() == nullptr ||
-         (!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
+         (!file_iter_.Valid() && file_iter_.status().ok())) {
     // Move to previous file
     if (file_index_ == 0) {
       // Already the first file
@@ -672,13 +658,6 @@
 }

 void LevelIterator::SetFileIterator(InternalIterator* iter) {
-  if (file_iter_.iter() != nullptr && status_.ok()) {
-    // TODO right now we don't invalidate the iterator even if the status is
-    // not OK. We should consider to do that so that it is harder for users to
-    // skip errors.
-    status_ = file_iter_.status();
-  }
-
   if (pinned_iters_mgr_ && iter) {
     iter->SetPinnedItersMgr(pinned_iters_mgr_);
   }
diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h
index 4be77e962..4475eb396 100644
--- a/include/rocksdb/iterator.h
+++ b/include/rocksdb/iterator.h
@@ -33,6 +33,7 @@ class Iterator : public Cleanable {
   // An iterator is either positioned at a key/value pair, or
   // not valid. This method returns true iff the iterator is valid.
+  // Always returns false if !status().ok().
   virtual bool Valid() const = 0;

   // Position at the first key in the source. The iterator is Valid()
@@ -46,6 +47,9 @@
   // Position at the first key in the source that at or past target.
   // The iterator is Valid() after this call iff the source contains
   // an entry that comes at or past target.
+  // All Seek*() methods clear any error status() that the iterator had prior to
+  // the call; after the seek, status() indicates only the error (if any) that
+  // happened during the seek, not any past errors.
   virtual void Seek(const Slice& target) = 0;

   // Position at the last key in the source that at or before target.
@@ -72,7 +76,7 @@
   // Return the value for the current entry. The underlying storage for
   // the returned slice is valid only until the next modification of
   // the iterator.
-  // REQUIRES: !AtEnd() && !AtStart()
+  // REQUIRES: Valid()
   virtual Slice value() const = 0;

   // If an error has occurred, return it. Else return an ok status.
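The include/rocksdb/iterator.h comments above spell out the contract this patch establishes: Valid() returns false whenever status() is not ok, and every Seek*() call clears any earlier error. The following is a minimal sketch (not part of this patch) of the read pattern that contract enables; CountPrefix, db, and prefix are hypothetical names used only for illustration.

// Sketch only: relies on the new contract (Valid() is false when !status().ok(),
// and Seek() resets status()); CountPrefix/db/prefix are hypothetical.
#include <memory>
#include "rocksdb/db.h"

size_t CountPrefix(rocksdb::DB* db, const rocksdb::Slice& prefix) {
  std::unique_ptr<rocksdb::Iterator> it(
      db->NewIterator(rocksdb::ReadOptions()));
  size_t n = 0;
  // Seek() clears any error status() left over from earlier operations.
  for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix);
       it->Next()) {
    ++n;  // no per-step status() check: an error simply ends the loop
  }
  // A single check after the loop distinguishes end-of-data from an error.
  if (!it->status().ok()) {
    // report or propagate it->status() here
  }
  return n;
}

With the old contract, each Next() could silently skip past an errored child iterator unless the caller checked status() on every step; under the new contract the loop above cannot miss an error.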
diff --git a/src.mk b/src.mk
index 1a1a7b58c..592ad94ca 100644
--- a/src.mk
+++ b/src.mk
@@ -267,6 +267,7 @@ MAIN_SOURCES = \
   db/db_inplace_update_test.cc \
   db/db_io_failure_test.cc \
   db/db_iter_test.cc \
+  db/db_iter_stress_test.cc \
   db/db_iterator_test.cc \
   db/db_log_iter_test.cc \
   db/db_memtable_test.cc \
diff --git a/table/block.cc b/table/block.cc
index ae82ae337..ce45f4316 100644
--- a/table/block.cc
+++ b/table/block.cc
@@ -429,12 +429,13 @@ BlockIter* Block::NewIterator(const Comparator* cmp, BlockIter* iter,
     ret_iter = new BlockIter;
   }
   if (size_ < 2*sizeof(uint32_t)) {
-    ret_iter->SetStatus(Status::Corruption("bad block contents"));
+    ret_iter->Invalidate(Status::Corruption("bad block contents"));
     return ret_iter;
   }
   const uint32_t num_restarts = NumRestarts();
   if (num_restarts == 0) {
-    ret_iter->SetStatus(Status::OK());
+    // Empty block.
+    ret_iter->Invalidate(Status::OK());
     return ret_iter;
   } else {
     BlockPrefixIndex* prefix_index_ptr =
diff --git a/table/block.h b/table/block.h
index 120bcbdcc..6cc304cdc 100644
--- a/table/block.h
+++ b/table/block.h
@@ -241,8 +241,21 @@ class BlockIter final : public InternalIterator {
     last_bitmap_offset_ = current_ + 1;
   }

-  void SetStatus(Status s) {
+  // Makes Valid() return false, status() return `s`, and Seek()/Prev()/etc do
+  // nothing.
+  void Invalidate(Status s) {
+    // Assert that the BlockIter is never deleted while Pinning is Enabled.
+    assert(!pinned_iters_mgr_ ||
+           (pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
+
+    data_ = nullptr;
+    current_ = restarts_;
     status_ = s;
+
+    // Clear prev entries cache.
+    prev_entries_keys_buff_.clear();
+    prev_entries_.clear();
+    prev_entries_idx_ = -1;
   }

   virtual bool Valid() const override { return current_ < restarts_; }
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index dacff6a73..e3c218474 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -1391,7 +1391,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(

   if (cache_handle == nullptr && no_io) {
     if (input_iter != nullptr) {
-      input_iter->SetStatus(Status::Incomplete("no blocking io"));
+      input_iter->Invalidate(Status::Incomplete("no blocking io"));
       return input_iter;
     } else {
       return NewErrorInternalIterator(Status::Incomplete("no blocking io"));
@@ -1438,7 +1438,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
       RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
       // make sure if something goes wrong, index_reader shall remain intact.
       if (input_iter != nullptr) {
-        input_iter->SetStatus(s);
+        input_iter->Invalidate(s);
         return input_iter;
       } else {
         return NewErrorInternalIterator(s);
@@ -1506,7 +1506,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
   if (s.ok() && block.value == nullptr) {
     if (no_io) {
       // Could not read from block_cache and can't do IO
-      iter->SetStatus(Status::Incomplete("no blocking io"));
+      iter->Invalidate(Status::Incomplete("no blocking io"));
       return iter;
     }
     std::unique_ptr<Block> block_value;
@@ -1566,7 +1566,7 @@
     }
   } else {
     assert(block.value == nullptr);
-    iter->SetStatus(s);
+    iter->Invalidate(s);
   }
   return iter;
 }
@@ -1876,7 +1876,9 @@ void BlockBasedTableIterator::InitDataBlock() {
   BlockHandle data_block_handle;
   Slice handle_slice = index_iter_->value();
   if (!block_iter_points_to_real_block_ ||
-      handle_slice.compare(prev_index_value_) != 0) {
+      handle_slice.compare(prev_index_value_) != 0 ||
+      // if previous attempt of reading the block missed cache, try again
+      data_block_iter_.status().IsIncomplete()) {
     if (block_iter_points_to_real_block_) {
       ResetDataIter();
     }
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index bb1f79774..3b4e49d76 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -528,11 +528,9 @@ class BlockBasedTableIterator : public InternalIterator {
     return data_block_iter_.value();
   }
   Status status() const override {
-    // It'd be nice if status() returned a const Status& instead of a Status
     if (!index_iter_->status().ok()) {
       return index_iter_->status();
-    } else if (block_iter_points_to_real_block_ &&
-               !data_block_iter_.status().ok()) {
+    } else if (block_iter_points_to_real_block_) {
       return data_block_iter_.status();
     } else {
       return Status::OK();
@@ -571,8 +569,7 @@
       if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
         data_block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
       }
-      data_block_iter_.~BlockIter();
-      new (&data_block_iter_) BlockIter();
+      data_block_iter_.Invalidate(Status::OK());
       block_iter_points_to_real_block_ = false;
     }
   }
diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc
index da93af458..d611352b2 100644
--- a/table/cuckoo_table_reader.cc
+++ b/table/cuckoo_table_reader.cc
@@ -206,7 +206,7 @@ class CuckooTableIterator : public InternalIterator {
   void Prev() override;
   Slice key() const override;
   Slice value() const override;
-  Status status() const override { return status_; }
+  Status status() const override { return Status::OK(); }
   void InitIfNeeded();

  private:
@@ -241,7 +241,6 @@
   void PrepareKVAtCurrIdx();
   CuckooTableReader* reader_;
   bool initialized_;
-  Status status_;
   // Contains a map of keys to bucket_id sorted in key order.
   std::vector<uint32_t> sorted_bucket_ids_;
   // We assume that the number of items can be stored in uint32 (4 Billion).
diff --git a/table/internal_iterator.h b/table/internal_iterator.h
index ff7b4d2cb..2fdb14c7f 100644
--- a/table/internal_iterator.h
+++ b/table/internal_iterator.h
@@ -22,6 +22,7 @@ class InternalIterator : public Cleanable {
   // An iterator is either positioned at a key/value pair, or
   // not valid. This method returns true iff the iterator is valid.
+  // Always returns false if !status().ok().
   virtual bool Valid() const = 0;

   // Position at the first key in the source. The iterator is Valid()
@@ -35,6 +36,9 @@
   // Position at the first key in the source that at or past target
   // The iterator is Valid() after this call iff the source contains
   // an entry that comes at or past target.
+  // All Seek*() methods clear any error status() that the iterator had prior to
+  // the call; after the seek, status() indicates only the error (if any) that
+  // happened during the seek, not any past errors.
   virtual void Seek(const Slice& target) = 0;

   // Position at the first key in the source that at or before target
@@ -61,7 +65,7 @@
   // Return the value for the current entry. The underlying storage for
   // the returned slice is valid only until the next modification of
   // the iterator.
-  // REQUIRES: !AtEnd() && !AtStart()
+  // REQUIRES: Valid()
   virtual Slice value() const = 0;

   // If an error has occurred, return it. Else return an ok status.
diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h
index f14acdb9b..5ddea2470 100644
--- a/table/iterator_wrapper.h
+++ b/table/iterator_wrapper.h
@@ -87,6 +87,7 @@ class IteratorWrapper {
     valid_ = iter_->Valid();
     if (valid_) {
       key_ = iter_->key();
+      assert(iter_->status().ok());
     }
   }

diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc
index 99a867fe7..c2f486c2a 100644
--- a/table/merging_iterator.cc
+++ b/table/merging_iterator.cc
@@ -52,12 +52,21 @@ class MergingIterator : public InternalIterator {
     }
     for (auto& child : children_) {
       if (child.Valid()) {
+        assert(child.status().ok());
         minHeap_.push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     current_ = CurrentForward();
   }

+  void considerStatus(Status s) {
+    if (!s.ok() && status_.ok()) {
+      status_ = s;
+    }
+  }
+
   virtual void AddIterator(InternalIterator* iter) {
     assert(direction_ == kForward);
     children_.emplace_back(iter);
@@ -66,8 +75,11 @@
     auto new_wrapper = children_.back();
     if (new_wrapper.Valid()) {
+      assert(new_wrapper.status().ok());
       minHeap_.push(&new_wrapper);
       current_ = CurrentForward();
+    } else {
+      considerStatus(new_wrapper.status());
     }
   }

@@ -77,14 +89,22 @@
     }
   }

-  virtual bool Valid() const override { return (current_ != nullptr); }
+  virtual bool Valid() const override {
+    return current_ != nullptr && status_.ok();
+  }
+
+  virtual Status status() const override { return status_; }

   virtual void SeekToFirst() override {
     ClearHeaps();
+    status_ = Status::OK();
     for (auto& child : children_) {
       child.SeekToFirst();
       if (child.Valid()) {
+        assert(child.status().ok());
         minHeap_.push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kForward;
@@ -94,10 +114,14 @@
   virtual void SeekToLast() override {
     ClearHeaps();
     InitMaxHeap();
+    status_ = Status::OK();
     for (auto& child : children_) {
       child.SeekToLast();
       if (child.Valid()) {
+        assert(child.status().ok());
         maxHeap_->push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kReverse;
@@ -106,6 +130,7 @@

   virtual void Seek(const Slice& target) override {
     ClearHeaps();
+    status_ = Status::OK();
     for (auto& child : children_) {
       {
         PERF_TIMER_GUARD(seek_child_seek_time);
@@ -114,8 +139,11 @@
       PERF_COUNTER_ADD(seek_child_seek_count, 1);

       if (child.Valid()) {
+        assert(child.status().ok());
         PERF_TIMER_GUARD(seek_min_heap_time);
         minHeap_.push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kForward;
@@ -128,6 +156,7 @@
   virtual void SeekForPrev(const Slice& target) override {
     ClearHeaps();
     InitMaxHeap();
+    status_ = Status::OK();

     for (auto& child : children_) {
       {
@@ -137,8 +166,11 @@
       PERF_COUNTER_ADD(seek_child_seek_count, 1);

       if (child.Valid()) {
+        assert(child.status().ok());
         PERF_TIMER_GUARD(seek_max_heap_time);
         maxHeap_->push(&child);
+      } else {
+        considerStatus(child.status());
       }
     }
     direction_ = kReverse;
@@ -172,9 +204,11 @@
       // current is still valid after the Next() call above. Call
       // replace_top() to restore the heap property. When the same child
       // iterator yields a sequence of keys, this is cheap.
+      assert(current_->status().ok());
       minHeap_.replace_top(current_);
     } else {
       // current stopped being valid, remove it from the heap.
+      considerStatus(current_->status());
       minHeap_.pop();
     }
     current_ = CurrentForward();
@@ -191,28 +225,35 @@
     // just after the if-block.
     ClearHeaps();
     InitMaxHeap();
+    Slice target = key();
     for (auto& child : children_) {
       if (&child != current_) {
         if (!prefix_seek_mode_) {
-          child.Seek(key());
+          child.Seek(target);
           if (child.Valid()) {
             // Child is at first entry >= key(). Step back one to be < key()
             TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev",
                                      &child);
+            assert(child.status().ok());
             child.Prev();
           } else {
             // Child has no entries >= key(). Position at last entry.
             TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
+            considerStatus(child.status());
             child.SeekToLast();
           }
+          considerStatus(child.status());
         } else {
-          child.SeekForPrev(key());
-          if (child.Valid() && comparator_->Equal(key(), child.key())) {
+          child.SeekForPrev(target);
+          considerStatus(child.status());
+          if (child.Valid() && comparator_->Equal(target, child.key())) {
             child.Prev();
+            considerStatus(child.status());
           }
         }
       }
       if (child.Valid()) {
+        assert(child.status().ok());
         maxHeap_->push(&child);
       }
     }
@@ -238,9 +279,11 @@
       // current is still valid after the Prev() call above. Call
       // replace_top() to restore the heap property. When the same child
       // iterator yields a sequence of keys, this is cheap.
+      assert(current_->status().ok());
       maxHeap_->replace_top(current_);
     } else {
       // current stopped being valid, remove it from the heap.
+      considerStatus(current_->status());
       maxHeap_->pop();
     }
     current_ = CurrentReverse();
@@ -256,17 +299,6 @@
     return current_->value();
   }

-  virtual Status status() const override {
-    Status s;
-    for (auto& child : children_) {
-      s = child.status();
-      if (!s.ok()) {
-        break;
-      }
-    }
-    return s;
-  }
-
   virtual void SetPinnedItersMgr(
       PinnedIteratorsManager* pinned_iters_mgr) override {
     pinned_iters_mgr_ = pinned_iters_mgr;
@@ -302,6 +334,8 @@
   // child iterators are valid. This is the top of minHeap_ or maxHeap_
   // depending on the direction.
   IteratorWrapper* current_;
+  // If any of the children have non-ok status, this is one of them.
+  Status status_;
   // Which direction is the iterator moving?
   enum Direction {
     kForward,
@@ -334,11 +368,14 @@ void MergingIterator::SwitchToForward() {
   // Otherwise, advance the non-current children. We advance current_
   // just after the if-block.
   ClearHeaps();
+  Slice target = key();
   for (auto& child : children_) {
     if (&child != current_) {
-      child.Seek(key());
-      if (child.Valid() && comparator_->Equal(key(), child.key())) {
+      child.Seek(target);
+      considerStatus(child.status());
+      if (child.Valid() && comparator_->Equal(target, child.key())) {
         child.Next();
+        considerStatus(child.status());
       }
     }
     if (child.Valid()) {
diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc
index 476381cac..8408abfad 100644
--- a/table/plain_table_reader.cc
+++ b/table/plain_table_reader.cc
@@ -625,6 +625,7 @@ bool PlainTableIterator::Valid() const {
 }

 void PlainTableIterator::SeekToFirst() {
+  status_ = Status::OK();
   next_offset_ = table_->data_start_offset_;
   if (next_offset_ >= table_->file_info_.data_end_offset) {
     next_offset_ = offset_ = table_->file_info_.data_end_offset;
@@ -636,6 +637,7 @@
 void PlainTableIterator::SeekToLast() {
   assert(false);
   status_ = Status::NotSupported("SeekToLast() is not supported in PlainTable");
+  next_offset_ = offset_ = table_->file_info_.data_end_offset;
 }

 void PlainTableIterator::Seek(const Slice& target) {
@@ -676,6 +678,7 @@
   if (!table_->IsTotalOrderMode()) {
     prefix_hash = GetSliceHash(prefix_slice);
     if (!table_->MatchBloom(prefix_hash)) {
+      status_ = Status::OK();
       offset_ = next_offset_ = table_->file_info_.data_end_offset;
       return;
     }
@@ -711,6 +714,7 @@
 void PlainTableIterator::SeekForPrev(const Slice& /*target*/) {
   assert(false);
   status_ = Status::NotSupported("SeekForPrev() is not supported in PlainTable");
+  offset_ = next_offset_ = table_->file_info_.data_end_offset;
 }

 void PlainTableIterator::Next() {
diff --git a/table/table_test.cc b/table/table_test.cc
index fd2866283..19bd8043d 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -256,7 +256,7 @@ class KeyConvertingIterator : public InternalIterator {
       delete iter_;
     }
   }
-  virtual bool Valid() const override { return iter_->Valid(); }
+  virtual bool Valid() const override { return iter_->Valid() && status_.ok(); }
   virtual void Seek(const Slice& target) override {
     ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue);
     std::string encoded;
@@ -2368,6 +2368,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
     iter->Next();
   }
   ASSERT_OK(iter->status());
+  iter.reset();

   const ImmutableCFOptions ioptions1(opt);
   ASSERT_OK(c.Reopen(ioptions1));
diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc
index 97734b146..09e0e1ef1 100644
--- a/table/two_level_iterator.cc
+++ b/table/two_level_iterator.cc
@@ -47,8 +47,8 @@ class TwoLevelIterator : public InternalIterator {
     return second_level_iter_.value();
   }
   virtual Status status() const override {
-    // It'd be nice if status() returned a const Status& instead of a Status
     if (!first_level_iter_.status().ok()) {
+      assert(second_level_iter_.iter() == nullptr);
       return first_level_iter_.status();
     } else if (second_level_iter_.iter() != nullptr &&
                !second_level_iter_.status().ok()) {
@@ -101,7 +101,7 @@ void TwoLevelIterator::SeekForPrev(const Slice& target) {
     second_level_iter_.SeekForPrev(target);
   }
   if (!Valid()) {
-    if (!first_level_iter_.Valid()) {
+    if (!first_level_iter_.Valid() && first_level_iter_.status().ok()) {
       first_level_iter_.SeekToLast();
       InitDataBlock();
       if (second_level_iter_.iter() != nullptr) {
@@ -144,8 +144,7 @@ void TwoLevelIterator::Prev() {
 }

 void TwoLevelIterator::SkipEmptyDataBlocksForward() {
   while (second_level_iter_.iter() == nullptr ||
-         (!second_level_iter_.Valid() &&
-          !second_level_iter_.status().IsIncomplete())) {
+         (!second_level_iter_.Valid() && second_level_iter_.status().ok())) {
     // Move to next block
     if (!first_level_iter_.Valid()) {
       SetSecondLevelIterator(nullptr);
@@ -161,8 +160,7 @@

 void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
   while (second_level_iter_.iter() == nullptr ||
-         (!second_level_iter_.Valid() &&
-          !second_level_iter_.status().IsIncomplete())) {
+         (!second_level_iter_.Valid() && second_level_iter_.status().ok())) {
     // Move to next block
     if (!first_level_iter_.Valid()) {
       SetSecondLevelIterator(nullptr);
@@ -177,9 +175,6 @@
 }

 void TwoLevelIterator::SetSecondLevelIterator(InternalIterator* iter) {
-  if (second_level_iter_.iter() != nullptr) {
-    SaveError(second_level_iter_.status());
-  }
   InternalIterator* old_iter = second_level_iter_.Set(iter);
   delete old_iter;
 }
diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h
index 5ead75dd7..1565c670b 100644
--- a/utilities/blob_db/blob_db_iterator.h
+++ b/utilities/blob_db/blob_db_iterator.h
@@ -119,6 +119,7 @@ class BlobDBIterator : public Iterator {
     TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
     TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
     value_.Reset();
+    status_ = Status::OK();
     if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) {
       Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
       if (s.IsNotFound()) {
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index 31f7b3993..2849e4bbf 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -79,6 +79,7 @@ class BaseDeltaIterator : public Iterator {
   void Next() override {
     if (!Valid()) {
       status_ = Status::NotSupported("Next() on invalid iterator");
+      return;
     }

     if (!forward_) {
@@ -114,6 +115,7 @@
   void Prev() override {
     if (!Valid()) {
       status_ = Status::NotSupported("Prev() on invalid iterator");
+      return;
     }

     if (forward_) {
@@ -170,6 +172,21 @@
  private:
  void AssertInvariants() {
#ifndef NDEBUG
+    bool not_ok = false;
+    if (!base_iterator_->status().ok()) {
+      assert(!base_iterator_->Valid());
+      not_ok = true;
+    }
+    if (!delta_iterator_->status().ok()) {
+      assert(!delta_iterator_->Valid());
+      not_ok = true;
+    }
+    if (not_ok) {
+      assert(!Valid());
+      assert(!status().ok());
+      return;
+    }
+
    if (!Valid()) {
      return;
    }
@@ -238,13 +255,25 @@
  void UpdateCurrent() {
    // Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
+    status_ = Status::OK();
    while (true) {
      WriteEntry delta_entry;
      if (DeltaValid()) {
+        assert(delta_iterator_->status().ok());
        delta_entry = delta_iterator_->Entry();
+      } else if (!delta_iterator_->status().ok()) {
+        // Expose the error status and stop.
+        current_at_base_ = false;
+        return;
      }
      equal_keys_ = false;
      if (!BaseValid()) {
+        if (!base_iterator_->status().ok()) {
+          // Expose the error status and stop.
+          current_at_base_ = true;
+          return;
+        }
+
+        // Base has finished.
        if (!DeltaValid()) {
          // Finished