From 9e89ffb776af2f3640a6f70ec744c2f67e8e4500 Mon Sep 17 00:00:00 2001
From: Huisheng Liu
Date: Fri, 10 Apr 2020 09:49:38 -0700
Subject: [PATCH] make iterator return versions between timestamp bounds
 (#6544)

Summary:
(Based on Yanqin's idea) Add a new field in ReadOptions as the lower
timestamp bound for an iterator. When the parameter is not supplied
(nullptr), the iterator returns the latest visible version of a record.
When it is supplied, the existing timestamp field serves as the upper
bound. Together the two bounds form a time window, and the iterator
returns all versions of a record that fall within that window.

A SeekRandom perf test (10 minutes) on a RAM drive on the same
development machine with the same DB data shows no regression (within
margin of error). The test is adapted from
https://github.com/facebook/rocksdb/wiki/RocksDB-In-Memory-Workload-Performance-Benchmarks.

Baseline (commit e860f8840):
seekrandom : 7.836 micros/op 4082449 ops/sec; (0 of 73481999 found)
This PR:
seekrandom : 7.764 micros/op 4120935 ops/sec; (0 of 71303999 found)

db_bench --db=r:\rocksdb.github --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --cache_size=2147483648 --cache_numshardbits=6 --compression_type=none --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=r:\rocksdb.github\WAL_LOG --sync=0 --verify_checksum=1 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --duration=600 --benchmarks=seekrandom --use_existing_db=1 --num=25000000 --threads=32 --allow_concurrent_memtable_write=0
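
Usage sketch (illustration only, not part of the diff below): db is
assumed to be a DB opened with a timestamp-aware comparator, and
EncodeTs() is a hypothetical application helper that encodes a
timestamp in that comparator's format.

  ReadOptions read_opts;
  std::string ts_low = EncodeTs(1);   // oldest version to return
  std::string ts_high = EncodeTs(4);  // newest version to return
  Slice lb(ts_low);
  Slice ub(ts_high);
  read_opts.iter_start_ts = &lb;  // new lower bound field
  read_opts.timestamp = &ub;      // existing field, now the upper bound
  std::unique_ptr<Iterator> it(db->NewIterator(read_opts));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // With iter_start_ts set, the same user key may appear more than
    // once: one entry per version whose timestamp falls within the
    // window, newest version first.
  }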
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6544

Reviewed By: ltamasi

Differential Revision: D20844069

Pulled By: riversand963

fbshipit-source-id: d97f2bf38a323c8c6a68db213b2d3c694b1c1f74
---
 db/db_iter.cc                      | 48 +++++++++++++-----------
 db/db_iter.h                       | 13 ++++++-
 db/db_with_timestamp_basic_test.cc | 59 ++++++++++++++++++++++++++++--
 include/rocksdb/options.h          |  5 +++
 options/options.cc                 |  6 ++-
 5 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/db/db_iter.cc b/db/db_iter.cc
index d74162c82..cd0073a04 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -73,6 +73,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
       cfd_(cfd),
       start_seqnum_(read_options.iter_start_seqnum),
       timestamp_ub_(read_options.timestamp),
+      timestamp_lb_(read_options.iter_start_ts),
       timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
   RecordTick(statistics_, NO_ITERATOR_CREATED);
   if (pin_thru_lifetime_) {
@@ -246,23 +247,22 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
     assert(ikey_.user_key.size() >= timestamp_size_);
     Slice ts;
+    bool more_recent = false;
     if (timestamp_size_ > 0) {
       ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
     }
 
-    if (IsVisible(ikey_.sequence, ts)) {
+    if (IsVisible(ikey_.sequence, ts, &more_recent)) {
       // If the previous entry is of seqnum 0, the current entry will not
       // possibly be skipped. This condition can potentially be relaxed to
       // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
       // prone to bugs causing the same user key with the same sequence number.
       if (!is_prev_key_seqnum_zero && skipping_saved_key &&
-          user_comparator_.CompareWithoutTimestamp(
-              ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
+          CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
         num_skipped++;  // skip this entry
         PERF_COUNTER_ADD(internal_key_skipped_count, 1);
       } else {
         assert(!skipping_saved_key ||
-               user_comparator_.CompareWithoutTimestamp(
-                   ikey_.user_key, saved_key_.GetUserKey()) > 0);
+               CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
         num_skipped = 0;
         reseek_done = false;
         switch (ikey_.type) {
@@ -363,11 +363,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
         }
       }
     } else {
-      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
+      if (more_recent) {
+        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
+      }
 
-      // This key was inserted after our snapshot was taken.
-      // If this happens too many times in a row for the same user key, we want
-      // to seek to the target sequence number.
+      // This key was inserted after our snapshot was taken, or it falls
+      // outside the timestamp range. If this happens too many times in a row
+      // for the same user key, we want to seek to the target sequence number.
       int cmp = user_comparator_.CompareWithoutTimestamp(
           ikey_.user_key, saved_key_.GetUserKey());
       if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
@@ -1101,20 +1103,24 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
   return false;
 }
 
-bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts) {
+bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
+                       bool* more_recent) {
   // Remember that comparator orders preceding timestamp as larger.
-  int cmp_ts = timestamp_ub_ != nullptr
-                   ? user_comparator_.CompareTimestamp(ts, *timestamp_ub_)
-                   : 0;
-  if (cmp_ts > 0) {
-    return false;
-  }
-  if (read_callback_ == nullptr) {
-    return sequence <= sequence_;
-  } else {
-    // TODO(yanqin): support timestamp in read_callback_.
-    return read_callback_->IsVisible(sequence);
+  // TODO(yanqin): support timestamp in read_callback_.
+  bool visible_by_seq = (read_callback_ == nullptr)
+                            ? sequence <= sequence_
+                            : read_callback_->IsVisible(sequence);
+
+  bool visible_by_ts =
+      (timestamp_ub_ == nullptr ||
+       user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) &&
+      (timestamp_lb_ == nullptr ||
+       user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0);
+
+  if (more_recent) {
+    *more_recent = !visible_by_seq;
   }
+  return visible_by_seq && visible_by_ts;
 }
 
 void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
diff --git a/db/db_iter.h b/db/db_iter.h
index c2f545c4f..ed843952b 100644
--- a/db/db_iter.h
+++ b/db/db_iter.h
@@ -231,7 +231,8 @@ class DBIter final : public Iterator {
   // entry can be found within the prefix.
   void PrevInternal(const Slice* prefix);
   bool TooManyInternalKeysSkipped(bool increment = true);
-  bool IsVisible(SequenceNumber sequence, const Slice& ts);
+  bool IsVisible(SequenceNumber sequence, const Slice& ts,
+                 bool* more_recent = nullptr);
 
   // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
   // is called
@@ -270,6 +271,15 @@ class DBIter final : public Iterator {
     return expect_total_order_inner_iter_;
   }
 
+  // If a lower bound on timestamp is given by ReadOptions.iter_start_ts, the
+  // iterator must return all versions of a key that fall in the bounds, so
+  // entries cannot be skipped on key equality alone; timestamps matter too.
+  inline int CompareKeyForSkip(const Slice& a, const Slice& b) {
+    return timestamp_lb_ != nullptr
+               ? user_comparator_.Compare(a, b)
+               : user_comparator_.CompareWithoutTimestamp(a, b);
+  }
+
   const SliceTransform* prefix_extractor_;
   Env* const env_;
   Logger* logger_;
@@ -338,6 +348,7 @@ class DBIter final : public Iterator {
   // if this value > 0 iterator will return internal keys
   SequenceNumber start_seqnum_;
   const Slice* const timestamp_ub_;
+  const Slice* const timestamp_lb_;
   const size_t timestamp_size_;
 };
 
diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc
index 900582226..f5bc19a4e 100644
--- a/db/db_with_timestamp_basic_test.cc
+++ b/db/db_with_timestamp_basic_test.cc
@@ -235,6 +235,57 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
   Close();
 }
 
+TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
+  const int kNumKeysPerFile = 128;
+  const uint64_t kMaxKey = 1024;
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.disable_auto_compactions = true;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  DestroyAndReopen(options);
+  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
+                                                     Timestamp(3, 0)};
+  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
+                                                    Timestamp(4, 0)};
+  const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0),
+                                                       Timestamp(1, 0)};
+  for (size_t i = 0; i < write_timestamps.size(); ++i) {
+    WriteOptions write_opts;
+    Slice write_ts = write_timestamps[i];
+    write_opts.timestamp = &write_ts;
+    for (uint64_t key = 0; key <= kMaxKey; ++key) {
+      Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
+      ASSERT_OK(s);
+    }
+  }
+  for (size_t i = 0; i < read_timestamps.size(); ++i) {
+    ReadOptions read_opts;
+    Slice read_ts = read_timestamps[i];
+    Slice read_ts_lb = read_timestamps_lb[i];
+    read_opts.timestamp = &read_ts;
+    read_opts.iter_start_ts = &read_ts_lb;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    int count = 0;
+    uint64_t key = 0;
+    for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
+      CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
+                         write_timestamps[i]);
+      if (i > 0) {
+        it->Next();
+        CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i - 1),
+                           write_timestamps[i - 1]);
+      }
+    }
+    size_t expected_count = kMaxKey + 1;
+    ASSERT_EQ(expected_count, count);
+  }
+  Close();
+}
+
 TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
   const int kNumKeysPerFile = 128;
   const uint64_t kMaxKey = 0xffffffffffffffff;
@@ -584,7 +635,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
   std::vector<std::string> read_ts_list;
 
   const auto& verify_records_func = [&](size_t i, size_t begin, size_t end,
-                                       ColumnFamilyHandle* cfh) {
+                                        ColumnFamilyHandle* cfh) {
     std::string value;
     std::string timestamp;
 
@@ -622,9 +673,9 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
       // higherlevel[0].largest.userkey
       ASSERT_OK(Flush(cf));
 
-      // compact files (2 at each level) to a lower level such that all keys
-      // with the same timestamp is at one level, with newer versions at
-      // higher levels.
+      // compact files (2 at each level) to a lower level such that all
+      // keys with the same timestamp are at one level, with newer versions
+      // at higher levels.
       CompactionOptions compact_opt;
       compact_opt.compression = kNoCompression;
       db_->CompactFiles(compact_opt, handles_[cf],
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index dbc68b7cf..1806536fb 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1337,9 +1337,14 @@ struct ReadOptions {
   // specified timestamp. All timestamps of the same database must be of the
   // same length and format. The user is responsible for providing a customized
   // compare function via Comparator to order tuples.
+  // For an iterator, iter_start_ts is the lower bound (older) and timestamp
+  // serves as the upper bound. Versions of the same record that fall in
+  // the timestamp range will be returned. If iter_start_ts is nullptr,
+  // only the most recent version visible to timestamp is returned.
   // The user-specified timestamp feature is still under active development,
   // and the API is subject to change.
   const Slice* timestamp;
+  const Slice* iter_start_ts;
 
   ReadOptions();
   ReadOptions(bool cksum, bool cache);
diff --git a/options/options.cc b/options/options.cc
index 3a611af23..5841eb2e9 100644
--- a/options/options.cc
+++ b/options/options.cc
@@ -607,7 +607,8 @@ ReadOptions::ReadOptions()
       background_purge_on_iterator_cleanup(false),
       ignore_range_deletions(false),
       iter_start_seqnum(0),
-      timestamp(nullptr) {}
+      timestamp(nullptr),
+      iter_start_ts(nullptr) {}
 
 ReadOptions::ReadOptions(bool cksum, bool cache)
     : snapshot(nullptr),
@@ -627,6 +628,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
       background_purge_on_iterator_cleanup(false),
       ignore_range_deletions(false),
       iter_start_seqnum(0),
-      timestamp(nullptr) {}
+      timestamp(nullptr),
+      iter_start_ts(nullptr) {}
 
 }  // namespace ROCKSDB_NAMESPACE
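
Addendum (illustration only, not part of the patch): ignoring the
read_callback_ path, the visibility rule implemented by the new
DBIter::IsVisible is equivalent to the following standalone predicate,
shown with plain integer timestamps instead of encoded Slices.

  // A version is returned iff it is visible at the snapshot sequence
  // number and its timestamp lies inside [*ts_lb, *ts_ub]; a null bound
  // leaves that side of the window open.
  bool VisibleInWindow(uint64_t seq, uint64_t snapshot_seq, uint64_t ts,
                       const uint64_t* ts_lb, const uint64_t* ts_ub) {
    bool visible_by_seq = seq <= snapshot_seq;
    bool visible_by_ts = (ts_ub == nullptr || ts <= *ts_ub) &&
                         (ts_lb == nullptr || ts >= *ts_lb);
    return visible_by_seq && visible_by_ts;
  }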