diff --git a/db/range_del_aggregator_v2.cc b/db/range_del_aggregator_v2.cc index 015a5b2d8..df219317f 100644 --- a/db/range_del_aggregator_v2.cc +++ b/db/range_del_aggregator_v2.cc @@ -104,9 +104,122 @@ void TruncatedRangeDelIterator::SeekToFirst() { iter_->SeekToTopFirst(); } void TruncatedRangeDelIterator::SeekToLast() { iter_->SeekToTopLast(); } +ForwardRangeDelIterator::ForwardRangeDelIterator( + const InternalKeyComparator* icmp, + const std::vector>* iters) + : icmp_(icmp), + iters_(iters), + unused_idx_(0), + active_seqnums_(SeqMaxComparator()), + active_iters_(EndKeyMinComparator(icmp)), + inactive_iters_(StartKeyMinComparator(icmp)) {} + +bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) { + assert(iters_ != nullptr); + // Pick up previously unseen iterators. + for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end(); + ++it, ++unused_idx_) { + auto& iter = *it; + iter->Seek(parsed.user_key); + PushIter(iter.get(), parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + // Move active iterators that end before parsed. + while (!active_iters_.empty() && + icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) { + TruncatedRangeDelIterator* iter = PopActiveIter(); + do { + iter->Next(); + } while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0); + PushIter(iter, parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + // Move inactive iterators that start before parsed. + while (!inactive_iters_.empty() && + icmp_->Compare(inactive_iters_.top()->start_key(), parsed) <= 0) { + TruncatedRangeDelIterator* iter = PopInactiveIter(); + while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0) { + iter->Next(); + } + PushIter(iter, parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + return active_seqnums_.empty() + ? false + : (*active_seqnums_.begin())->seq() > parsed.sequence; +} + +void ForwardRangeDelIterator::Invalidate() { + unused_idx_ = 0; + active_iters_.clear(); + active_seqnums_.clear(); + inactive_iters_.clear(); +} + +ReverseRangeDelIterator::ReverseRangeDelIterator( + const InternalKeyComparator* icmp, + const std::vector>* iters) + : icmp_(icmp), + iters_(iters), + unused_idx_(0), + active_seqnums_(SeqMaxComparator()), + active_iters_(StartKeyMaxComparator(icmp)), + inactive_iters_(EndKeyMaxComparator(icmp)) {} + +bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) { + assert(iters_ != nullptr); + // Pick up previously unseen iterators. + for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end(); + ++it, ++unused_idx_) { + auto& iter = *it; + iter->SeekForPrev(parsed.user_key); + PushIter(iter.get(), parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + // Move active iterators that start after parsed. + while (!active_iters_.empty() && + icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) { + TruncatedRangeDelIterator* iter = PopActiveIter(); + do { + iter->Prev(); + } while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0); + PushIter(iter, parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + // Move inactive iterators that end after parsed. + while (!inactive_iters_.empty() && + icmp_->Compare(parsed, inactive_iters_.top()->end_key()) < 0) { + TruncatedRangeDelIterator* iter = PopInactiveIter(); + while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0) { + iter->Prev(); + } + PushIter(iter, parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + return active_seqnums_.empty() + ? false + : (*active_seqnums_.begin())->seq() > parsed.sequence; +} + +void ReverseRangeDelIterator::Invalidate() { + unused_idx_ = 0; + active_iters_.clear(); + active_seqnums_.clear(); + inactive_iters_.clear(); +} + RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp, SequenceNumber upper_bound) - : icmp_(icmp), upper_bound_(upper_bound) {} + : icmp_(icmp), + upper_bound_(upper_bound), + forward_iter_(icmp, &iters_), + reverse_iter_(icmp, &iters_) {} void RangeDelAggregatorV2::AddTombstones( std::unique_ptr input_iter, @@ -143,20 +256,24 @@ bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed, if (wrapped_range_del_agg != nullptr) { return wrapped_range_del_agg->ShouldDelete(parsed, mode); } - // TODO: avoid re-seeking every call - for (auto& iter : iters_) { - iter->Seek(parsed.user_key); - if (iter->Valid() && icmp_->Compare(iter->start_key(), parsed) <= 0 && - iter->seq() > parsed.sequence) { - return true; - } + + switch (mode) { + case RangeDelPositioningMode::kForwardTraversal: + reverse_iter_.Invalidate(); + return forward_iter_.ShouldDelete(parsed); + case RangeDelPositioningMode::kBackwardTraversal: + forward_iter_.Invalidate(); + return reverse_iter_.ShouldDelete(parsed); + default: + assert(false); + return false; } - return false; } bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start, const Slice& end) { assert(wrapped_range_del_agg == nullptr); + InvalidateRangeDelMapPositions(); // Set the internal start/end keys so that: // - if start_ikey has the same user key and sequence number as the current diff --git a/db/range_del_aggregator_v2.h b/db/range_del_aggregator_v2.h index 6689fad50..e8c52052a 100644 --- a/db/range_del_aggregator_v2.h +++ b/db/range_del_aggregator_v2.h @@ -78,6 +78,167 @@ class TruncatedRangeDelIterator { std::list pinned_bounds_; }; +struct SeqMaxComparator { + bool operator()(const TruncatedRangeDelIterator* a, + const TruncatedRangeDelIterator* b) const { + return a->seq() > b->seq(); + } +}; + +class ForwardRangeDelIterator { + public: + ForwardRangeDelIterator( + const InternalKeyComparator* icmp, + const std::vector>* iters); + + bool ShouldDelete(const ParsedInternalKey& parsed); + void Invalidate(); + + private: + using ActiveSeqSet = + std::multiset; + + struct StartKeyMinComparator { + explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {} + + bool operator()(const TruncatedRangeDelIterator* a, + const TruncatedRangeDelIterator* b) const { + return icmp->Compare(a->start_key(), b->start_key()) > 0; + } + + const InternalKeyComparator* icmp; + }; + struct EndKeyMinComparator { + explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {} + + bool operator()(const ActiveSeqSet::const_iterator& a, + const ActiveSeqSet::const_iterator& b) const { + return icmp->Compare((*a)->end_key(), (*b)->end_key()) > 0; + } + + const InternalKeyComparator* icmp; + }; + + void PushIter(TruncatedRangeDelIterator* iter, + const ParsedInternalKey& parsed) { + if (!iter->Valid()) { + // The iterator has been fully consumed, so we don't need to add it to + // either of the heaps. + } else if (icmp_->Compare(parsed, iter->start_key()) < 0) { + PushInactiveIter(iter); + } else { + PushActiveIter(iter); + } + } + + void PushActiveIter(TruncatedRangeDelIterator* iter) { + auto seq_pos = active_seqnums_.insert(iter); + active_iters_.push(seq_pos); + } + + TruncatedRangeDelIterator* PopActiveIter() { + auto active_top = active_iters_.top(); + auto iter = *active_top; + active_iters_.pop(); + active_seqnums_.erase(active_top); + return iter; + } + + void PushInactiveIter(TruncatedRangeDelIterator* iter) { + inactive_iters_.push(iter); + } + + TruncatedRangeDelIterator* PopInactiveIter() { + auto* iter = inactive_iters_.top(); + inactive_iters_.pop(); + return iter; + } + + const InternalKeyComparator* icmp_; + const std::vector>* iters_; + size_t unused_idx_; + ActiveSeqSet active_seqnums_; + BinaryHeap active_iters_; + BinaryHeap inactive_iters_; +}; + +class ReverseRangeDelIterator { + public: + ReverseRangeDelIterator( + const InternalKeyComparator* icmp, + const std::vector>* iters); + + bool ShouldDelete(const ParsedInternalKey& parsed); + void Invalidate(); + + private: + using ActiveSeqSet = + std::multiset; + + struct EndKeyMaxComparator { + explicit EndKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {} + + bool operator()(const TruncatedRangeDelIterator* a, + const TruncatedRangeDelIterator* b) const { + return icmp->Compare(a->end_key(), b->end_key()) < 0; + } + + const InternalKeyComparator* icmp; + }; + struct StartKeyMaxComparator { + explicit StartKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {} + + bool operator()(const ActiveSeqSet::const_iterator& a, + const ActiveSeqSet::const_iterator& b) const { + return icmp->Compare((*a)->start_key(), (*b)->start_key()) < 0; + } + + const InternalKeyComparator* icmp; + }; + + void PushIter(TruncatedRangeDelIterator* iter, + const ParsedInternalKey& parsed) { + if (!iter->Valid()) { + // The iterator has been fully consumed, so we don't need to add it to + // either of the heaps. + } else if (icmp_->Compare(iter->end_key(), parsed) <= 0) { + PushInactiveIter(iter); + } else { + PushActiveIter(iter); + } + } + + void PushActiveIter(TruncatedRangeDelIterator* iter) { + auto seq_pos = active_seqnums_.insert(iter); + active_iters_.push(seq_pos); + } + + TruncatedRangeDelIterator* PopActiveIter() { + auto active_top = active_iters_.top(); + auto iter = *active_top; + active_iters_.pop(); + active_seqnums_.erase(active_top); + return iter; + } + + void PushInactiveIter(TruncatedRangeDelIterator* iter) { + inactive_iters_.push(iter); + } + + TruncatedRangeDelIterator* PopInactiveIter() { + auto* iter = inactive_iters_.top(); + inactive_iters_.pop(); + return iter; + } + + const InternalKeyComparator* icmp_; + const std::vector>* iters_; + size_t unused_idx_; + ActiveSeqSet active_seqnums_; + BinaryHeap active_iters_; + BinaryHeap inactive_iters_; +}; + class RangeDelAggregatorV2 { public: RangeDelAggregatorV2(const InternalKeyComparator* icmp, @@ -95,9 +256,10 @@ class RangeDelAggregatorV2 { bool IsRangeOverlapped(const Slice& start, const Slice& end); - // TODO: no-op for now, but won't be once ShouldDelete leverages positioning - // mode and doesn't re-seek every ShouldDelete - void InvalidateRangeDelMapPositions() {} + void InvalidateRangeDelMapPositions() { + forward_iter_.Invalidate(); + reverse_iter_.Invalidate(); + } bool IsEmpty() const { return iters_.empty(); } bool AddFile(uint64_t file_number) { @@ -127,6 +289,9 @@ class RangeDelAggregatorV2 { std::list> pinned_fragments_; std::set files_seen_; + ForwardRangeDelIterator forward_iter_; + ReverseRangeDelIterator reverse_iter_; + // TODO: remove once V2 supports exposing tombstone iterators std::unique_ptr wrapped_range_del_agg; }; diff --git a/util/heap.h b/util/heap.h index 4d5894134..9447a105e 100644 --- a/util/heap.h +++ b/util/heap.h @@ -96,6 +96,10 @@ class BinaryHeap { return data_.empty(); } + size_t size() const { + return data_.size(); + } + void reset_root_cmp_cache() { root_cmp_cache_ = port::kMaxSizet; } private: