diff --git a/HISTORY.md b/HISTORY.md index 9b80a6da8..f1c64333e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,10 +12,12 @@ * Add new SetBufferSize API to WriteBufferManager to allow dynamic management of memory allotted to all write buffers. This allows user code to adjust memory monitoring provided by WriteBufferManager as process memory needs change datasets grow and shrink. * Clarified the required semantics of Read() functions in FileSystem and Env APIs. Please ensure any custom implementations are compliant. * For the new integrated BlobDB implementation, compaction statistics now include the amount of data read from blob files during compaction (due to garbage collection or compaction filters). Write amplification metrics have also been extended to account for data read from blob files. +* Add EqualWithoutTimestamp() to Comparator. ### New Features * Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision. * Add support to extend retrieval of checksums for blob files from the MANIFEST when checkpointing. During backup, rocksdb can detect corruption in blob files during file copies. +* Enable backward iteration on keys with user-defined timestamps. ## 6.18.0 (02/19/2021) ### Behavior Changes diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 2569514cd..ef50e1365 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -390,13 +390,11 @@ void CompactionIterator::NextFromInput() { // merge_helper_->compaction_filter_skip_until_. Slice skip_until; - int cmp_user_key_without_ts = 0; + bool user_key_equal_without_ts = false; int cmp_ts = 0; if (has_current_user_key_) { - cmp_user_key_without_ts = - timestamp_size_ - ? cmp_->CompareWithoutTimestamp(ikey_.user_key, current_user_key_) - : cmp_->Compare(ikey_.user_key, current_user_key_); + user_key_equal_without_ts = + cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_); // if timestamp_size_ > 0, then curr_ts_ has been initialized by a // previous key. cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp( @@ -409,7 +407,7 @@ void CompactionIterator::NextFromInput() { // Check whether the user key changed. After this if statement current_key_ // is a copy of the current input key (maybe converted to a delete by the // compaction filter). ikey_.user_key is pointing to the copy. - if (!has_current_user_key_ || cmp_user_key_without_ts != 0 || cmp_ts != 0) { + if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) { // First occurrence of this user key // Copy key for output key_ = current_key_.SetInternalKey(key_, &ikey_); @@ -430,7 +428,7 @@ void CompactionIterator::NextFromInput() { // consider this key for GC, e.g. it may be dropped if certain conditions // match. if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ || - 0 != cmp_user_key_without_ts || cmp_with_history_ts_low_ >= 0) { + !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0) { // Initialize for future comparison for rule (A) and etc. current_user_key_sequence_ = kMaxSequenceNumber; current_user_key_snapshot_ = 0; @@ -724,8 +722,7 @@ void CompactionIterator::NextFromInput() { input_->Valid() && (ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_) .ok()) && - 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key, - next_ikey.user_key) && + cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) && (prev_snapshot == 0 || DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) { input_->Next(); @@ -735,8 +732,7 @@ void CompactionIterator::NextFromInput() { if (input_->Valid() && (ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_) .ok()) && - 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key, - next_ikey.user_key)) { + cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { valid_ = true; at_next_ = true; } diff --git a/db/db_iter.cc b/db/db_iter.cc index 6d4d9bab3..e9480adb8 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -114,8 +114,7 @@ Status DBIter::GetProperty(std::string prop_name, std::string* prop) { } bool DBIter::ParseKey(ParsedInternalKey* ikey) { - Status s = - ParseInternalKey(iter_.key(), ikey, false /* log_err_key */); // TODO + Status s = ParseInternalKey(iter_.key(), ikey, false /* log_err_key */); if (!s.ok()) { status_ = Status::Corruption("In DBIter: ", s.getState()); valid_ = false; @@ -489,7 +488,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, &last_key, ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion)); } else { - const std::string kTsMin(timestamp_size_, static_cast(0)); + const std::string kTsMin(timestamp_size_, '\0'); AppendInternalKeyWithDifferentTimestamp( &last_key, ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion), @@ -635,13 +634,6 @@ bool DBIter::MergeValuesNewToOld() { } void DBIter::Prev() { - if (timestamp_size_ > 0) { - valid_ = false; - status_ = Status::NotSupported( - "SeekToLast/SeekForPrev/Prev currently not supported with timestamp."); - return; - } - assert(valid_); assert(status_.ok()); @@ -680,8 +672,14 @@ bool DBIter::ReverseToForward() { // If that's the case, seek iter_ to current key. if (!expect_total_order_inner_iter() || !iter_.Valid()) { IterKey last_key; - last_key.SetInternalKey(ParsedInternalKey( - saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); + ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber, + kValueTypeForSeek); + if (timestamp_size_ > 0) { + // TODO: pre-create kTsMax. + const std::string kTsMax(timestamp_size_, '\xff'); + pikey.SetTimestamp(kTsMax); + } + last_key.SetInternalKey(pikey); iter_.Seek(last_key.GetInternalKey()); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); } @@ -760,11 +758,13 @@ void DBIter::PrevInternal(const Slice* prefix) { } assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() || - user_comparator_.Compare(saved_key_.GetUserKey(), - *iterate_lower_bound_) >= 0); + user_comparator_.CompareWithoutTimestamp( + saved_key_.GetUserKey(), /*a_has_ts=*/true, + *iterate_lower_bound_, /*b_has_ts=*/false) >= 0); if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() && - user_comparator_.Compare(saved_key_.GetUserKey(), - *iterate_lower_bound_) < 0) { + user_comparator_.CompareWithoutTimestamp( + saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_, + /*b_has_ts=*/false) < 0) { // We've iterated earlier than the user-specified lower bound. valid_ = false; return; @@ -809,8 +809,8 @@ bool DBIter::FindValueForCurrentKey() { assert(iter_.Valid()); merge_context_.Clear(); current_entry_is_merged_ = false; - // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or - // kTypeValue) + // last entry before merge (could be kTypeDeletion, + // kTypeDeletionWithTimestamp, kTypeSingleDeletion or kTypeValue) ValueType last_not_merge_type = kTypeDeletion; ValueType last_key_entry_type = kTypeDeletion; @@ -831,9 +831,13 @@ bool DBIter::FindValueForCurrentKey() { timestamp_size_); } if (!IsVisible(ikey.sequence, ts) || - !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { + !user_comparator_.EqualWithoutTimestamp(ikey.user_key, + saved_key_.GetUserKey())) { break; } + if (!ts.empty()) { + saved_timestamp_.assign(ts.data(), ts.size()); + } if (TooManyInternalKeysSkipped()) { return false; } @@ -873,6 +877,7 @@ bool DBIter::FindValueForCurrentKey() { } break; case kTypeDeletion: + case kTypeDeletionWithTimestamp: case kTypeSingleDeletion: merge_context_.Clear(); last_not_merge_type = last_key_entry_type; @@ -916,6 +921,7 @@ bool DBIter::FindValueForCurrentKey() { is_blob_ = false; switch (last_key_entry_type) { case kTypeDeletion: + case kTypeDeletionWithTimestamp: case kTypeSingleDeletion: case kTypeRangeDeletion: valid_ = false; @@ -976,8 +982,17 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { // FindValueForCurrentKeyUsingSeek() assert(pinned_iters_mgr_.PinningEnabled()); std::string last_key; - AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(), - sequence_, kValueTypeForSeek)); + if (0 == timestamp_size_) { + AppendInternalKey(&last_key, + ParsedInternalKey(saved_key_.GetUserKey(), sequence_, + kValueTypeForSeek)); + } else { + AppendInternalKeyWithDifferentTimestamp( + &last_key, + ParsedInternalKey(saved_key_.GetUserKey(), sequence_, + kValueTypeForSeek), + *timestamp_ub_); + } iter_.Seek(last_key); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); @@ -1001,7 +1016,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { timestamp_size_); } - if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { + if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key, + saved_key_.GetUserKey())) { // No visible values for this key, even though FindValueForCurrentKey() // has seen some. This is possible if we're using a tailing iterator, and // the entries were discarded in a compaction. @@ -1018,7 +1034,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || range_del_agg_.ShouldDelete( - ikey, RangeDelPositioningMode::kBackwardTraversal)) { + ikey, RangeDelPositioningMode::kBackwardTraversal) || + kTypeDeletionWithTimestamp == ikey.type) { valid_ = false; return true; } @@ -1026,6 +1043,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { valid_ = false; return false; } + if (timestamp_size_ > 0) { + Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_); + saved_timestamp_.assign(ts.data(), ts.size()); + } if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { assert(iter_.iter()->IsValuePinned()); pinned_value_ = iter_.value(); @@ -1142,7 +1163,8 @@ bool DBIter::FindUserKeyBeforeSavedKey() { return false; } - if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) { + if (user_comparator_.CompareWithoutTimestamp(ikey.user_key, + saved_key_.GetUserKey()) < 0) { return true; } @@ -1166,8 +1188,14 @@ bool DBIter::FindUserKeyBeforeSavedKey() { if (num_skipped >= max_skip_) { num_skipped = 0; IterKey last_key; - last_key.SetInternalKey(ParsedInternalKey( - saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); + ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber, + kValueTypeForSeek); + if (timestamp_size_ > 0) { + // TODO: pre-create kTsMax. + const std::string kTsMax(timestamp_size_, '\xff'); + pikey.SetTimestamp(kTsMax); + } + last_key.SetInternalKey(pikey); // It would be more efficient to use SeekForPrev() here, but some // iterators may not support it. iter_.Seek(last_key.GetInternalKey()); @@ -1244,13 +1272,27 @@ void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) { saved_key_.Clear(); // now saved_key is used to store internal key. saved_key_.SetInternalKey(target, 0 /* sequence_number */, - kValueTypeForSeekForPrev); + kValueTypeForSeekForPrev, timestamp_ub_); + + if (timestamp_size_ > 0) { + const std::string kTsMin(timestamp_size_, '\0'); + Slice ts = kTsMin; + saved_key_.UpdateInternalKey(/*seq=*/0, kValueTypeForSeekForPrev, &ts); + } if (iterate_upper_bound_ != nullptr && - user_comparator_.Compare(saved_key_.GetUserKey(), - *iterate_upper_bound_) >= 0) { + user_comparator_.CompareWithoutTimestamp( + saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_upper_bound_, + /*b_has_ts=*/false) >= 0) { saved_key_.Clear(); - saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber); + saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber, + kValueTypeForSeekForPrev, timestamp_ub_); + if (timestamp_size_ > 0) { + const std::string kTsMax(timestamp_size_, '\xff'); + Slice ts = kTsMax; + saved_key_.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeekForPrev, + &ts); + } } } @@ -1353,13 +1395,6 @@ void DBIter::SeekForPrev(const Slice& target) { } #endif // ROCKSDB_LITE - if (timestamp_size_ > 0) { - valid_ = false; - status_ = Status::NotSupported( - "SeekToLast/SeekForPrev/Prev currently not supported with timestamp."); - return; - } - status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); @@ -1454,17 +1489,12 @@ void DBIter::SeekToFirst() { } void DBIter::SeekToLast() { - if (timestamp_size_ > 0) { - valid_ = false; - status_ = Status::NotSupported( - "SeekToLast/SeekForPrev/Prev currently not supported with timestamp."); - return; - } - if (iterate_upper_bound_ != nullptr) { // Seek to last key strictly less than ReadOptions.iterate_upper_bound. SeekForPrev(*iterate_upper_bound_); - if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) { + if (Valid() && 0 == user_comparator_.CompareWithoutTimestamp( + *iterate_upper_bound_, /*a_has_ts=*/false, key(), + /*b_has_ts=*/false)) { ReleaseTempPinnedData(); PrevInternal(nullptr); } diff --git a/db/db_iter.h b/db/db_iter.h index 0cc1db774..9b049ffbb 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -67,7 +67,7 @@ class DBIter final : public Iterator { // this->key(). // (2) When moving backwards, the internal iterator is positioned // just before all entries whose user key == this->key(). - enum Direction { kForward, kReverse }; + enum Direction : uint8_t { kForward, kReverse }; // LocalStatistics contain Statistics counters that will be aggregated per // each iterator instance and then will be sent to the global statistics when @@ -184,6 +184,9 @@ class DBIter final : public Iterator { Slice timestamp() const override { assert(valid_); assert(timestamp_size_ > 0); + if (direction_ == kReverse) { + return saved_timestamp_; + } const Slice ukey_and_ts = saved_key_.GetUserKey(); assert(timestamp_size_ < ukey_and_ts.size()); return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_); @@ -372,6 +375,7 @@ class DBIter final : public Iterator { const Slice* const timestamp_ub_; const Slice* const timestamp_lb_; const size_t timestamp_size_; + std::string saved_timestamp_; }; // Return a new iterator that converts internal keys (yielded by diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 9eca45027..b3b26769c 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -402,12 +402,11 @@ TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) { ASSERT_OK( db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size)); ASSERT_EQ(size, 0); - std::cout << size << std::endl; Close(); } -TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { +TEST_F(DBBasicTestWithTimestamp, SimpleIterate) { const int kNumKeysPerFile = 128; const uint64_t kMaxKey = 1024; Options options = CurrentOptions(); @@ -439,6 +438,7 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { std::unique_ptr it(db_->NewIterator(read_opts)); int count = 0; uint64_t key = 0; + // Forward iterate. for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid(); it->Next(), ++count, ++key) { CheckIterUserEntry(it.get(), Key1(key), kTypeValue, @@ -447,7 +447,16 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { size_t expected_count = kMaxKey - start_keys[i] + 1; ASSERT_EQ(expected_count, count); - // SeekToFirst() with lower bound. + // Backward iterate. + count = 0; + for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid(); + it->Prev(), ++count, --key) { + CheckIterUserEntry(it.get(), Key1(key), kTypeValue, + "value" + std::to_string(i), write_timestamps[i]); + } + ASSERT_EQ(static_cast(kMaxKey) - start_keys[i] + 1, count); + + // SeekToFirst()/SeekToLast() with lower/upper bounds. // Then iter with lower and upper bounds. uint64_t l = 0; uint64_t r = kMaxKey + 1; @@ -465,6 +474,12 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { "value" + std::to_string(i), write_timestamps[i]); } ASSERT_EQ(r - std::max(l, start_keys[i]), count); + + for (it->SeekToLast(), key = std::min(r, kMaxKey + 1), count = 0; + it->Valid(); it->Prev(), --key, ++count) { + CheckIterUserEntry(it.get(), Key1(key - 1), kTypeValue, + "value" + std::to_string(i), write_timestamps[i]); + } l += (kMaxKey / 100); r -= (kMaxKey / 100); } @@ -527,7 +542,7 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLessThanKey) { Close(); } -TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLargerThanKey) { +TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLongerThanKey) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; @@ -600,38 +615,135 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithBound) { Slice ts = ts_str; write_opts.timestamp = &ts; - ASSERT_OK(db_->Put(write_opts, "foo1", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo1", "bar1")); ASSERT_OK(Flush()); - ASSERT_OK(db_->Put(write_opts, "foo2", "bar")); + ASSERT_OK(db_->Put(write_opts, "foo2", "bar2")); ASSERT_OK(Flush()); // Move sst file to next level ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - ASSERT_OK(db_->Put(write_opts, "foo3", "bar")); + for (int i = 3; i < 9; ++i) { + ASSERT_OK(db_->Put(write_opts, "foo" + std::to_string(i), + "bar" + std::to_string(i))); + } ASSERT_OK(Flush()); ReadOptions read_opts; std::string read_ts = Timestamp(2, 0); ts = read_ts; read_opts.timestamp = &ts; - std::string up_bound = "foo5"; + std::string up_bound = "foo5"; // exclusive Slice up_bound_slice = up_bound; + std::string lo_bound = "foo2"; // inclusive + Slice lo_bound_slice = lo_bound; read_opts.iterate_upper_bound = &up_bound_slice; + read_opts.iterate_lower_bound = &lo_bound_slice; read_opts.auto_prefix_mode = true; { std::unique_ptr iter(db_->NewIterator(read_opts)); // Make sure the prefix extractor doesn't include timestamp, otherwise it // may return invalid result. iter->Seek("foo"); - ASSERT_TRUE(iter->Valid()); - ASSERT_OK(iter->status()); + CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2", + Timestamp(1, 0)); + iter->SeekToFirst(); + CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2", + Timestamp(1, 0)); + iter->SeekForPrev("g"); + CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0)); + iter->SeekToLast(); + CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0)); } Close(); } +TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = env_; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.prefix_extractor.reset(NewFixedPrefixTransform(1)); + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + DestroyAndReopen(options); + const std::vector timestamps = {Timestamp(1, 1), Timestamp(0, 2), + Timestamp(4, 3)}; + const std::vector> kvs = { + std::make_tuple("aa", "value1"), std::make_tuple("ab", "value2")}; + for (const auto& ts : timestamps) { + WriteBatch wb(0, 0, kTimestampSize); + for (const auto& kv : kvs) { + const std::string& key = std::get<0>(kv); + const std::string& value = std::get<1>(kv); + ASSERT_OK(wb.Put(key, value)); + } + + ASSERT_OK(wb.AssignTimestamp(ts)); + ASSERT_OK(db_->Write(WriteOptions(), &wb)); + } + std::string read_ts_str = Timestamp(5, 3); + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.timestamp = &read_ts; + std::unique_ptr it(db_->NewIterator(read_opts)); + + it->SeekToFirst(); + ASSERT_TRUE(it->Valid()); + it->Prev(); + ASSERT_FALSE(it->Valid()); + + it->SeekToLast(); + ASSERT_TRUE(it->Valid()); + uint64_t prev_reseek_count = + options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION); + ASSERT_EQ(0, prev_reseek_count); + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_EQ(1 + prev_reseek_count, + options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); + + it->Seek(std::get<0>(kvs[0])); + CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue, + std::get<1>(kvs[0]), Timestamp(4, 3)); + it->Next(); + CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, + std::get<1>(kvs[1]), Timestamp(4, 3)); + it->Prev(); + CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue, + std::get<1>(kvs[0]), Timestamp(4, 3)); + + prev_reseek_count = + options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION); + ASSERT_EQ(1, prev_reseek_count); + it->Next(); + CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, + std::get<1>(kvs[1]), Timestamp(4, 3)); + ASSERT_EQ(1 + prev_reseek_count, + options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); + + it->SeekForPrev(std::get<0>(kvs[1])); + CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, + std::get<1>(kvs[1]), Timestamp(4, 3)); + it->Prev(); + CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue, + std::get<1>(kvs[0]), Timestamp(4, 3)); + + prev_reseek_count = + options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION); + it->Next(); + CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue, + std::get<1>(kvs[1]), Timestamp(4, 3)); + ASSERT_EQ(1 + prev_reseek_count, + options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); + + it.reset(); + Close(); +} + TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { constexpr int kNumKeysPerFile = 128; constexpr uint64_t kMaxKey = 1024; @@ -830,6 +942,16 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { CheckIterUserEntry(iter.get(), "foo", kTypeValue, "value0", ts_str); ASSERT_EQ( 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); + + ts_str = Timestamp(kNumKeys, 0); + ts = ts_str; + read_opts.timestamp = &ts; + iter.reset(db_->NewIterator(read_opts)); + iter->SeekToLast(); + CheckIterUserEntry(iter.get(), "foo", kTypeValue, + "value" + std::to_string(kNumKeys - 1), ts_str); + ASSERT_EQ( + 2, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); } Close(); } @@ -880,6 +1002,47 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) { Close(); } +TEST_F(DBBasicTestWithTimestamp, ReseekToUserKeyBeforeSavedKey) { + Options options = GetDefaultOptions(); + options.env = env_; + options.create_if_missing = true; + constexpr size_t kNumKeys = 16; + options.max_sequential_skip_in_iterations = kNumKeys / 2; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + for (size_t i = 0; i < kNumKeys; ++i) { + std::string ts_str = Timestamp(static_cast(i + 1), 0); + Slice ts = ts_str; + WriteOptions write_opts; + write_opts.timestamp = &ts; + Status s = db_->Put(write_opts, "b", "value" + std::to_string(i)); + ASSERT_OK(s); + } + { + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + WriteOptions write_opts; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Put(write_opts, "a", "value")); + } + { + ReadOptions read_opts; + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + read_opts.timestamp = &ts; + std::unique_ptr iter(db_->NewIterator(read_opts)); + iter->SeekToLast(); + iter->Prev(); + CheckIterUserEntry(iter.get(), "a", kTypeValue, "value", ts_str); + ASSERT_EQ( + 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); + } + Close(); +} + TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) { Options options = CurrentOptions(); options.env = env_; @@ -1102,7 +1265,7 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetPrefixFilter) { Close(); } -TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) { +TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringNext) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; @@ -1141,6 +1304,45 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) { Close(); } +TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringPrev) { + Options options = GetDefaultOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + constexpr size_t max_skippable_internal_keys = 2; + const size_t kNumKeys = max_skippable_internal_keys + 2; + WriteOptions write_opts; + Status s; + { + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Put(write_opts, "b", "value")); + } + for (size_t i = 0; i < kNumKeys; ++i) { + std::string ts_str = Timestamp(static_cast(i + 1), 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + s = db_->Put(write_opts, "a", "value" + std::to_string(i)); + ASSERT_OK(s); + } + { + ReadOptions read_opts; + read_opts.max_skippable_internal_keys = max_skippable_internal_keys; + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + read_opts.timestamp = &ts; + std::unique_ptr iter(db_->NewIterator(read_opts)); + iter->SeekToLast(); + iter->Prev(); + ASSERT_TRUE(iter->status().IsIncomplete()); + } + Close(); +} + // Create two L0, and compact them to a new L1. In this test, L1 is L_bottom. // Two L0s: // f1 f2 @@ -2411,7 +2613,7 @@ class DBBasicTestWithTimestampPrefixSeek "/db_basic_test_with_timestamp_prefix_seek") {} }; -TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { +TEST_P(DBBasicTestWithTimestampPrefixSeek, IterateWithPrefix) { const size_t kNumKeysPerFile = 128; Options options = CurrentOptions(); options.env = env_; @@ -2461,6 +2663,13 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { "value" + std::to_string(i), write_ts_list[i]); iter->Next(); ASSERT_FALSE(iter->Valid()); + + // Seek to kMinKey + iter->Seek(Key1(kMinKey)); + CheckIterUserEntry(iter.get(), Key1(kMinKey), kTypeValue, + "value" + std::to_string(i), write_ts_list[i]); + iter->Prev(); + ASSERT_FALSE(iter->Valid()); } const std::vector targets = {kMinKey, kMinKey + 0x10, kMinKey + 0x100, kMaxKey}; @@ -2476,6 +2685,7 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { Slice read_ts = read_ts_list[i]; read_opts.timestamp = &read_ts; std::unique_ptr it(db_->NewIterator(read_opts)); + // Forward and backward iterate. for (size_t j = 0; j != targets.size(); ++j) { std::string start_key = Key1(targets[j]); uint64_t expected_ub = @@ -2499,6 +2709,24 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { it->Next(); } ASSERT_EQ(expected_ub - targets[j] + 1, count); + + count = 0; + expected_key = targets[j]; + it->SeekForPrev(start_key); + uint64_t expected_lb = (targets[j] & kPrefixMask); + while (it->Valid()) { + // Out of prefix + if (!read_opts.prefix_same_as_start && + pe->Transform(it->key()) != pe->Transform(start_key)) { + break; + } + CheckIterUserEntry(it.get(), Key1(expected_key), kTypeValue, + "value" + std::to_string(i), write_ts_list[i]); + ++count; + --expected_key; + it->Prev(); + } + ASSERT_EQ(targets[j] - std::max(expected_lb, kMinKey) + 1, count); } } } @@ -2533,7 +2761,7 @@ class DBBasicTestWithTsIterTombstones : DBBasicTestWithTimestampBase("/db_basic_ts_iter_tombstones") {} }; -TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) { +TEST_P(DBBasicTestWithTsIterTombstones, IterWithDelete) { constexpr size_t kNumKeysPerFile = 128; Options options = CurrentOptions(); options.env = env_; @@ -2563,7 +2791,7 @@ TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) { } ++key; } while (true); - // Delete them all + ts = write_ts_strs[1]; write_opts.timestamp = &ts; for (key = kMaxKey; key >= kMinKey; --key) { @@ -2590,6 +2818,13 @@ TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) { ASSERT_EQ("value1" + std::to_string(key), iter->value()); } ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count); + + for (iter->SeekToLast(), count = 0, key = kMaxKey; iter->Valid(); + key -= 2, ++count, iter->Prev()) { + ASSERT_EQ(Key1(key), iter->key()); + ASSERT_EQ("value1" + std::to_string(key), iter->value()); + } + ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count); } Close(); } diff --git a/db/dbformat.h b/db/dbformat.h index e3e457719..76cc40aa8 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -122,7 +122,7 @@ struct ParsedInternalKey { void SetTimestamp(const Slice& ts) { assert(ts.size() <= user_key.size()); - const char* addr = user_key.data() - ts.size(); + const char* addr = user_key.data() + user_key.size() - ts.size(); memcpy(const_cast(addr), ts.data(), ts.size()); } }; @@ -512,6 +512,7 @@ class IterKey { bool IsKeyPinned() const { return (key_ != buf_); } + // user_key does not have timestamp. void SetInternalKey(const Slice& key_prefix, const Slice& user_key, SequenceNumber s, ValueType value_type = kValueTypeForSeek, diff --git a/db/memtable.cc b/db/memtable.cc index 2db1d0171..8283a5c6b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -723,8 +723,8 @@ static bool SaveValue(void* arg, const char* entry) { const Comparator* user_comparator = s->mem->GetInternalKeyComparator().user_comparator(); size_t ts_sz = user_comparator->timestamp_size(); - if (user_comparator->CompareWithoutTimestamp(user_key_slice, - s->key->user_key()) == 0) { + if (user_comparator->EqualWithoutTimestamp(user_key_slice, + s->key->user_key())) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); ValueType type; diff --git a/include/rocksdb/comparator.h b/include/rocksdb/comparator.h index 53a46ad33..37c2925bc 100644 --- a/include/rocksdb/comparator.h +++ b/include/rocksdb/comparator.h @@ -110,7 +110,9 @@ class Comparator { // == 0 iff t1 == t2 // > 0 iff t1 > t2 // Note that an all-zero byte array will be the smallest (oldest) timestamp - // of the same length. + // of the same length, and a byte array with all bits 1 will be the largest. + // In the future, we can extend Comparator so that subclasses can specify + // both largest and smallest timestamps. virtual int CompareTimestamp(const Slice& /*ts1*/, const Slice& /*ts2*/) const { return 0; @@ -121,6 +123,11 @@ class Comparator { return Compare(a, b); } + virtual bool EqualWithoutTimestamp(const Slice& a, const Slice& b) const { + return 0 == + CompareWithoutTimestamp(a, /*a_has_ts=*/true, b, /*b_has_ts=*/true); + } + private: size_t timestamp_size_; }; diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index aa2f2a3ff..eb3f42acd 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -45,7 +45,6 @@ class Iterator : public Cleanable { // Position at the last key in the source. The iterator is // Valid() after this call iff the source is not empty. - // Currently incompatible with user timestamp. virtual void SeekToLast() = 0; // Position at the first key in the source that at or past target. @@ -60,7 +59,7 @@ class Iterator : public Cleanable { // Position at the last key in the source that at or before target. // The iterator is Valid() after this call iff the source contains // an entry that comes at or before target. - // Currently incompatible with user timestamp. + // Target does not contain timestamp. virtual void SeekForPrev(const Slice& target) = 0; // Moves to the next entry in the source. After this call, Valid() is @@ -70,7 +69,6 @@ class Iterator : public Cleanable { // Moves to the previous entry in the source. After this call, Valid() is // true iff the iterator was not positioned at the first entry in source. - // Currently incompatible with user timestamp. // REQUIRES: Valid() virtual void Prev() = 0; diff --git a/table/get_context.cc b/table/get_context.cc index 2211e440b..1c5e12c83 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -219,7 +219,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(matched); assert((state_ != kMerge && parsed_key.type != kTypeMerge) || merge_context_ != nullptr); - if (ucmp_->CompareWithoutTimestamp(parsed_key.user_key, user_key_) == 0) { + if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) { *matched = true; // If the value is not in the snapshot, skip it if (!CheckCallback(parsed_key.sequence)) { diff --git a/util/comparator.cc b/util/comparator.cc index 96e770489..f115a73e9 100644 --- a/util/comparator.cc +++ b/util/comparator.cc @@ -129,6 +129,10 @@ class BytewiseComparatorImpl : public Comparator { bool /*b_has_ts*/) const override { return a.compare(b); } + + bool EqualWithoutTimestamp(const Slice& a, const Slice& b) const override { + return a == b; + } }; class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl { diff --git a/util/user_comparator_wrapper.h b/util/user_comparator_wrapper.h index 3b67debd0..c40e04a76 100644 --- a/util/user_comparator_wrapper.h +++ b/util/user_comparator_wrapper.h @@ -73,6 +73,10 @@ class UserComparatorWrapper final : public Comparator { return user_comparator_->CompareWithoutTimestamp(a, a_has_ts, b, b_has_ts); } + bool EqualWithoutTimestamp(const Slice& a, const Slice& b) const override { + return user_comparator_->EqualWithoutTimestamp(a, b); + } + private: const Comparator* user_comparator_; };