From d93812c9aeb7de454f2f564be57f41b7195f4b43 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 6 Mar 2020 16:21:03 -0800 Subject: [PATCH] Iterator with timestamp (#6255) Summary: Preliminary support for iterator with user timestamp. Current implementation does not consider merge operator and reverse iterator. Auto compaction is also disabled in unit tests. Create an iterator with timestamp. ``` ... read_opts.timestamp = &ts; auto* iter = db->NewIterator(read_opts); // target is key without timestamp. for (iter->Seek(target); iter->Valid(); iter->Next()) {} for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {} delete iter; read_opts.timestamp = &ts1; // lower_bound and upper_bound are without timestamp. read_opts.iterate_lower_bound = &lower_bound; read_opts.iterate_upper_bound = &upper_bound; auto* iter1 = db->NewIterator(read_opts); // Do Seek or SeekToFirst() delete iter1; ``` Test plan (dev server) ``` $make check ``` Simple benchmarking (dev server) 1. The overhead introduced by this PR even when timestamp is disabled. key size: 16 bytes value size: 100 bytes Entries: 1000000 Data reside in main memory, and try to stress iterator. Repeated three times on master and this PR. - Seek without next ``` ./db_bench -db=/dev/shm/rocksdbtest-1000 -benchmarks=fillseq,seekrandom -enable_pipelined_write=false -disable_wal=true -format_version=3 ``` master: 159047.0 ops/sec this PR: 158922.3 ops/sec (2% drop in throughput) - Seek and next 10 times ``` ./db_bench -db=/dev/shm/rocksdbtest-1000 -benchmarks=fillseq,seekrandom -enable_pipelined_write=false -disable_wal=true -format_version=3 -seek_nexts=10 ``` master: 109539.3 ops/sec this PR: 107519.7 ops/sec (2% drop in throughput) Pull Request resolved: https://github.com/facebook/rocksdb/pull/6255 Differential Revision: D19438227 Pulled By: riversand963 fbshipit-source-id: b66b4979486f8474619f4aa6bdd88598870b0746 --- HISTORY.md | 3 + db/arena_wrapped_db_iter.h | 3 + db/db_basic_test.cc | 742 ++++++++++++++---- db/db_iter.cc | 129 ++- db/db_iter.h | 16 +- db/dbformat.cc | 9 + db/dbformat.h | 29 +- db/version_set.cc | 4 +- include/rocksdb/comparator.h | 19 +- include/rocksdb/iterator.h | 9 + table/block_based/block_based_table_reader.cc | 17 +- table/internal_iterator.h | 1 + tools/ldb_cmd.cc | 4 +- util/comparator.cc | 8 +- util/user_comparator_wrapper.h | 15 +- utilities/ttl/db_ttl_impl.h | 2 +- 16 files changed, 795 insertions(+), 215 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 5a6c9bf2c..49e6cb9fc 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,9 @@ ### Performance Improvements * In CompactRange, for levels starting from 0, if the level does not have any file with any key falling in the specified range, the level is skipped. So instead of always compacting from level 0, the compaction starts from the first level with keys in the specified range until the last such level. +### New Features +* Basic support for user timestamp in iterator. Seek/SeekToFirst/Next and lower/upper bounds are supported. Reverse iteration is not supported. Merge is not considered. + ## 6.8.0 (02/24/2020) ### Java API Changes * Major breaking changes to Java comparators, toward standardizing on ByteBuffer for performant, locale-neutral operations on keys (#6252). diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h index 0c135f857..af6eb68f7 100644 --- a/db/arena_wrapped_db_iter.h +++ b/db/arena_wrapped_db_iter.h @@ -51,6 +51,8 @@ class ArenaWrappedDBIter : public Iterator { bool Valid() const override { return db_iter_->Valid(); } void SeekToFirst() override { db_iter_->SeekToFirst(); } void SeekToLast() override { db_iter_->SeekToLast(); } + // 'target' does not contain timestamp, even if user timestamp feature is + // enabled. void Seek(const Slice& target) override { db_iter_->Seek(target); } void SeekForPrev(const Slice& target) override { db_iter_->SeekForPrev(target); @@ -60,6 +62,7 @@ class ArenaWrappedDBIter : public Iterator { Slice key() const override { return db_iter_->key(); } Slice value() const override { return db_iter_->value(); } Status status() const override { return db_iter_->status(); } + Slice timestamp() const override { return db_iter_->timestamp(); } bool IsBlob() const { return db_iter_->IsBlob(); } Status GetProperty(std::string prop_name, std::string* prop) override; diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index c7f32f354..e2fd94027 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -392,7 +392,7 @@ TEST_F(DBBasicTest, FlushEmptyColumnFamily) { sleeping_task_low.WaitUntilDone(); } -TEST_F(DBBasicTest, FLUSH) { +TEST_F(DBBasicTest, Flush) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); WriteOptions writeOpt = WriteOptions(); @@ -2196,9 +2196,36 @@ class DBBasicTestWithTimestampBase : public DBTestBase { : DBTestBase(dbname) {} protected: - class TestComparatorBase : public Comparator { + static std::string Key1(uint64_t k) { + uint32_t x = 1; + const bool is_little_endian = (*reinterpret_cast(&x) != 0); + std::string ret; + if (is_little_endian) { + ret.assign(reinterpret_cast(&k), sizeof(k)); + } else { + ret.resize(sizeof(k)); + ret[0] = k & 0xff; + ret[1] = (k >> 8) & 0xff; + ret[2] = (k >> 16) & 0xff; + ret[3] = (k >> 24) & 0xff; + ret[4] = (k >> 32) & 0xff; + ret[5] = (k >> 40) & 0xff; + ret[6] = (k >> 48) & 0xff; + ret[7] = (k >> 56) & 0xff; + } + std::reverse(ret.begin(), ret.end()); + return ret; + } + + class TestComparator : public Comparator { + private: + const Comparator* cmp_without_ts_; + public: - explicit TestComparatorBase(size_t ts_sz) : Comparator(ts_sz) {} + explicit TestComparator(size_t ts_sz) + : Comparator(ts_sz), cmp_without_ts_(nullptr) { + cmp_without_ts_ = BytewiseComparator(); + } const char* Name() const override { return "TestComparator"; } @@ -2211,20 +2238,23 @@ class DBBasicTestWithTimestampBase : public DBTestBase { if (r != 0 || 0 == timestamp_size()) { return r; } - return CompareTimestamp( + return -CompareTimestamp( Slice(a.data() + a.size() - timestamp_size(), timestamp_size()), Slice(b.data() + b.size() - timestamp_size(), timestamp_size())); } - virtual int CompareImpl(const Slice& a, const Slice& b) const = 0; - - int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override { - assert(a.size() >= timestamp_size()); - assert(b.size() >= timestamp_size()); - Slice k1 = StripTimestampFromUserKey(a, timestamp_size()); - Slice k2 = StripTimestampFromUserKey(b, timestamp_size()); - - return CompareImpl(k1, k2); + using Comparator::CompareWithoutTimestamp; + int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b, + bool b_has_ts) const override { + if (a_has_ts) { + assert(a.size() >= timestamp_size()); + } + if (b_has_ts) { + assert(b.size() >= timestamp_size()); + } + Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, timestamp_size()) : a; + Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, timestamp_size()) : b; + return cmp_without_ts_->Compare(lhs, rhs); } int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override { @@ -2240,59 +2270,408 @@ class DBBasicTestWithTimestampBase : public DBTestBase { uint64_t low2 = 0; uint64_t high1 = 0; uint64_t high2 = 0; - auto* ptr1 = const_cast(&ts1); - auto* ptr2 = const_cast(&ts2); + const size_t kSize = ts1.size(); + std::unique_ptr ts1_buf(new char[kSize]); + memcpy(ts1_buf.get(), ts1.data(), ts1.size()); + std::unique_ptr ts2_buf(new char[kSize]); + memcpy(ts2_buf.get(), ts2.data(), ts2.size()); + Slice ts1_copy = Slice(ts1_buf.get(), kSize); + Slice ts2_copy = Slice(ts2_buf.get(), kSize); + auto* ptr1 = const_cast(&ts1_copy); + auto* ptr2 = const_cast(&ts2_copy); if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) || !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) { assert(false); } if (high1 < high2) { - return 1; - } else if (high1 > high2) { return -1; + } else if (high1 > high2) { + return 1; } if (low1 < low2) { - return 1; - } else if (low1 > low2) { return -1; + } else if (low1 > low2) { + return 1; } return 0; } }; - Slice EncodeTimestamp(uint64_t low, uint64_t high, std::string* ts) { - assert(nullptr != ts); - ts->clear(); - PutFixed64(ts, low); - PutFixed64(ts, high); - assert(ts->size() == sizeof(low) + sizeof(high)); - return Slice(*ts); + std::string Timestamp(uint64_t low, uint64_t high) { + std::string ts; + PutFixed64(&ts, low); + PutFixed64(&ts, high); + return ts; + } + + void CheckIterUserEntry(const Iterator* it, const Slice& expected_key, + const Slice& expected_value, + const Slice& expected_ts) const { + ASSERT_TRUE(it->Valid()); + ASSERT_OK(it->status()); + ASSERT_EQ(expected_key, it->key()); + ASSERT_EQ(expected_value, it->value()); + ASSERT_EQ(expected_ts, it->timestamp()); + } + + void CheckIterEntry(const Iterator* it, const Slice& expected_ukey, + SequenceNumber expected_seq, ValueType expected_val_type, + const Slice& expected_value, const Slice& expected_ts) { + ASSERT_TRUE(it->Valid()); + ASSERT_OK(it->status()); + std::string ukey_and_ts; + ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size()); + ukey_and_ts.append(expected_ts.data(), expected_ts.size()); + ParsedInternalKey parsed_ikey(ukey_and_ts, expected_seq, expected_val_type); + std::string ikey; + AppendInternalKey(&ikey, parsed_ikey); + ASSERT_EQ(Slice(ikey), it->key()); + if (expected_val_type == kTypeValue) { + ASSERT_EQ(expected_value, it->value()); + } + ASSERT_EQ(expected_ts, it->timestamp()); } }; class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase { public: DBBasicTestWithTimestamp() - : DBBasicTestWithTimestampBase("/db_basic_test_with_timestamp") {} + : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {} +}; - protected: - class TestComparator : public TestComparatorBase { - public: - const int kKeyPrefixLength = - 3; // 3: length of "key" in generated keys ("key" + std::to_string(j)) - explicit TestComparator(size_t ts_sz) : TestComparatorBase(ts_sz) {} +TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { + const int kNumKeysPerFile = 2048; + const uint64_t kMaxKey = 16384; + Options options = CurrentOptions(); + options.env = env_; + // TODO(yanqin) re-enable auto compaction + options.disable_auto_compactions = true; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + const std::vector start_keys = {1, 0}; + const std::vector write_timestamps = {Timestamp(1, 0), + Timestamp(3, 0)}; + const std::vector read_timestamps = {Timestamp(2, 0), + Timestamp(4, 0)}; + for (size_t i = 0; i < write_timestamps.size(); ++i) { + WriteOptions write_opts; + Slice write_ts = write_timestamps[i]; + write_opts.timestamp = &write_ts; + for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); + ASSERT_OK(s); + } + } + for (size_t i = 0; i < read_timestamps.size(); ++i) { + ReadOptions read_opts; + Slice read_ts = read_timestamps[i]; + read_opts.timestamp = &read_ts; + std::unique_ptr it(db_->NewIterator(read_opts)); + int count = 0; + uint64_t key = 0; + for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid(); + it->Next(), ++count, ++key) { + CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i), + write_timestamps[i]); + } + size_t expected_count = kMaxKey - start_keys[i] + 1; + ASSERT_EQ(expected_count, count); - int CompareImpl(const Slice& a, const Slice& b) const override { - int n1 = atoi( - std::string(a.data() + kKeyPrefixLength, a.size() - kKeyPrefixLength) - .c_str()); - int n2 = atoi( - std::string(b.data() + kKeyPrefixLength, b.size() - kKeyPrefixLength) - .c_str()); - return (n1 < n2) ? -1 : (n1 > n2) ? 1 : 0; + // SeekToFirst() with lower bound. + // Then iter with lower and upper bounds. + uint64_t l = 0; + uint64_t r = kMaxKey + 1; + while (l < r) { + std::string lb_str = Key1(l); + Slice lb = lb_str; + std::string ub_str = Key1(r); + Slice ub = ub_str; + read_opts.iterate_lower_bound = &lb; + read_opts.iterate_upper_bound = &ub; + it.reset(db_->NewIterator(read_opts)); + for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0; + it->Valid(); it->Next(), ++key, ++count) { + CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i), + write_timestamps[i]); + } + ASSERT_EQ(r - std::max(l, start_keys[i]), count); + l += (kMaxKey / 100); + r -= (kMaxKey / 100); + } + } + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) { + const int kNumKeysPerFile = 2048; + const uint64_t kMaxKey = 0xffffffffffffffff; + const uint64_t kMinKey = kMaxKey - 16383; + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + // TODO(yanqin) re-enable auto compaction + options.disable_auto_compactions = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + std::vector start_seqs; + + const int kNumTimestamps = 4; + std::vector write_ts_list; + for (int t = 0; t != kNumTimestamps; ++t) { + write_ts_list.push_back(Timestamp(2 * t, /*do not care*/ 17)); + } + WriteOptions write_opts; + for (size_t i = 0; i != write_ts_list.size(); ++i) { + Slice write_ts = write_ts_list[i]; + write_opts.timestamp = &write_ts; + uint64_t k = kMinKey; + do { + Status s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i)); + ASSERT_OK(s); + if (k == kMaxKey) { + break; + } + ++k; + } while (k != 0); + start_seqs.push_back(db_->GetLatestSequenceNumber()); + } + std::vector read_ts_list; + for (int t = 0; t != kNumTimestamps - 1; ++t) { + read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17)); + } + ReadOptions read_opts; + for (size_t i = 0; i != read_ts_list.size(); ++i) { + Slice read_ts = read_ts_list[i]; + read_opts.timestamp = &read_ts; + read_opts.iter_start_seqnum = start_seqs[i]; + std::unique_ptr iter(db_->NewIterator(read_opts)); + SequenceNumber expected_seq = start_seqs[i] + 1; + uint64_t key = kMinKey; + for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) { + CheckIterEntry(iter.get(), Key1(key), expected_seq, kTypeValue, + "value" + std::to_string(i + 1), write_ts_list[i + 1]); + ++key; + ++expected_seq; + } + } + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + constexpr size_t kNumKeys = 16; + options.max_sequential_skip_in_iterations = kNumKeys / 2; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + // TODO(yanqin) re-enable auto compaction + options.disable_auto_compactions = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + // Insert kNumKeys + WriteOptions write_opts; + Status s; + for (size_t i = 0; i != kNumKeys; ++i) { + std::string ts_str = Timestamp(static_cast(i + 1), 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + s = db_->Put(write_opts, "foo", "value" + std::to_string(i)); + ASSERT_OK(s); + } + { + ReadOptions read_opts; + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + read_opts.timestamp = &ts; + std::unique_ptr iter(db_->NewIterator(read_opts)); + iter->SeekToFirst(); + CheckIterUserEntry(iter.get(), "foo", "value0", ts_str); + ASSERT_EQ( + 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); + } + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + constexpr size_t kNumKeys = 16; + options.max_sequential_skip_in_iterations = kNumKeys / 2; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + // TODO(yanqin) re-enable auto compaction + options.disable_auto_compactions = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + // Write kNumKeys + 1 keys + WriteOptions write_opts; + Status s; + for (size_t i = 0; i != kNumKeys; ++i) { + std::string ts_str = Timestamp(static_cast(i + 1), 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + s = db_->Put(write_opts, "a", "value" + std::to_string(i)); + ASSERT_OK(s); + } + { + std::string ts_str = Timestamp(static_cast(kNumKeys + 1), 0); + WriteBatch batch(0, 0, kTimestampSize); + batch.Put("a", "new_value"); + batch.Put("b", "new_value"); + s = batch.AssignTimestamp(ts_str); + ASSERT_OK(s); + s = db_->Write(write_opts, &batch); + ASSERT_OK(s); + } + { + ReadOptions read_opts; + std::string ts_str = Timestamp(static_cast(kNumKeys + 1), 0); + Slice ts = ts_str; + read_opts.timestamp = &ts; + std::unique_ptr iter(db_->NewIterator(read_opts)); + iter->Seek("a"); + iter->Next(); + CheckIterUserEntry(iter.get(), "b", "new_value", ts_str); + ASSERT_EQ( + 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); + } + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + constexpr size_t max_skippable_internal_keys = 2; + const size_t kNumKeys = max_skippable_internal_keys + 2; + WriteOptions write_opts; + Status s; + { + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Put(write_opts, "a", "value")); + } + for (size_t i = 0; i < kNumKeys; ++i) { + std::string ts_str = Timestamp(static_cast(i + 1), 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + s = db_->Put(write_opts, "b", "value" + std::to_string(i)); + ASSERT_OK(s); + } + { + ReadOptions read_opts; + read_opts.max_skippable_internal_keys = max_skippable_internal_keys; + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + read_opts.timestamp = &ts; + std::unique_ptr iter(db_->NewIterator(read_opts)); + iter->SeekToFirst(); + iter->Next(); + ASSERT_TRUE(iter->status().IsIncomplete()); + } + Close(); +} + +class DBBasicTestWithTimestampCompressionSettings + : public DBBasicTestWithTimestampBase, + public testing::WithParamInterface, CompressionType, uint32_t>> { + public: + DBBasicTestWithTimestampCompressionSettings() + : DBBasicTestWithTimestampBase( + "db_basic_test_with_timestamp_compression") {} +}; + +TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) { + const int kNumKeysPerFile = 8192; + const size_t kNumTimestamps = 6; + Options options = CurrentOptions(); + options.create_if_missing = true; + options.env = env_; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + size_t ts_sz = Timestamp(0, 0).size(); + TestComparator test_cmp(ts_sz); + options.comparator = &test_cmp; + BlockBasedTableOptions bbto; + bbto.filter_policy = std::get<0>(GetParam()); + bbto.whole_key_filtering = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + + const CompressionType comp_type = std::get<1>(GetParam()); +#if LZ4_VERSION_NUMBER < 10400 // r124+ + if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) { + return; + } +#endif // LZ4_VERSION_NUMBER >= 10400 + if (!ZSTD_Supported() && comp_type == kZSTD) { + return; + } + if (!Zlib_Supported() && comp_type == kZlibCompression) { + return; + } + + options.compression = comp_type; + options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); + if (comp_type == kZSTD) { + options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); + } + options.target_file_size_base = 1 << 26; // 64MB + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(2, num_cfs); + std::vector write_ts_list; + std::vector read_ts_list; + + for (size_t i = 0; i != kNumTimestamps; ++i) { + write_ts_list.push_back(Timestamp(i * 2, 0)); + read_ts_list.push_back(Timestamp(1 + i * 2, 0)); + const Slice write_ts = write_ts_list.back(); + WriteOptions wopts; + wopts.timestamp = &write_ts; + for (int cf = 0; cf != static_cast(num_cfs); ++cf) { + for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { + ASSERT_OK(Put(cf, Key1(j), + "value_" + std::to_string(j) + "_" + std::to_string(i), + wopts)); + } + } + } + const auto& verify_db_func = [&]() { + for (size_t i = 0; i != kNumTimestamps; ++i) { + ReadOptions ropts; + const Slice read_ts = read_ts_list[i]; + ropts.timestamp = &read_ts; + for (int cf = 0; cf != static_cast(num_cfs); ++cf) { + ColumnFamilyHandle* cfh = handles_[cf]; + for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { + std::string value; + ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value)); + ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), + value); + } + } } }; -}; + verify_db_func(); + Close(); +} #ifndef ROCKSDB_LITE // A class which remembers the name of each flushed file. @@ -2325,7 +2704,7 @@ class FlushedFileCollector : public EventListener { InstrumentedMutex mutex_; }; -TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) { +TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { const int kNumKeysPerFile = 8192; const size_t kNumTimestamps = 2; const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps; @@ -2338,23 +2717,39 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) { FlushedFileCollector* collector = new FlushedFileCollector(); options.listeners.emplace_back(collector); - std::string tmp; - size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size(); + size_t ts_sz = Timestamp(0, 0).size(); TestComparator test_cmp(ts_sz); options.comparator = &test_cmp; BlockBasedTableOptions bbto; - bbto.filter_policy.reset(NewBloomFilterPolicy( - 10 /*bits_per_key*/, false /*use_block_based_builder*/)); + bbto.filter_policy = std::get<0>(GetParam()); bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + + const CompressionType comp_type = std::get<1>(GetParam()); +#if LZ4_VERSION_NUMBER < 10400 // r124+ + if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) { + return; + } +#endif // LZ4_VERSION_NUMBER >= 10400 + if (!ZSTD_Supported() && comp_type == kZSTD) { + return; + } + if (!Zlib_Supported() && comp_type == kZlibCompression) { + return; + } + + options.compression = comp_type; + options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); + if (comp_type == kZSTD) { + options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); + } DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); + size_t num_cfs = handles_.size(); ASSERT_EQ(2, num_cfs); - std::vector write_ts_strs(kNumTimestamps); - std::vector read_ts_strs(kNumTimestamps); - std::vector write_ts_list; - std::vector read_ts_list; + std::vector write_ts_list; + std::vector read_ts_list; const auto& verify_record_func = [&](size_t i, size_t k, ColumnFamilyHandle* cfh) { @@ -2362,26 +2757,26 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) { std::string timestamp; ReadOptions ropts; - ropts.timestamp = &read_ts_list[i]; + const Slice read_ts = read_ts_list[i]; + ropts.timestamp = &read_ts; std::string expected_timestamp = std::string(write_ts_list[i].data(), write_ts_list[i].size()); - ASSERT_OK( - db_->Get(ropts, cfh, "key" + std::to_string(k), &value, ×tamp)); + ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value, ×tamp)); ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value); ASSERT_EQ(expected_timestamp, timestamp); }; for (size_t i = 0; i != kNumTimestamps; ++i) { - write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i])); - read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i])); - const Slice& write_ts = write_ts_list.back(); + write_ts_list.push_back(Timestamp(i * 2, 0)); + read_ts_list.push_back(Timestamp(1 + i * 2, 0)); + const Slice write_ts = write_ts_list.back(); WriteOptions wopts; wopts.timestamp = &write_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { size_t memtable_get_start = 0; for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { - ASSERT_OK(Put(cf, "key" + std::to_string(j), + ASSERT_OK(Put(cf, Key1(j), "value_" + std::to_string(j) + "_" + std::to_string(i), wopts)); if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) { @@ -2411,7 +2806,8 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) { const auto& verify_db_func = [&]() { for (size_t i = 0; i != kNumTimestamps; ++i) { ReadOptions ropts; - ropts.timestamp = &read_ts_list[i]; + const Slice read_ts = read_ts_list[i]; + ropts.timestamp = &read_ts; std::string expected_timestamp(write_ts_list[i].data(), write_ts_list[i].size()); for (int cf = 0; cf != static_cast(num_cfs); ++cf) { @@ -2423,130 +2819,148 @@ TEST_F(DBBasicTestWithTimestamp, PutAndGetWithCompaction) { } }; verify_db_func(); + Close(); } #endif // !ROCKSDB_LITE -class DBBasicTestWithTimestampWithParam +INSTANTIATE_TEST_CASE_P( + Timestamp, DBBasicTestWithTimestampCompressionSettings, + ::testing::Combine( + ::testing::Values(std::shared_ptr(nullptr), + std::shared_ptr( + NewBloomFilterPolicy(10, false))), + ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression, + kLZ4HCCompression, kZSTD), + ::testing::Values(0, 1 << 14))); + +class DBBasicTestWithTimestampPrefixSeek : public DBBasicTestWithTimestampBase, - public testing::WithParamInterface { + public testing::WithParamInterface< + std::tuple, + std::shared_ptr, bool>> { public: - DBBasicTestWithTimestampWithParam() + DBBasicTestWithTimestampPrefixSeek() : DBBasicTestWithTimestampBase( - "/db_basic_test_with_timestamp_with_param") {} - - protected: - class TestComparator : public TestComparatorBase { - private: - const Comparator* cmp_without_ts_; - - public: - explicit TestComparator(size_t ts_sz) - : TestComparatorBase(ts_sz), cmp_without_ts_(nullptr) { - cmp_without_ts_ = BytewiseComparator(); - } - - int CompareImpl(const Slice& a, const Slice& b) const override { - return cmp_without_ts_->Compare(a, b); - } - }; + "/db_basic_test_with_timestamp_prefix_seek") {} }; -TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) { - const int kNumKeysPerFile = 8192; - const size_t kNumTimestamps = 6; - bool memtable_only = GetParam(); +TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { + const size_t kNumKeysPerFile = 4096; Options options = CurrentOptions(); - options.create_if_missing = true; options.env = env_; - options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); - std::string tmp; - size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size(); - TestComparator test_cmp(ts_sz); + options.create_if_missing = true; + // TODO(yanqin): re-enable auto compactions + options.disable_auto_compactions = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; + options.prefix_extractor = std::get<0>(GetParam()); + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); BlockBasedTableOptions bbto; - bbto.filter_policy.reset(NewBloomFilterPolicy( - 10 /*bits_per_key*/, false /*use_block_based_builder*/)); - bbto.whole_key_filtering = true; + bbto.filter_policy = std::get<1>(GetParam()); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); - std::vector compression_types; - compression_types.push_back(kNoCompression); - if (Zlib_Supported()) { - compression_types.push_back(kZlibCompression); - } -#if LZ4_VERSION_NUMBER >= 10400 // r124+ - compression_types.push_back(kLZ4Compression); - compression_types.push_back(kLZ4HCCompression); -#endif // LZ4_VERSION_NUMBER >= 10400 - if (ZSTD_Supported()) { - compression_types.push_back(kZSTD); - } - - // Switch compression dictionary on/off to check key extraction - // correctness in kBuffered state - std::vector max_dict_bytes_list = {0, 1 << 14}; // 0 or 16KB - - for (auto compression_type : compression_types) { - for (uint32_t max_dict_bytes : max_dict_bytes_list) { - options.compression = compression_type; - options.compression_opts.max_dict_bytes = max_dict_bytes; - if (compression_type == kZSTD) { - options.compression_opts.zstd_max_train_bytes = max_dict_bytes; - } - options.target_file_size_base = 1 << 26; // 64MB - - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - size_t num_cfs = handles_.size(); - ASSERT_EQ(2, num_cfs); - std::vector write_ts_strs(kNumTimestamps); - std::vector read_ts_strs(kNumTimestamps); - std::vector write_ts_list; - std::vector read_ts_list; - - for (size_t i = 0; i != kNumTimestamps; ++i) { - write_ts_list.emplace_back( - EncodeTimestamp(i * 2, 0, &write_ts_strs[i])); - read_ts_list.emplace_back( - EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i])); - const Slice& write_ts = write_ts_list.back(); - WriteOptions wopts; - wopts.timestamp = &write_ts; - for (int cf = 0; cf != static_cast(num_cfs); ++cf) { - for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { - ASSERT_OK(Put( - cf, "key" + std::to_string(j), - "value_" + std::to_string(j) + "_" + std::to_string(i), wopts)); - } - if (!memtable_only) { - ASSERT_OK(Flush(cf)); - } + const uint64_t kMaxKey = 0xffffffffffffffff; + const uint64_t kMinKey = 0xffffffffffff8000; + const std::vector write_ts_list = {Timestamp(3, 0xffffffff), + Timestamp(6, 0xffffffff)}; + WriteOptions write_opts; + { + for (size_t i = 0; i != write_ts_list.size(); ++i) { + Slice write_ts = write_ts_list[i]; + write_opts.timestamp = &write_ts; + uint64_t key = kMinKey; + do { + Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); + ASSERT_OK(s); + if (key == kMaxKey) { + break; } - } - const auto& verify_db_func = [&]() { - for (size_t i = 0; i != kNumTimestamps; ++i) { - ReadOptions ropts; - ropts.timestamp = &read_ts_list[i]; - for (int cf = 0; cf != static_cast(num_cfs); ++cf) { - ColumnFamilyHandle* cfh = handles_[cf]; - for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; - ++j) { - std::string value; - ASSERT_OK( - db_->Get(ropts, cfh, "key" + std::to_string(j), &value)); - ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), - value); - } - } - } - }; - verify_db_func(); + ++key; + } while (true); } } + const std::vector read_ts_list = {Timestamp(5, 0xffffffff), + Timestamp(9, 0xffffffff)}; + { + ReadOptions read_opts; + read_opts.total_order_seek = false; + read_opts.prefix_same_as_start = std::get<2>(GetParam()); + fprintf(stdout, "%s %s %d\n", options.prefix_extractor->Name(), + bbto.filter_policy ? bbto.filter_policy->Name() : "null", + static_cast(read_opts.prefix_same_as_start)); + for (size_t i = 0; i != read_ts_list.size(); ++i) { + Slice read_ts = read_ts_list[i]; + read_opts.timestamp = &read_ts; + std::unique_ptr iter(db_->NewIterator(read_opts)); + + // Seek to kMaxKey + iter->Seek(Key1(kMaxKey)); + CheckIterUserEntry(iter.get(), Key1(kMaxKey), "value" + std::to_string(i), + write_ts_list[i]); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + } + const std::vector targets = {kMinKey, kMinKey + 0x10, + kMinKey + 0x100, kMaxKey}; + const SliceTransform* const pe = options.prefix_extractor.get(); + ASSERT_NE(nullptr, pe); + const size_t kPrefixShift = + 8 * (Key1(0).size() - pe->Transform(Key1(0)).size()); + const uint64_t kPrefixMask = + ~((static_cast(1) << kPrefixShift) - 1); + const uint64_t kNumKeysWithinPrefix = + (static_cast(1) << kPrefixShift); + for (size_t i = 0; i != read_ts_list.size(); ++i) { + Slice read_ts = read_ts_list[i]; + read_opts.timestamp = &read_ts; + std::unique_ptr it(db_->NewIterator(read_opts)); + for (size_t j = 0; j != targets.size(); ++j) { + std::string start_key = Key1(targets[j]); + uint64_t expected_ub = + (targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix; + uint64_t expected_key = targets[j]; + size_t count = 0; + it->Seek(Key1(targets[j])); + while (it->Valid()) { + std::string saved_prev_key; + saved_prev_key.assign(it->key().data(), it->key().size()); + + // Out of prefix + if (!read_opts.prefix_same_as_start && + pe->Transform(saved_prev_key) != pe->Transform(start_key)) { + break; + } + CheckIterUserEntry(it.get(), Key1(expected_key), + "value" + std::to_string(i), write_ts_list[i]); + ++count; + ++expected_key; + it->Next(); + } + ASSERT_EQ(expected_ub - targets[j] + 1, count); + } + } + } + Close(); } -INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam, - ::testing::Bool()); +// TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g. +// NoopTransform. +INSTANTIATE_TEST_CASE_P( + Timestamp, DBBasicTestWithTimestampPrefixSeek, + ::testing::Combine( + ::testing::Values( + std::shared_ptr(NewFixedPrefixTransform(4)), + std::shared_ptr(NewFixedPrefixTransform(7)), + std::shared_ptr(NewFixedPrefixTransform(8))), + ::testing::Values(std::shared_ptr(nullptr), + std::shared_ptr( + NewBloomFilterPolicy(10 /*bits_per_key*/, false)), + std::shared_ptr( + NewBloomFilterPolicy(20 /*bits_per_key*/, + false))), + ::testing::Bool())); } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_iter.cc b/db/db_iter.cc index e5d402948..d74162c82 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -49,6 +49,8 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, read_callback_(read_callback), sequence_(s), statistics_(cf_options.statistics), + max_skip_(max_sequential_skip_in_iterations), + max_skippable_internal_keys_(read_options.max_skippable_internal_keys), num_internal_keys_skipped_(0), iterate_lower_bound_(read_options.iterate_lower_bound), iterate_upper_bound_(read_options.iterate_upper_bound), @@ -69,16 +71,17 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, range_del_agg_(&cf_options.internal_comparator, s), db_impl_(db_impl), cfd_(cfd), - start_seqnum_(read_options.iter_start_seqnum) { + start_seqnum_(read_options.iter_start_seqnum), + timestamp_ub_(read_options.timestamp), + timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) { RecordTick(statistics_, NO_ITERATOR_CREATED); - max_skip_ = max_sequential_skip_in_iterations; - max_skippable_internal_keys_ = read_options.max_skippable_internal_keys; if (pin_thru_lifetime_) { pinned_iters_mgr_.StartPinning(); } if (iter_.iter()) { iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); } + assert(timestamp_size_ == user_comparator_.timestamp_size()); } Status DBIter::GetProperty(std::string prop_name, std::string* prop) { @@ -220,9 +223,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, is_key_seqnum_zero_ = (ikey_.sequence == 0); assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() || - user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0); + user_comparator_.CompareWithoutTimestamp( + ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_, + /*b_has_ts=*/false) < 0); if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() && - user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) { + user_comparator_.CompareWithoutTimestamp( + ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_, + /*b_has_ts=*/false) >= 0) { break; } @@ -237,20 +244,25 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, return false; } - if (IsVisible(ikey_.sequence)) { + assert(ikey_.user_key.size() >= timestamp_size_); + Slice ts; + if (timestamp_size_ > 0) { + ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_); + } + if (IsVisible(ikey_.sequence, ts)) { // If the previous entry is of seqnum 0, the current entry will not // possibly be skipped. This condition can potentially be relaxed to // prev_key.seq <= ikey_.sequence. We are cautious because it will be more // prone to bugs causing the same user key with the same sequence number. if (!is_prev_key_seqnum_zero && skipping_saved_key && - user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()) <= - 0) { + user_comparator_.CompareWithoutTimestamp( + ikey_.user_key, saved_key_.GetUserKey()) <= 0) { num_skipped++; // skip this entry PERF_COUNTER_ADD(internal_key_skipped_count, 1); } else { assert(!skipping_saved_key || - user_comparator_.Compare(ikey_.user_key, - saved_key_.GetUserKey()) > 0); + user_comparator_.CompareWithoutTimestamp( + ikey_.user_key, saved_key_.GetUserKey()) > 0); num_skipped = 0; reseek_done = false; switch (ikey_.type) { @@ -356,8 +368,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, // This key was inserted after our snapshot was taken. // If this happens too many times in a row for the same user key, we want // to seek to the target sequence number. - int cmp = - user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()); + int cmp = user_comparator_.CompareWithoutTimestamp( + ikey_.user_key, saved_key_.GetUserKey()); if (cmp == 0 || (skipping_saved_key && cmp < 0)) { num_skipped++; } else { @@ -388,8 +400,17 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, // We're looking for the next user-key but all we see are the same // user-key with decreasing sequence numbers. Fast forward to // sequence number 0 and type deletion (the smallest type). - AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(), - 0, kTypeDeletion)); + if (timestamp_size_ == 0) { + AppendInternalKey( + &last_key, + ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion)); + } else { + std::string min_ts(timestamp_size_, static_cast(0)); + AppendInternalKeyWithDifferentTimestamp( + &last_key, + ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion), + min_ts); + } // Don't set skipping_saved_key = false because we may still see more // user-keys equal to saved_key_. } else { @@ -398,9 +419,17 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, // Note that this only covers a case when a higher key was overwritten // many times since our snapshot was taken, not the case when a lot of // different keys were inserted after our snapshot was taken. - AppendInternalKey(&last_key, - ParsedInternalKey(saved_key_.GetUserKey(), sequence_, - kValueTypeForSeek)); + if (timestamp_size_ == 0) { + AppendInternalKey( + &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_, + kValueTypeForSeek)); + } else { + AppendInternalKeyWithDifferentTimestamp( + &last_key, + ParsedInternalKey(saved_key_.GetUserKey(), sequence_, + kValueTypeForSeek), + *timestamp_ub_); + } } iter_.Seek(last_key); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); @@ -519,6 +548,13 @@ bool DBIter::MergeValuesNewToOld() { } void DBIter::Prev() { + if (timestamp_size_ > 0) { + valid_ = false; + status_ = Status::NotSupported( + "SeekToLast/SeekForPrev/Prev currently not supported with timestamp."); + return; + } + assert(valid_); assert(status_.ok()); @@ -697,7 +733,13 @@ bool DBIter::FindValueForCurrentKey() { return false; } - if (!IsVisible(ikey.sequence) || + assert(ikey.user_key.size() >= timestamp_size_); + Slice ts; + if (timestamp_size_ > 0) { + ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_, + timestamp_size_); + } + if (!IsVisible(ikey.sequence, ts) || !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { break; } @@ -853,6 +895,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (!ParseKey(&ikey)) { return false; } + assert(ikey.user_key.size() >= timestamp_size_); + Slice ts; + if (timestamp_size_ > 0) { + ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_, + timestamp_size_); + } + if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { // No visible values for this key, even though FindValueForCurrentKey() // has seen some. This is possible if we're using a tailing iterator, and @@ -861,7 +910,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { return true; } - if (IsVisible(ikey.sequence)) { + if (IsVisible(ikey.sequence, ts)) { break; } @@ -1001,7 +1050,13 @@ bool DBIter::FindUserKeyBeforeSavedKey() { } assert(ikey.sequence != kMaxSequenceNumber); - if (!IsVisible(ikey.sequence)) { + assert(ikey.user_key.size() >= timestamp_size_); + Slice ts; + if (timestamp_size_ > 0) { + ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_, + timestamp_size_); + } + if (!IsVisible(ikey.sequence, ts)) { PERF_COUNTER_ADD(internal_recent_skipped_count, 1); } else { PERF_COUNTER_ADD(internal_key_skipped_count, 1); @@ -1046,10 +1101,18 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) { return false; } -bool DBIter::IsVisible(SequenceNumber sequence) { +bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts) { + // Remember that comparator orders preceding timestamp as larger. + int cmp_ts = timestamp_ub_ != nullptr + ? user_comparator_.CompareTimestamp(ts, *timestamp_ub_) + : 0; + if (cmp_ts > 0) { + return false; + } if (read_callback_ == nullptr) { return sequence <= sequence_; } else { + // TODO(yanqin): support timestamp in read_callback_. return read_callback_->IsVisible(sequence); } } @@ -1058,14 +1121,16 @@ void DBIter::SetSavedKeyToSeekTarget(const Slice& target) { is_key_seqnum_zero_ = false; SequenceNumber seq = sequence_; saved_key_.Clear(); - saved_key_.SetInternalKey(target, seq); + saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_); if (iterate_lower_bound_ != nullptr && - user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) < - 0) { + user_comparator_.CompareWithoutTimestamp( + saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_, + /*b_has_ts=*/false) < 0) { // Seek key is smaller than the lower bound. saved_key_.Clear(); - saved_key_.SetInternalKey(*iterate_lower_bound_, seq); + saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek, + timestamp_ub_); } } @@ -1155,6 +1220,13 @@ void DBIter::SeekForPrev(const Slice& target) { } #endif // ROCKSDB_LITE + if (timestamp_size_ > 0) { + valid_ = false; + status_ = Status::NotSupported( + "SeekToLast/SeekForPrev/Prev currently not supported with timestamp."); + return; + } + status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); @@ -1248,6 +1320,13 @@ void DBIter::SeekToFirst() { } void DBIter::SeekToLast() { + if (timestamp_size_ > 0) { + valid_ = false; + status_ = Status::NotSupported( + "SeekToLast/SeekForPrev/Prev currently not supported with timestamp."); + return; + } + if (iterate_upper_bound_ != nullptr) { // Seek to last key strictly less than ReadOptions.iterate_upper_bound. SeekForPrev(*iterate_upper_bound_); diff --git a/db/db_iter.h b/db/db_iter.h index 32704e4d5..c2f545c4f 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -146,7 +146,8 @@ class DBIter final : public Iterator { if (start_seqnum_ > 0) { return saved_key_.GetInternalKey(); } else { - return saved_key_.GetUserKey(); + const Slice ukey_and_ts = saved_key_.GetUserKey(); + return Slice(ukey_and_ts.data(), ukey_and_ts.size() - timestamp_size_); } } Slice value() const override { @@ -169,6 +170,13 @@ class DBIter final : public Iterator { return status_; } } + Slice timestamp() const override { + assert(valid_); + assert(timestamp_size_ > 0); + const Slice ukey_and_ts = saved_key_.GetUserKey(); + assert(timestamp_size_ < ukey_and_ts.size()); + return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_); + } bool IsBlob() const { assert(valid_ && (allow_blob_ || !is_blob_)); return is_blob_; @@ -178,6 +186,8 @@ class DBIter final : public Iterator { void Next() final override; void Prev() final override; + // 'target' does not contain timestamp, even if user timestamp feature is + // enabled. void Seek(const Slice& target) final override; void SeekForPrev(const Slice& target) final override; void SeekToFirst() final override; @@ -221,7 +231,7 @@ class DBIter final : public Iterator { // entry can be found within the prefix. void PrevInternal(const Slice* prefix); bool TooManyInternalKeysSkipped(bool increment = true); - bool IsVisible(SequenceNumber sequence); + bool IsVisible(SequenceNumber sequence, const Slice& ts); // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // is called @@ -327,6 +337,8 @@ class DBIter final : public Iterator { // for diff snapshots we want the lower bound on the seqnum; // if this value > 0 iterator will return internal keys SequenceNumber start_seqnum_; + const Slice* const timestamp_ub_; + const size_t timestamp_size_; }; // Return a new iterator that converts internal keys (yielded by diff --git a/db/dbformat.cc b/db/dbformat.cc index e10af2b85..46137feb7 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -75,6 +75,15 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { PutFixed64(result, PackSequenceAndType(key.sequence, key.type)); } +void AppendInternalKeyWithDifferentTimestamp(std::string* result, + const ParsedInternalKey& key, + const Slice& ts) { + assert(key.user_key.size() >= ts.size()); + result->append(key.user_key.data(), key.user_key.size() - ts.size()); + result->append(ts.data(), ts.size()); + PutFixed64(result, PackSequenceAndType(key.sequence, key.type)); +} + void AppendInternalKeyFooter(std::string* result, SequenceNumber s, ValueType t) { PutFixed64(result, PackSequenceAndType(s, t)); diff --git a/db/dbformat.h b/db/dbformat.h index c3c75afd7..e664fa9b2 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -104,6 +104,7 @@ struct ParsedInternalKey { ParsedInternalKey() : sequence(kMaxSequenceNumber) // Make code analyzer happy {} // Intentionally left uninitialized (for speed) + // u contains timestamp if user timestamp feature is enabled. ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t) : user_key(u), sequence(seq), type(t) {} std::string DebugString(bool hex = false) const; @@ -132,6 +133,12 @@ EntryType GetEntryType(ValueType value_type); // Append the serialization of "key" to *result. extern void AppendInternalKey(std::string* result, const ParsedInternalKey& key); + +// Append the serialization of "key" to *result, replacing the original +// timestamp with argument ts. +extern void AppendInternalKeyWithDifferentTimestamp( + std::string* result, const ParsedInternalKey& key, const Slice& ts); + // Serialized internal key consists of user key followed by footer. // This function appends the footer to *result, assuming that *result already // contains the user key at the end. @@ -192,7 +199,8 @@ class InternalKeyComparator public: explicit InternalKeyComparator(const Comparator* c) - : user_comparator_(c), + : Comparator(c->timestamp_size()), + user_comparator_(c), name_("rocksdb.InternalKeyComparator:" + std::string(user_comparator_.Name())) {} virtual ~InternalKeyComparator() {} @@ -439,24 +447,31 @@ class IterKey { void SetInternalKey(const Slice& key_prefix, const Slice& user_key, SequenceNumber s, - ValueType value_type = kValueTypeForSeek) { + ValueType value_type = kValueTypeForSeek, + const Slice* ts = nullptr) { size_t psize = key_prefix.size(); size_t usize = user_key.size(); - EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t)); + size_t ts_sz = (ts != nullptr ? ts->size() : 0); + EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t) + ts_sz); if (psize > 0) { memcpy(buf_, key_prefix.data(), psize); } memcpy(buf_ + psize, user_key.data(), usize); - EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type)); + if (ts) { + memcpy(buf_ + psize + usize, ts->data(), ts_sz); + } + EncodeFixed64(buf_ + usize + psize + ts_sz, + PackSequenceAndType(s, value_type)); key_ = buf_; - key_size_ = psize + usize + sizeof(uint64_t); + key_size_ = psize + usize + sizeof(uint64_t) + ts_sz; is_user_key_ = false; } void SetInternalKey(const Slice& user_key, SequenceNumber s, - ValueType value_type = kValueTypeForSeek) { - SetInternalKey(Slice(), user_key, s, value_type); + ValueType value_type = kValueTypeForSeek, + const Slice* ts = nullptr) { + SetInternalKey(Slice(), user_key, s, value_type, ts); } void Reserve(size_t size) { diff --git a/db/version_set.cc b/db/version_set.cc index 6476a9cca..834f1d2e0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -955,8 +955,8 @@ class LevelIterator final : public InternalIterator { bool KeyReachedUpperBound(const Slice& internal_key) { return read_options_.iterate_upper_bound != nullptr && user_comparator_.CompareWithoutTimestamp( - ExtractUserKey(internal_key), - *read_options_.iterate_upper_bound) >= 0; + ExtractUserKey(internal_key), /*a_has_ts=*/true, + *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0; } InternalIterator* NewFileIterator() { diff --git a/include/rocksdb/comparator.h b/include/rocksdb/comparator.h index 76981d108..53a46ad33 100644 --- a/include/rocksdb/comparator.h +++ b/include/rocksdb/comparator.h @@ -42,6 +42,9 @@ class Comparator { // < 0 iff "a" < "b", // == 0 iff "a" == "b", // > 0 iff "a" > "b" + // Note that Compare(a, b) also compares timestamp if timestamp size is + // non-zero. For the same user key with different timestamps, larger (newer) + // timestamp comes first. virtual int Compare(const Slice& a, const Slice& b) const = 0; // Compares two slices for equality. The following invariant should always @@ -97,15 +100,27 @@ class Comparator { inline size_t timestamp_size() const { return timestamp_size_; } - virtual int CompareWithoutTimestamp(const Slice& a, const Slice& b) const { - return Compare(a, b); + int CompareWithoutTimestamp(const Slice& a, const Slice& b) const { + return CompareWithoutTimestamp(a, /*a_has_ts=*/true, b, /*b_has_ts=*/true); } + // For two events e1 and e2 whose timestamps are t1 and t2 respectively, + // Returns value: + // < 0 iff t1 < t2 + // == 0 iff t1 == t2 + // > 0 iff t1 > t2 + // Note that an all-zero byte array will be the smallest (oldest) timestamp + // of the same length. virtual int CompareTimestamp(const Slice& /*ts1*/, const Slice& /*ts2*/) const { return 0; } + virtual int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/, + const Slice& b, bool /*b_has_ts*/) const { + return Compare(a, b); + } + private: size_t timestamp_size_; }; diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 2f8f1e385..aa2f2a3ff 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -45,6 +45,7 @@ class Iterator : public Cleanable { // Position at the last key in the source. The iterator is // Valid() after this call iff the source is not empty. + // Currently incompatible with user timestamp. virtual void SeekToLast() = 0; // Position at the first key in the source that at or past target. @@ -53,11 +54,13 @@ class Iterator : public Cleanable { // All Seek*() methods clear any error status() that the iterator had prior to // the call; after the seek, status() indicates only the error (if any) that // happened during the seek, not any past errors. + // Target does not contain timestamp. virtual void Seek(const Slice& target) = 0; // Position at the last key in the source that at or before target. // The iterator is Valid() after this call iff the source contains // an entry that comes at or before target. + // Currently incompatible with user timestamp. virtual void SeekForPrev(const Slice& target) = 0; // Moves to the next entry in the source. After this call, Valid() is @@ -67,6 +70,7 @@ class Iterator : public Cleanable { // Moves to the previous entry in the source. After this call, Valid() is // true iff the iterator was not positioned at the first entry in source. + // Currently incompatible with user timestamp. // REQUIRES: Valid() virtual void Prev() = 0; @@ -108,6 +112,11 @@ class Iterator : public Cleanable { // Get the user-key portion of the internal key at which the iteration // stopped. virtual Status GetProperty(std::string prop_name, std::string* prop); + + virtual Slice timestamp() const { + assert(false); + return Slice(); + } }; // Return an empty iterator (yields nothing). diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 1c10855bf..6a0d8ef5f 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -3124,8 +3124,9 @@ void BlockBasedTableIterator::FindBlockForward() { read_options_.iterate_upper_bound != nullptr && block_iter_points_to_real_block_ && !data_block_within_upper_bound_; assert(!next_block_is_out_of_bound || - user_comparator_.Compare(*read_options_.iterate_upper_bound, - index_iter_->user_key()) <= 0); + user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, + index_iter_->user_key(), /*b_has_ts=*/true) <= 0); ResetDataIter(); index_iter_->Next(); if (next_block_is_out_of_bound) { @@ -3184,8 +3185,10 @@ void BlockBasedTableIterator::FindKeyBackward() { template void BlockBasedTableIterator::CheckOutOfBound() { if (read_options_.iterate_upper_bound != nullptr && Valid()) { - is_out_of_bound_ = user_comparator_.Compare( - *read_options_.iterate_upper_bound, user_key()) <= 0; + is_out_of_bound_ = + user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(), + /*b_has_ts=*/true) <= 0; } } @@ -3195,8 +3198,10 @@ void BlockBasedTableIteratoruser_key()) > 0); + (user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, + index_iter_->user_key(), + /*b_has_ts=*/true) > 0); } } diff --git a/table/internal_iterator.h b/table/internal_iterator.h index 1d36e5019..fef6afe70 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -52,6 +52,7 @@ class InternalIteratorBase : public Cleanable { // All Seek*() methods clear any error status() that the iterator had prior to // the call; after the seek, status() indicates only the error (if any) that // happened during the seek, not any past errors. + // 'target' contains user timestamp if timestamp is enabled. virtual void Seek(const Slice& target) = 0; // Position at the first key in the source that at or before target diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 48d32d2ef..d33e4dbca 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1749,7 +1749,7 @@ void DBDumperCommand::DoDumpCommand() { break; if (is_db_ttl_) { TtlIterator* it_ttl = static_cast_with_check(iter); - rawtime = it_ttl->timestamp(); + rawtime = it_ttl->ttl_timestamp(); if (rawtime < ttl_start || rawtime >= ttl_end) { continue; } @@ -2578,7 +2578,7 @@ void ScanCommand::DoCommand() { it->Next()) { if (is_db_ttl_) { TtlIterator* it_ttl = static_cast_with_check(it); - int rawtime = it_ttl->timestamp(); + int rawtime = it_ttl->ttl_timestamp(); if (rawtime < ttl_start || rawtime >= ttl_end) { continue; } diff --git a/util/comparator.cc b/util/comparator.cc index 48340bd96..44d45732a 100644 --- a/util/comparator.cc +++ b/util/comparator.cc @@ -125,7 +125,9 @@ class BytewiseComparatorImpl : public Comparator { return false; } - int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override { + using Comparator::CompareWithoutTimestamp; + int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/, const Slice& b, + bool /*b_has_ts*/) const override { return a.compare(b); } }; @@ -197,7 +199,9 @@ class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl { return false; } - int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override { + using Comparator::CompareWithoutTimestamp; + int CompareWithoutTimestamp(const Slice& a, bool /*a_has_ts*/, const Slice& b, + bool /*b_has_ts*/) const override { return -a.compare(b); } }; diff --git a/util/user_comparator_wrapper.h b/util/user_comparator_wrapper.h index fdb27f33e..e8d18237a 100644 --- a/util/user_comparator_wrapper.h +++ b/util/user_comparator_wrapper.h @@ -18,8 +18,8 @@ namespace ROCKSDB_NAMESPACE { class UserComparatorWrapper final : public Comparator { public: explicit UserComparatorWrapper(const Comparator* const user_cmp) - : user_comparator_(user_cmp) {} - + : Comparator(user_cmp->timestamp_size()), user_comparator_(user_cmp) {} + ~UserComparatorWrapper() = default; const Comparator* user_comparator() const { return user_comparator_; } @@ -58,6 +58,17 @@ class UserComparatorWrapper final : public Comparator { return user_comparator_->CanKeysWithDifferentByteContentsBeEqual(); } + int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override { + return user_comparator_->CompareTimestamp(ts1, ts2); + } + + using Comparator::CompareWithoutTimestamp; + int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b, + bool b_has_ts) const override { + PERF_COUNTER_ADD(user_key_comparison_count, 1); + return user_comparator_->CompareWithoutTimestamp(a, a_has_ts, b, b_has_ts); + } + private: const Comparator* user_comparator_; }; diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index ab6063a47..5c1f7c79b 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -129,7 +129,7 @@ class TtlIterator : public Iterator { Slice key() const override { return iter_->key(); } - int32_t timestamp() const { + int32_t ttl_timestamp() const { return DecodeFixed32(iter_->value().data() + iter_->value().size() - DBWithTTLImpl::kTSLength); }