diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 60e2681fa..efe27870d 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -383,7 +383,13 @@ bool Compaction::KeyNotExistsBeyondOutputLevel( auto* f = files[level_ptrs->at(lvl)]; if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { // We've advanced far enough - if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) { + // In the presence of user-defined timestamp, we may need to handle + // the case in which f->smallest.user_key() (including ts) has the + // same user key, but the ts part is smaller. If so, + // Compare(user_key, f->smallest.user_key()) returns -1. + // That's why we need CompareWithoutTimestamp(). + if (user_cmp->CompareWithoutTimestamp(user_key, + f->smallest.user_key()) >= 0) { // Key falls in this file's range, so it may // exist beyond output level return false; diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 4555ec568..1e2aff520 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -44,7 +44,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, - const std::shared_ptr info_log) + const std::shared_ptr info_log, + const std::string* full_history_ts_low) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, @@ -53,7 +54,7 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? 
new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, - manual_compaction_paused, info_log) {} + manual_compaction_paused, info_log, full_history_ts_low) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -68,7 +69,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, - const std::shared_ptr info_log) + const std::shared_ptr info_log, + const std::string* full_history_ts_low) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -90,7 +92,10 @@ CompactionIterator::CompactionIterator( merge_out_iter_(merge_helper_), current_key_committed_(false), info_log_(info_log), - allow_data_in_errors_(allow_data_in_errors) { + allow_data_in_errors_(allow_data_in_errors), + timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0), + full_history_ts_low_(full_history_ts_low), + cmp_with_history_ts_low_(0) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); assert(snapshots_ != nullptr); bottommost_level_ = compaction_ == nullptr @@ -117,6 +122,8 @@ CompactionIterator::CompactionIterator( for (size_t i = 1; i < snapshots_->size(); ++i) { assert(snapshots_->at(i - 1) < snapshots_->at(i)); } + assert(timestamp_size_ == 0 || !full_history_ts_low_ || + timestamp_size_ == full_history_ts_low_->size()); #endif input_->SetPinnedItersMgr(&pinned_iters_mgr_); TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get()); @@ -298,7 +305,8 @@ void CompactionIterator::NextFromInput() { TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); // Update input statistics - if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { + if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || + ikey_.type == kTypeDeletionWithTimestamp) { iter_stats_.num_input_deletion_records++; } 
iter_stats_.total_input_raw_key_bytes += key_.size(); @@ -319,11 +327,33 @@ void CompactionIterator::NextFromInput() { // First occurrence of this user key // Copy key for output key_ = current_key_.SetInternalKey(key_, &ikey_); + + UpdateTimestampAndCompareWithFullHistoryLow(); + + // If + // (1) !has_current_user_key_, OR + // (2) timestamp is disabled, OR + // (3) all history will be preserved, OR + // (4) user key (excluding timestamp) is different from previous key, OR + // (5) timestamp is NO older than *full_history_ts_low_ + // then current_user_key_ must be treated as a different user key. + // This means, if a user key (excluding ts) is the same as the previous + // user key, and its ts is older than *full_history_ts_low_, then we + // consider this key for GC, e.g. it may be dropped if certain conditions + // match. + if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ || + 0 != cmp_->CompareWithoutTimestamp(ikey_.user_key, + current_user_key_) || + cmp_with_history_ts_low_ >= 0) { + // Initialize for future comparison for rule (A) and etc. 
+ current_user_key_sequence_ = kMaxSequenceNumber; + current_user_key_snapshot_ = 0; + has_current_user_key_ = true; + } current_user_key_ = ikey_.user_key; - has_current_user_key_ = true; + has_outputted_key_ = false; - current_user_key_sequence_ = kMaxSequenceNumber; - current_user_key_snapshot_ = 0; + current_key_committed_ = KeyCommitted(ikey_.sequence); // Apply the compaction filter to the first committed version of the user @@ -543,9 +573,12 @@ void CompactionIterator::NextFromInput() { last_sequence, current_user_key_sequence_); } - ++iter_stats_.num_record_drop_hidden; // (A) + ++iter_stats_.num_record_drop_hidden; // rule (A) input_->Next(); - } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && + } else if (compaction_ != nullptr && + (ikey_.type == kTypeDeletion || + (ikey_.type == kTypeDeletionWithTimestamp && + cmp_with_history_ts_low_ < 0)) && IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikeyNotNeededForIncrementalSnapshot() && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, @@ -569,13 +602,19 @@ void CompactionIterator::NextFromInput() { // given that: // (1) The deletion is earlier than earliest_write_conflict_snapshot, and // (2) No value exist earlier than the deletion. + // + // Note also that a deletion marker of type kTypeDeletionWithTimestamp + // will be treated as a different user key unless the timestamp is older + // than *full_history_ts_low_. 
++iter_stats_.num_record_drop_obsolete; if (!bottommost_level_) { ++iter_stats_.num_optimized_del_drop_obsolete; } input_->Next(); - } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ && - ikeyNotNeededForIncrementalSnapshot()) { + } else if ((ikey_.type == kTypeDeletion || + (ikey_.type == kTypeDeletionWithTimestamp && + cmp_with_history_ts_low_ < 0)) && + bottommost_level_ && ikeyNotNeededForIncrementalSnapshot()) { // Handle the case where we have a delete key at the bottom most level // We can skip outputting the key iff there are no subsequent puts for this // key @@ -583,12 +622,17 @@ void CompactionIterator::NextFromInput() { ikey_.user_key, &level_ptrs_)); ParsedInternalKey next_ikey; input_->Next(); - // Skip over all versions of this key that happen to occur in the same snapshot - // range as the delete + // Skip over all versions of this key that happen to occur in the same + // snapshot range as the delete. + // + // Note that a deletion marker of type kTypeDeletionWithTimestamp will be + // considered to have a different user key unless the timestamp is older + // than *full_history_ts_low_. 
while (!IsPausingManualCompaction() && !IsShuttingDown() && input_->Valid() && (ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) && - cmp_->Equal(ikey_.user_key, next_ikey.user_key) && + 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key, + next_ikey.user_key) && (prev_snapshot == 0 || DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) { input_->Next(); @@ -597,7 +641,8 @@ void CompactionIterator::NextFromInput() { // delete too if (input_->Valid() && (ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) && - cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { + 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key, + next_ikey.user_key)) { valid_ = true; at_next_ = true; } @@ -735,7 +780,18 @@ void CompactionIterator::PrepareOutput() { ikey_.type); } ikey_.sequence = 0; - current_key_.UpdateInternalKey(0, ikey_.type); + if (!timestamp_size_) { + current_key_.UpdateInternalKey(0, ikey_.type); + } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) { + // We can also zero out timestamp for better compression. + // For the same user key (excluding timestamp), the timestamp-based + // history can be collapsed to save some space if the timestamp is + // older than *full_history_ts_low_. 
+ const std::string kTsMin(timestamp_size_, static_cast(0)); + const Slice ts_slice = kTsMin; + ikey_.SetTimestamp(ts_slice); + current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice); + } } } } diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 29dedd3c7..b8454593c 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -75,7 +75,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr); + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); // Constructor with custom CompactionProxy, used for tests. CompactionIterator(InternalIterator* input, const Comparator* cmp, @@ -92,7 +93,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr); + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); ~CompactionIterator(); @@ -152,6 +154,18 @@ class CompactionIterator { bool IsInEarliestSnapshot(SequenceNumber sequence); + // Extract user-defined timestamp from user key if possible and compare it + // with *full_history_ts_low_ if applicable. + inline void UpdateTimestampAndCompareWithFullHistoryLow() { + if (full_history_ts_low_) { + assert(timestamp_size_ > 0); + current_ts_ = + ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_); + cmp_with_history_ts_low_ = + cmp_->CompareTimestamp(current_ts_, *full_history_ts_low_); + } + } + InternalIterator* input_; const Comparator* cmp_; MergeHelper* merge_helper_; @@ -199,11 +213,13 @@ class CompactionIterator { // Stores whether ikey_.user_key is valid. 
If set to false, the user key is // not compared against the current key in the underlying iterator. bool has_current_user_key_ = false; - bool at_next_ = false; // If false, the iterator - // Holds a copy of the current compaction iterator output (or current key in - // the underlying iterator during NextFromInput()). + // If false, the iterator holds a copy of the current compaction iterator + // output (or current key in the underlying iterator during NextFromInput()). + bool at_next_ = false; + IterKey current_key_; Slice current_user_key_; + Slice current_ts_; SequenceNumber current_user_key_sequence_; SequenceNumber current_user_key_snapshot_; @@ -237,6 +253,19 @@ class CompactionIterator { bool allow_data_in_errors_; + // Comes from comparator. + const size_t timestamp_size_; + + // Lower bound timestamp to retain full history in terms of user-defined + // timestamp. If a key's timestamp is older than full_history_ts_low_, then + // the key *may* be eligible for garbage collection (GC). The skipping logic + // is in `NextFromInput()` and `PrepareOutput()`. + // If nullptr, NO GC will be performed and all history will be preserved. + const std::string* const full_history_ts_low_; + + // Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_) + int cmp_with_history_ts_low_; + bool IsShuttingDown() { // This is a best-effort facility, so memory_order_relaxed is sufficient. 
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 57db42489..a88809eab 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -216,6 +216,9 @@ class CompactionIteratorTest : public testing::TestWithParam { CompactionIteratorTest() : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {} + explicit CompactionIteratorTest(const Comparator* ucmp) + : cmp_(ucmp), icmp_(cmp_), snapshots_({}) {} + void InitIterators( const std::vector& ks, const std::vector& vs, const std::vector& range_del_ks, @@ -224,7 +227,9 @@ class CompactionIteratorTest : public testing::TestWithParam { SequenceNumber last_committed_sequence = kMaxSequenceNumber, MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr, bool bottommost_level = false, - SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { + SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, + bool key_not_exists_beyond_output_level = false, + const std::string* full_history_ts_low = nullptr) { std::unique_ptr unfragmented_range_del_iter( new test::VectorIterator(range_del_ks, range_del_vs)); auto tombstone_list = std::make_shared( @@ -236,10 +241,12 @@ class CompactionIteratorTest : public testing::TestWithParam { range_del_agg_->AddTombstones(std::move(range_del_iter)); std::unique_ptr compaction; - if (filter || bottommost_level) { + if (filter || bottommost_level || key_not_exists_beyond_output_level) { compaction_proxy_ = new FakeCompaction(); compaction_proxy_->is_bottommost_level = bottommost_level; compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind(); + compaction_proxy_->key_not_exists_beyond_output_level = + key_not_exists_beyond_output_level; compaction.reset(compaction_proxy_); } bool use_snapshot_checker = UseSnapshotChecker() || GetParam(); @@ -252,6 +259,11 @@ class 
CompactionIteratorTest : public testing::TestWithParam { 0 /*latest_snapshot*/, snapshot_checker_.get(), 0 /*level*/, nullptr /*statistics*/, &shutting_down_)); + if (c_iter_) { + // Since iter_ is still used in ~CompactionIterator(), we call + // ~CompactionIterator() first. + c_iter_.reset(); + } iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( @@ -260,7 +272,9 @@ class CompactionIteratorTest : public testing::TestWithParam { Env::Default(), false /* report_detailed_time */, false, range_del_agg_.get(), nullptr /* blob_file_builder */, false /*allow_data_in_errors*/, std::move(compaction), filter, - &shutting_down_)); + &shutting_down_, /*preserve_deletes_seqnum=*/0, + /*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr, + full_history_ts_low)); } void AddSnapshot(SequenceNumber snapshot, @@ -282,10 +296,13 @@ class CompactionIteratorTest : public testing::TestWithParam { MergeOperator* merge_operator = nullptr, CompactionFilter* compaction_filter = nullptr, bool bottommost_level = false, - SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { + SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, + bool key_not_exists_beyond_output_level = false, + const std::string* full_history_ts_low = nullptr) { InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber, last_committed_seq, merge_operator, compaction_filter, - bottommost_level, earliest_write_conflict_snapshot); + bottommost_level, earliest_write_conflict_snapshot, + key_not_exists_beyond_output_level, full_history_ts_low); c_iter_->SeekToFirst(); for (size_t i = 0; i < expected_keys.size(); i++) { std::string info = "i = " + ToString(i); @@ -299,6 +316,11 @@ class CompactionIteratorTest : public testing::TestWithParam { ASSERT_FALSE(c_iter_->Valid()); } + void ClearSnapshots() { + snapshots_.clear(); + snapshot_map_.clear(); + } + const Comparator* cmp_; const InternalKeyComparator icmp_; 
std::vector snapshots_; @@ -1033,6 +1055,185 @@ INSTANTIATE_TEST_CASE_P(CompactionIteratorWithAllowIngestBehindTestInstance, CompactionIteratorWithAllowIngestBehindTest, testing::Values(true, false)); +class CompactionIteratorTsGcTest : public CompactionIteratorTest { + public: + CompactionIteratorTsGcTest() + : CompactionIteratorTest(test::ComparatorWithU64Ts()) {} +}; + +TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeValue), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, + kTypeDeletionWithTimestamp)}; + const std::vector input_values = {"a3", ""}; + std::string full_history_ts_low; + // All keys' timestamps are newer than or equal to 102, thus none of them + // will be eligible for GC. + PutFixed64(&full_history_ts_low, 102); + const std::vector& expected_keys = input_keys; + const std::vector& expected_values = input_values; + const std::vector> params = { + {false, false}, {false, true}, {true, true}}; + for (const std::pair& param : params) { + const bool bottommost_level = param.first; + const bool key_not_exists_beyond_output_level = param.second; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + bottommost_level, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + key_not_exists_beyond_output_level, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeValue)}; + const std::vector input_values = {"", "a2", "a1"}; + std::string full_history_ts_low; + // All keys' timestamps are older than 104. 
+ PutFixed64(&full_history_ts_low, 104); + { + // With a snapshot at seq 3, both the deletion marker and the key at 3 must + // be preserved. + AddSnapshot(3); + const std::vector expected_keys = {input_keys[0], + input_keys[1]}; + const std::vector expected_values = {"", "a2"}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + ClearSnapshots(); + } + { + // No snapshot, the deletion marker should be preserved because the user + // key may appear beyond output level. + const std::vector expected_keys = {input_keys[0]}; + const std::vector expected_values = {""}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + } + { + // No snapshot, the deletion marker can be dropped because the user key + // does not appear in higher levels. 
+ const std::vector expected_keys = {}; + const std::vector expected_values = {}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeValue), + test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)}; + const std::vector input_values = {"", "a2", "a1", "a0"}; + { + std::string full_history_ts_low; + // Keys whose timestamps larger than or equal to 102 will be preserved. + PutFixed64(&full_history_ts_low, 102); + const std::vector expected_keys = {input_keys[0], + input_keys[1]}; + const std::vector expected_values = {"", "a2"}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, DropTombstones) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)}; + const std::vector input_values = {"", "a2", "", "a0"}; + const std::vector expected_keys = 
{input_keys[0], input_keys[1]}; + const std::vector expected_values = {"", "a2"}; + + // Take a snapshot at seq 2. + AddSnapshot(2); + + { + // Non-bottommost level, but key does not exist beyond output level. + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 102); + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_sequence=*/kMaxSequenceNumber, + /*merge_op=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low); + } + { + // Bottommost level + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 102); + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/true, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, RewriteTs) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)}; + const std::vector input_values = {"", "a2", "", "a0"}; + const std::vector expected_keys = { + input_keys[0], input_keys[1], input_keys[2], + test::KeyStr(/*ts=*/0, user_key, /*seq=*/0, kTypeValue)}; + const std::vector expected_values = {"", "a2", "", "a0"}; + + AddSnapshot(1); + AddSnapshot(2); + + { + // Bottommost level and need to rewrite both ts and seq. 
+ std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 102); + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/true, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low); + } +} + +INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance, + CompactionIteratorTsGcTest, + testing::Values(true, false)); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_iter.cc b/db/db_iter.cc index a9eee88dd..c99383e2a 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -436,11 +436,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, &last_key, ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion)); } else { - std::string min_ts(timestamp_size_, static_cast(0)); + const std::string kTsMin(timestamp_size_, static_cast(0)); AppendInternalKeyWithDifferentTimestamp( &last_key, ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion), - min_ts); + kTsMin); } // Don't set skipping_saved_key = false because we may still see more // user-keys equal to saved_key_. diff --git a/db/dbformat.h b/db/dbformat.h index 81c852ac4..d55bbae44 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -119,6 +119,14 @@ struct ParsedInternalKey { sequence = 0; type = kTypeDeletion; } + + // Overwrites, in place, the timestamp held in the trailing ts.size() bytes + // of user_key. The memory user_key refers to must be writable by the caller. + void SetTimestamp(const Slice& ts) { + assert(ts.size() <= user_key.size()); + const char* addr = user_key.data() + user_key.size() - ts.size(); + memcpy(const_cast<char*>(addr), ts.data(), ts.size()); + } }; // Return the length of the encoding of "key". @@ -475,9 +483,14 @@ class IterKey { // Update the sequence number in the internal key. Guarantees not to // invalidate slices to the key (and the user key). 
- void UpdateInternalKey(uint64_t seq, ValueType t) { + void UpdateInternalKey(uint64_t seq, ValueType t, const Slice* ts = nullptr) { assert(!IsKeyPinned()); assert(key_size_ >= kNumInternalBytes); + if (ts) { + assert(key_size_ >= kNumInternalBytes + ts->size()); + memcpy(&buf_[key_size_ - kNumInternalBytes - ts->size()], ts->data(), + ts->size()); + } uint64_t newval = (seq << 8) | t; EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval); } diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 24005e782..18349cf4c 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -205,6 +205,16 @@ std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, return k.Encode().ToString(); } +std::string KeyStr(uint64_t ts, const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt) { + std::string user_key_with_ts(user_key); + std::string ts_str; + PutFixed64(&ts_str, ts); + user_key_with_ts.append(ts_str); + return KeyStr(user_key_with_ts, seq, t, corrupt); +} + std::string RandomName(Random* rnd, const size_t len) { std::stringstream ss; for (size_t i = 0; i < len; ++i) { diff --git a/test_util/testutil.h b/test_util/testutil.h index 4255a48f2..320cccc29 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -394,6 +394,10 @@ extern std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, const ValueType& t, bool corrupt = false); +extern std::string KeyStr(uint64_t ts, const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt = false); + class SleepingBackgroundTask { public: SleepingBackgroundTask()