From 85394a96ca9e3361bc1be0fc4df5d8a2b4f3ab25 Mon Sep 17 00:00:00 2001
From: Abhishek Madan
Date: Mon, 26 Nov 2018 16:31:30 -0800
Subject: [PATCH] Speed up range scans with range tombstones (#4677)

Summary:
Previously, every range tombstone iterator was seeked on every
ShouldDelete call, which quickly degraded performance for long range
scans. This PR improves performance by tracking iterator positions and
only advancing iterators when necessary.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4677

Differential Revision: D13205373

Pulled By: abhimadan

fbshipit-source-id: 80c199dace1e19362a4c61c686bf01913eae87cb
---
 db/range_del_aggregator_v2.cc | 135 +++++++++++++++++++++++++--
 db/range_del_aggregator_v2.h  | 171 +++++++++++++++++++++++++++++++++-
 util/heap.h                   |   4 +
 3 files changed, 298 insertions(+), 12 deletions(-)

diff --git a/db/range_del_aggregator_v2.cc b/db/range_del_aggregator_v2.cc
index 015a5b2d8..df219317f 100644
--- a/db/range_del_aggregator_v2.cc
+++ b/db/range_del_aggregator_v2.cc
@@ -104,9 +104,122 @@ void TruncatedRangeDelIterator::SeekToFirst() { iter_->SeekToTopFirst(); }
 
 void TruncatedRangeDelIterator::SeekToLast() { iter_->SeekToTopLast(); }
 
+ForwardRangeDelIterator::ForwardRangeDelIterator(
+    const InternalKeyComparator* icmp,
+    const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters)
+    : icmp_(icmp),
+      iters_(iters),
+      unused_idx_(0),
+      active_seqnums_(SeqMaxComparator()),
+      active_iters_(EndKeyMinComparator(icmp)),
+      inactive_iters_(StartKeyMinComparator(icmp)) {}
+
+bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
+  assert(iters_ != nullptr);
+  // Pick up previously unseen iterators.
+  for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end();
+       ++it, ++unused_idx_) {
+    auto& iter = *it;
+    iter->Seek(parsed.user_key);
+    PushIter(iter.get(), parsed);
+    assert(active_iters_.size() == active_seqnums_.size());
+  }
+
+  // Move active iterators that end before parsed.
+  while (!active_iters_.empty() &&
+         icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) {
+    TruncatedRangeDelIterator* iter = PopActiveIter();
+    do {
+      iter->Next();
+    } while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0);
+    PushIter(iter, parsed);
+    assert(active_iters_.size() == active_seqnums_.size());
+  }
+
+  // Move inactive iterators that start before parsed.
+  while (!inactive_iters_.empty() &&
+         icmp_->Compare(inactive_iters_.top()->start_key(), parsed) <= 0) {
+    TruncatedRangeDelIterator* iter = PopInactiveIter();
+    while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0) {
+      iter->Next();
+    }
+    PushIter(iter, parsed);
+    assert(active_iters_.size() == active_seqnums_.size());
+  }
+
+  return active_seqnums_.empty()
+             ? false
+             : (*active_seqnums_.begin())->seq() > parsed.sequence;
+}
+
+void ForwardRangeDelIterator::Invalidate() {
+  unused_idx_ = 0;
+  active_iters_.clear();
+  active_seqnums_.clear();
+  inactive_iters_.clear();
+}
+
+ReverseRangeDelIterator::ReverseRangeDelIterator(
+    const InternalKeyComparator* icmp,
+    const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters)
+    : icmp_(icmp),
+      iters_(iters),
+      unused_idx_(0),
+      active_seqnums_(SeqMaxComparator()),
+      active_iters_(StartKeyMaxComparator(icmp)),
+      inactive_iters_(EndKeyMaxComparator(icmp)) {}
+
+bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
+  assert(iters_ != nullptr);
+  // Pick up previously unseen iterators.
+  for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end();
+       ++it, ++unused_idx_) {
+    auto& iter = *it;
+    iter->SeekForPrev(parsed.user_key);
+    PushIter(iter.get(), parsed);
+    assert(active_iters_.size() == active_seqnums_.size());
+  }
+
+  // Move active iterators that start after parsed.
+  while (!active_iters_.empty() &&
+         icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) {
+    TruncatedRangeDelIterator* iter = PopActiveIter();
+    do {
+      iter->Prev();
+    } while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0);
+    PushIter(iter, parsed);
+    assert(active_iters_.size() == active_seqnums_.size());
+  }
+
+  // Move inactive iterators that end after parsed.
+  while (!inactive_iters_.empty() &&
+         icmp_->Compare(parsed, inactive_iters_.top()->end_key()) < 0) {
+    TruncatedRangeDelIterator* iter = PopInactiveIter();
+    while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0) {
+      iter->Prev();
+    }
+    PushIter(iter, parsed);
+    assert(active_iters_.size() == active_seqnums_.size());
+  }
+
+  return active_seqnums_.empty()
+             ? false
+             : (*active_seqnums_.begin())->seq() > parsed.sequence;
+}
+
+void ReverseRangeDelIterator::Invalidate() {
+  unused_idx_ = 0;
+  active_iters_.clear();
+  active_seqnums_.clear();
+  inactive_iters_.clear();
+}
+
 RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp,
                                            SequenceNumber upper_bound)
-    : icmp_(icmp), upper_bound_(upper_bound) {}
+    : icmp_(icmp),
+      upper_bound_(upper_bound),
+      forward_iter_(icmp, &iters_),
+      reverse_iter_(icmp, &iters_) {}
 
 void RangeDelAggregatorV2::AddTombstones(
     std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
@@ -143,20 +256,24 @@ bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed,
   if (wrapped_range_del_agg != nullptr) {
     return wrapped_range_del_agg->ShouldDelete(parsed, mode);
   }
-  // TODO: avoid re-seeking every call
-  for (auto& iter : iters_) {
-    iter->Seek(parsed.user_key);
-    if (iter->Valid() && icmp_->Compare(iter->start_key(), parsed) <= 0 &&
-        iter->seq() > parsed.sequence) {
-      return true;
-    }
+
+  switch (mode) {
+    case RangeDelPositioningMode::kForwardTraversal:
+      reverse_iter_.Invalidate();
+      return forward_iter_.ShouldDelete(parsed);
+    case RangeDelPositioningMode::kBackwardTraversal:
+      forward_iter_.Invalidate();
+      return reverse_iter_.ShouldDelete(parsed);
+    default:
+      assert(false);
+      return false;
   }
-  return false;
 }
 
 bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start,
                                              const Slice& end) {
   assert(wrapped_range_del_agg == nullptr);
+  InvalidateRangeDelMapPositions();
 
   // Set the internal start/end keys so that:
   // - if start_ikey has the same user key and sequence number as the current
diff --git a/db/range_del_aggregator_v2.h b/db/range_del_aggregator_v2.h
index 6689fad50..e8c52052a 100644
--- a/db/range_del_aggregator_v2.h
+++ b/db/range_del_aggregator_v2.h
@@ -78,6 +78,167 @@ class TruncatedRangeDelIterator {
   std::list<ParsedInternalKey> pinned_bounds_;
 };
 
+struct SeqMaxComparator {
+  bool operator()(const TruncatedRangeDelIterator* a,
+                  const TruncatedRangeDelIterator* b) const {
+    return a->seq() > b->seq();
+  }
+};
+
+class ForwardRangeDelIterator {
+ public:
+  ForwardRangeDelIterator(
+      const InternalKeyComparator* icmp,
+      const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters);
+
+  bool ShouldDelete(const ParsedInternalKey& parsed);
+  void Invalidate();
+
+ private:
+  using ActiveSeqSet =
+      std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
+
+  struct StartKeyMinComparator {
+    explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
+
+    bool operator()(const TruncatedRangeDelIterator* a,
+                    const TruncatedRangeDelIterator* b) const {
+      return icmp->Compare(a->start_key(), b->start_key()) > 0;
+    }
+
+    const InternalKeyComparator* icmp;
+  };
+  struct EndKeyMinComparator {
+    explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
+
+    bool operator()(const ActiveSeqSet::const_iterator& a,
+                    const ActiveSeqSet::const_iterator& b) const {
+      return icmp->Compare((*a)->end_key(), (*b)->end_key()) > 0;
+    }
+
+    const InternalKeyComparator* icmp;
+  };
+
+  void PushIter(TruncatedRangeDelIterator* iter,
+                const ParsedInternalKey& parsed) {
+    if (!iter->Valid()) {
+      // The iterator has been fully consumed, so we don't need to add it to
+      // either of the heaps.
+    } else if (icmp_->Compare(parsed, iter->start_key()) < 0) {
+      PushInactiveIter(iter);
+    } else {
+      PushActiveIter(iter);
+    }
+  }
+
+  void PushActiveIter(TruncatedRangeDelIterator* iter) {
+    auto seq_pos = active_seqnums_.insert(iter);
+    active_iters_.push(seq_pos);
+  }
+
+  TruncatedRangeDelIterator* PopActiveIter() {
+    auto active_top = active_iters_.top();
+    auto iter = *active_top;
+    active_iters_.pop();
+    active_seqnums_.erase(active_top);
+    return iter;
+  }
+
+  void PushInactiveIter(TruncatedRangeDelIterator* iter) {
+    inactive_iters_.push(iter);
+  }
+
+  TruncatedRangeDelIterator* PopInactiveIter() {
+    auto* iter = inactive_iters_.top();
+    inactive_iters_.pop();
+    return iter;
+  }
+
+  const InternalKeyComparator* icmp_;
+  const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters_;
+  size_t unused_idx_;
+  ActiveSeqSet active_seqnums_;
+  BinaryHeap<ActiveSeqSet::const_iterator, EndKeyMinComparator> active_iters_;
+  BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> inactive_iters_;
+};
+
+class ReverseRangeDelIterator {
+ public:
+  ReverseRangeDelIterator(
+      const InternalKeyComparator* icmp,
+      const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters);
+
+  bool ShouldDelete(const ParsedInternalKey& parsed);
+  void Invalidate();
+
+ private:
+  using ActiveSeqSet =
+      std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
+
+  struct EndKeyMaxComparator {
+    explicit EndKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
+
+    bool operator()(const TruncatedRangeDelIterator* a,
+                    const TruncatedRangeDelIterator* b) const {
+      return icmp->Compare(a->end_key(), b->end_key()) < 0;
+    }
+
+    const InternalKeyComparator* icmp;
+  };
+  struct StartKeyMaxComparator {
+    explicit StartKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
+
+    bool operator()(const ActiveSeqSet::const_iterator& a,
+                    const ActiveSeqSet::const_iterator& b) const {
+      return icmp->Compare((*a)->start_key(), (*b)->start_key()) < 0;
+    }
+
+    const InternalKeyComparator* icmp;
+  };
+
+  void PushIter(TruncatedRangeDelIterator* iter,
+                const ParsedInternalKey& parsed) {
+    if (!iter->Valid()) {
+      // The iterator has been fully consumed, so we don't need to add it to
+      // either of the heaps.
+    } else if (icmp_->Compare(iter->end_key(), parsed) <= 0) {
+      PushInactiveIter(iter);
+    } else {
+      PushActiveIter(iter);
+    }
+  }
+
+  void PushActiveIter(TruncatedRangeDelIterator* iter) {
+    auto seq_pos = active_seqnums_.insert(iter);
+    active_iters_.push(seq_pos);
+  }
+
+  TruncatedRangeDelIterator* PopActiveIter() {
+    auto active_top = active_iters_.top();
+    auto iter = *active_top;
+    active_iters_.pop();
+    active_seqnums_.erase(active_top);
+    return iter;
+  }
+
+  void PushInactiveIter(TruncatedRangeDelIterator* iter) {
+    inactive_iters_.push(iter);
+  }
+
+  TruncatedRangeDelIterator* PopInactiveIter() {
+    auto* iter = inactive_iters_.top();
+    inactive_iters_.pop();
+    return iter;
+  }
+
+  const InternalKeyComparator* icmp_;
+  const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters_;
+  size_t unused_idx_;
+  ActiveSeqSet active_seqnums_;
+  BinaryHeap<ActiveSeqSet::const_iterator, StartKeyMaxComparator> active_iters_;
+  BinaryHeap<TruncatedRangeDelIterator*, EndKeyMaxComparator> inactive_iters_;
+};
+
 class RangeDelAggregatorV2 {
  public:
   RangeDelAggregatorV2(const InternalKeyComparator* icmp,
@@ -95,9 +256,10 @@ class RangeDelAggregatorV2 {
   bool IsRangeOverlapped(const Slice& start, const Slice& end);
 
-  // TODO: no-op for now, but won't be once ShouldDelete leverages positioning
-  // mode and doesn't re-seek every ShouldDelete
-  void InvalidateRangeDelMapPositions() {}
+  void InvalidateRangeDelMapPositions() {
+    forward_iter_.Invalidate();
+    reverse_iter_.Invalidate();
+  }
 
   bool IsEmpty() const { return iters_.empty(); }
 
   bool AddFile(uint64_t file_number) {
@@ -127,6 +289,9 @@ class RangeDelAggregatorV2 {
   std::list<std::unique_ptr<FragmentedRangeTombstoneList>> pinned_fragments_;
   std::set<uint64_t> files_seen_;
 
+  ForwardRangeDelIterator forward_iter_;
+  ReverseRangeDelIterator reverse_iter_;
+
   // TODO: remove once V2 supports exposing tombstone iterators
   std::unique_ptr<RangeDelAggregator> wrapped_range_del_agg;
 };
diff --git a/util/heap.h b/util/heap.h
index 4d5894134..9447a105e 100644
--- a/util/heap.h
+++ b/util/heap.h
@@ -96,6 +96,10 @@ class BinaryHeap {
     return data_.empty();
   }
 
+  size_t size() const {
+    return data_.size();
+  }
+
   void reset_root_cmp_cache() { root_cmp_cache_ = port::kMaxSizet; }
 
  private:
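
Illustrative note (not part of the patch): the core idea above is to keep each range tombstone classified as either "active" (its current range covers or precedes the lookup key) or "inactive" (its current range starts after the lookup key), and to reuse that classification across ShouldDelete calls as a forward scan advances, instead of re-seeking every tombstone iterator per key. The sketch below is a simplified, standalone rendering of that forward-traversal bookkeeping under several assumptions: user keys are plain std::string, each tombstone is a single half-open [start, end) range with one sequence number, and the names Tombstone, ForwardCoverageChecker, and Classify are hypothetical stand-ins rather than RocksDB types; the real code operates on TruncatedRangeDelIterator fragments with internal-key comparisons and uses RocksDB's BinaryHeap rather than std::priority_queue.

#include <cstddef>
#include <cstdint>
#include <queue>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for a range deletion: covers [start, end) at `seq`.
struct Tombstone {
  std::string start;  // inclusive user key
  std::string end;    // exclusive user key
  uint64_t seq;       // sequence number of the DeleteRange
};

// Tracks a forward scan's position against a set of tombstones so that each
// ShouldDelete() call only moves tombstones the scan has caught up with,
// mirroring ForwardRangeDelIterator's active/inactive bookkeeping.
class ForwardCoverageChecker {
 public:
  explicit ForwardCoverageChecker(std::vector<Tombstone> tombstones)
      : all_(std::move(tombstones)) {}

  // Keys must arrive in non-decreasing order, as in a forward range scan.
  // Returns true if some tombstone covering `key` is newer than `seq`.
  bool ShouldDelete(const std::string& key, uint64_t seq) {
    // Lazily classify tombstones we have not looked at yet.
    for (; unused_idx_ < all_.size(); ++unused_idx_) {
      Classify(&all_[unused_idx_], key);
    }
    // Retire active tombstones that end at or before the current key. A
    // single-range tombstone can never cover a later key, so it is dropped
    // here; the RocksDB code instead advances a fragmented iterator and
    // re-classifies it.
    while (!by_end_.empty() && (*by_end_.top())->end <= key) {
      auto it = by_end_.top();
      by_end_.pop();
      active_.erase(it);
    }
    // Activate inactive tombstones that now start at or before the key.
    while (!inactive_.empty() && inactive_.top()->start <= key) {
      const Tombstone* t = inactive_.top();
      inactive_.pop();
      Classify(t, key);
    }
    // active_ is ordered by descending seq, so one lookup answers the query.
    return !active_.empty() && (*active_.begin())->seq > seq;
  }

 private:
  struct SeqDesc {
    bool operator()(const Tombstone* a, const Tombstone* b) const {
      return a->seq > b->seq;
    }
  };
  using ActiveSet = std::multiset<const Tombstone*, SeqDesc>;
  struct EndAsc {  // min-heap on end key, over iterators into active_
    bool operator()(ActiveSet::const_iterator a,
                    ActiveSet::const_iterator b) const {
      return (*a)->end > (*b)->end;
    }
  };
  struct StartAsc {  // min-heap on start key
    bool operator()(const Tombstone* a, const Tombstone* b) const {
      return a->start > b->start;
    }
  };

  // Place a tombstone into the active set, the inactive heap, or neither,
  // relative to the scan's current key.
  void Classify(const Tombstone* t, const std::string& key) {
    if (t->end <= key) {
      return;  // entirely behind the scan; cannot cover this or later keys
    }
    if (key < t->start) {
      inactive_.push(t);  // not reached yet
    } else {
      by_end_.push(active_.insert(t));  // currently covers the scan position
    }
  }

  std::vector<Tombstone> all_;
  size_t unused_idx_ = 0;
  ActiveSet active_;  // tombstones covering the current key, newest first
  std::priority_queue<ActiveSet::const_iterator,
                      std::vector<ActiveSet::const_iterator>, EndAsc>
      by_end_;
  std::priority_queue<const Tombstone*, std::vector<const Tombstone*>,
                      StartAsc>
      inactive_;
};

Because keys arrive in non-decreasing order, each tombstone is classified once and retired at most once over the whole scan, so the per-key cost amortizes to a few heap and set operations instead of a seek of every tombstone iterator, which is the speedup this patch targets; the reverse-traversal case in the patch is the mirror image, driven by SeekForPrev and Prev.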