Speed up range scans with range tombstones (#4677)
Summary: Previously, every range tombstone iterator was seeked on every ShouldDelete call, which quickly degraded performance for long range scans. This PR improves performance by tracking iterator positions and only advancing iterators when necessary. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4677 Differential Revision: D13205373 Pulled By: abhimadan fbshipit-source-id: 80c199dace1e19362a4c61c686bf01913eae87cb
This commit is contained in:
parent
a21cb22ee3
commit
85394a96ca
@ -104,9 +104,122 @@ void TruncatedRangeDelIterator::SeekToFirst() { iter_->SeekToTopFirst(); }
|
||||
|
||||
void TruncatedRangeDelIterator::SeekToLast() { iter_->SeekToTopLast(); }
|
||||
|
||||
ForwardRangeDelIterator::ForwardRangeDelIterator(
|
||||
const InternalKeyComparator* icmp,
|
||||
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters)
|
||||
: icmp_(icmp),
|
||||
iters_(iters),
|
||||
unused_idx_(0),
|
||||
active_seqnums_(SeqMaxComparator()),
|
||||
active_iters_(EndKeyMinComparator(icmp)),
|
||||
inactive_iters_(StartKeyMinComparator(icmp)) {}
|
||||
|
||||
bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
|
||||
assert(iters_ != nullptr);
|
||||
// Pick up previously unseen iterators.
|
||||
for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end();
|
||||
++it, ++unused_idx_) {
|
||||
auto& iter = *it;
|
||||
iter->Seek(parsed.user_key);
|
||||
PushIter(iter.get(), parsed);
|
||||
assert(active_iters_.size() == active_seqnums_.size());
|
||||
}
|
||||
|
||||
// Move active iterators that end before parsed.
|
||||
while (!active_iters_.empty() &&
|
||||
icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) {
|
||||
TruncatedRangeDelIterator* iter = PopActiveIter();
|
||||
do {
|
||||
iter->Next();
|
||||
} while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0);
|
||||
PushIter(iter, parsed);
|
||||
assert(active_iters_.size() == active_seqnums_.size());
|
||||
}
|
||||
|
||||
// Move inactive iterators that start before parsed.
|
||||
while (!inactive_iters_.empty() &&
|
||||
icmp_->Compare(inactive_iters_.top()->start_key(), parsed) <= 0) {
|
||||
TruncatedRangeDelIterator* iter = PopInactiveIter();
|
||||
while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0) {
|
||||
iter->Next();
|
||||
}
|
||||
PushIter(iter, parsed);
|
||||
assert(active_iters_.size() == active_seqnums_.size());
|
||||
}
|
||||
|
||||
return active_seqnums_.empty()
|
||||
? false
|
||||
: (*active_seqnums_.begin())->seq() > parsed.sequence;
|
||||
}
|
||||
|
||||
void ForwardRangeDelIterator::Invalidate() {
|
||||
unused_idx_ = 0;
|
||||
active_iters_.clear();
|
||||
active_seqnums_.clear();
|
||||
inactive_iters_.clear();
|
||||
}
|
||||
|
||||
ReverseRangeDelIterator::ReverseRangeDelIterator(
|
||||
const InternalKeyComparator* icmp,
|
||||
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters)
|
||||
: icmp_(icmp),
|
||||
iters_(iters),
|
||||
unused_idx_(0),
|
||||
active_seqnums_(SeqMaxComparator()),
|
||||
active_iters_(StartKeyMaxComparator(icmp)),
|
||||
inactive_iters_(EndKeyMaxComparator(icmp)) {}
|
||||
|
||||
bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
|
||||
assert(iters_ != nullptr);
|
||||
// Pick up previously unseen iterators.
|
||||
for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end();
|
||||
++it, ++unused_idx_) {
|
||||
auto& iter = *it;
|
||||
iter->SeekForPrev(parsed.user_key);
|
||||
PushIter(iter.get(), parsed);
|
||||
assert(active_iters_.size() == active_seqnums_.size());
|
||||
}
|
||||
|
||||
// Move active iterators that start after parsed.
|
||||
while (!active_iters_.empty() &&
|
||||
icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) {
|
||||
TruncatedRangeDelIterator* iter = PopActiveIter();
|
||||
do {
|
||||
iter->Prev();
|
||||
} while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0);
|
||||
PushIter(iter, parsed);
|
||||
assert(active_iters_.size() == active_seqnums_.size());
|
||||
}
|
||||
|
||||
// Move inactive iterators that end after parsed.
|
||||
while (!inactive_iters_.empty() &&
|
||||
icmp_->Compare(parsed, inactive_iters_.top()->end_key()) < 0) {
|
||||
TruncatedRangeDelIterator* iter = PopInactiveIter();
|
||||
while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0) {
|
||||
iter->Prev();
|
||||
}
|
||||
PushIter(iter, parsed);
|
||||
assert(active_iters_.size() == active_seqnums_.size());
|
||||
}
|
||||
|
||||
return active_seqnums_.empty()
|
||||
? false
|
||||
: (*active_seqnums_.begin())->seq() > parsed.sequence;
|
||||
}
|
||||
|
||||
void ReverseRangeDelIterator::Invalidate() {
|
||||
unused_idx_ = 0;
|
||||
active_iters_.clear();
|
||||
active_seqnums_.clear();
|
||||
inactive_iters_.clear();
|
||||
}
|
||||
|
||||
RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp,
|
||||
SequenceNumber upper_bound)
|
||||
: icmp_(icmp), upper_bound_(upper_bound) {}
|
||||
: icmp_(icmp),
|
||||
upper_bound_(upper_bound),
|
||||
forward_iter_(icmp, &iters_),
|
||||
reverse_iter_(icmp, &iters_) {}
|
||||
|
||||
void RangeDelAggregatorV2::AddTombstones(
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
||||
@ -143,20 +256,24 @@ bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed,
|
||||
if (wrapped_range_del_agg != nullptr) {
|
||||
return wrapped_range_del_agg->ShouldDelete(parsed, mode);
|
||||
}
|
||||
// TODO: avoid re-seeking every call
|
||||
for (auto& iter : iters_) {
|
||||
iter->Seek(parsed.user_key);
|
||||
if (iter->Valid() && icmp_->Compare(iter->start_key(), parsed) <= 0 &&
|
||||
iter->seq() > parsed.sequence) {
|
||||
return true;
|
||||
}
|
||||
|
||||
switch (mode) {
|
||||
case RangeDelPositioningMode::kForwardTraversal:
|
||||
reverse_iter_.Invalidate();
|
||||
return forward_iter_.ShouldDelete(parsed);
|
||||
case RangeDelPositioningMode::kBackwardTraversal:
|
||||
forward_iter_.Invalidate();
|
||||
return reverse_iter_.ShouldDelete(parsed);
|
||||
default:
|
||||
assert(false);
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start,
|
||||
const Slice& end) {
|
||||
assert(wrapped_range_del_agg == nullptr);
|
||||
InvalidateRangeDelMapPositions();
|
||||
|
||||
// Set the internal start/end keys so that:
|
||||
// - if start_ikey has the same user key and sequence number as the current
|
||||
|
@ -78,6 +78,167 @@ class TruncatedRangeDelIterator {
|
||||
std::list<ParsedInternalKey> pinned_bounds_;
|
||||
};
|
||||
|
||||
struct SeqMaxComparator {
|
||||
bool operator()(const TruncatedRangeDelIterator* a,
|
||||
const TruncatedRangeDelIterator* b) const {
|
||||
return a->seq() > b->seq();
|
||||
}
|
||||
};
|
||||
|
||||
class ForwardRangeDelIterator {
|
||||
public:
|
||||
ForwardRangeDelIterator(
|
||||
const InternalKeyComparator* icmp,
|
||||
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters);
|
||||
|
||||
bool ShouldDelete(const ParsedInternalKey& parsed);
|
||||
void Invalidate();
|
||||
|
||||
private:
|
||||
using ActiveSeqSet =
|
||||
std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
|
||||
|
||||
struct StartKeyMinComparator {
|
||||
explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
|
||||
|
||||
bool operator()(const TruncatedRangeDelIterator* a,
|
||||
const TruncatedRangeDelIterator* b) const {
|
||||
return icmp->Compare(a->start_key(), b->start_key()) > 0;
|
||||
}
|
||||
|
||||
const InternalKeyComparator* icmp;
|
||||
};
|
||||
struct EndKeyMinComparator {
|
||||
explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
|
||||
|
||||
bool operator()(const ActiveSeqSet::const_iterator& a,
|
||||
const ActiveSeqSet::const_iterator& b) const {
|
||||
return icmp->Compare((*a)->end_key(), (*b)->end_key()) > 0;
|
||||
}
|
||||
|
||||
const InternalKeyComparator* icmp;
|
||||
};
|
||||
|
||||
void PushIter(TruncatedRangeDelIterator* iter,
|
||||
const ParsedInternalKey& parsed) {
|
||||
if (!iter->Valid()) {
|
||||
// The iterator has been fully consumed, so we don't need to add it to
|
||||
// either of the heaps.
|
||||
} else if (icmp_->Compare(parsed, iter->start_key()) < 0) {
|
||||
PushInactiveIter(iter);
|
||||
} else {
|
||||
PushActiveIter(iter);
|
||||
}
|
||||
}
|
||||
|
||||
void PushActiveIter(TruncatedRangeDelIterator* iter) {
|
||||
auto seq_pos = active_seqnums_.insert(iter);
|
||||
active_iters_.push(seq_pos);
|
||||
}
|
||||
|
||||
TruncatedRangeDelIterator* PopActiveIter() {
|
||||
auto active_top = active_iters_.top();
|
||||
auto iter = *active_top;
|
||||
active_iters_.pop();
|
||||
active_seqnums_.erase(active_top);
|
||||
return iter;
|
||||
}
|
||||
|
||||
void PushInactiveIter(TruncatedRangeDelIterator* iter) {
|
||||
inactive_iters_.push(iter);
|
||||
}
|
||||
|
||||
TruncatedRangeDelIterator* PopInactiveIter() {
|
||||
auto* iter = inactive_iters_.top();
|
||||
inactive_iters_.pop();
|
||||
return iter;
|
||||
}
|
||||
|
||||
const InternalKeyComparator* icmp_;
|
||||
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters_;
|
||||
size_t unused_idx_;
|
||||
ActiveSeqSet active_seqnums_;
|
||||
BinaryHeap<ActiveSeqSet::const_iterator, EndKeyMinComparator> active_iters_;
|
||||
BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> inactive_iters_;
|
||||
};
|
||||
|
||||
class ReverseRangeDelIterator {
|
||||
public:
|
||||
ReverseRangeDelIterator(
|
||||
const InternalKeyComparator* icmp,
|
||||
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters);
|
||||
|
||||
bool ShouldDelete(const ParsedInternalKey& parsed);
|
||||
void Invalidate();
|
||||
|
||||
private:
|
||||
using ActiveSeqSet =
|
||||
std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
|
||||
|
||||
struct EndKeyMaxComparator {
|
||||
explicit EndKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
|
||||
|
||||
bool operator()(const TruncatedRangeDelIterator* a,
|
||||
const TruncatedRangeDelIterator* b) const {
|
||||
return icmp->Compare(a->end_key(), b->end_key()) < 0;
|
||||
}
|
||||
|
||||
const InternalKeyComparator* icmp;
|
||||
};
|
||||
struct StartKeyMaxComparator {
|
||||
explicit StartKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
|
||||
|
||||
bool operator()(const ActiveSeqSet::const_iterator& a,
|
||||
const ActiveSeqSet::const_iterator& b) const {
|
||||
return icmp->Compare((*a)->start_key(), (*b)->start_key()) < 0;
|
||||
}
|
||||
|
||||
const InternalKeyComparator* icmp;
|
||||
};
|
||||
|
||||
void PushIter(TruncatedRangeDelIterator* iter,
|
||||
const ParsedInternalKey& parsed) {
|
||||
if (!iter->Valid()) {
|
||||
// The iterator has been fully consumed, so we don't need to add it to
|
||||
// either of the heaps.
|
||||
} else if (icmp_->Compare(iter->end_key(), parsed) <= 0) {
|
||||
PushInactiveIter(iter);
|
||||
} else {
|
||||
PushActiveIter(iter);
|
||||
}
|
||||
}
|
||||
|
||||
void PushActiveIter(TruncatedRangeDelIterator* iter) {
|
||||
auto seq_pos = active_seqnums_.insert(iter);
|
||||
active_iters_.push(seq_pos);
|
||||
}
|
||||
|
||||
TruncatedRangeDelIterator* PopActiveIter() {
|
||||
auto active_top = active_iters_.top();
|
||||
auto iter = *active_top;
|
||||
active_iters_.pop();
|
||||
active_seqnums_.erase(active_top);
|
||||
return iter;
|
||||
}
|
||||
|
||||
void PushInactiveIter(TruncatedRangeDelIterator* iter) {
|
||||
inactive_iters_.push(iter);
|
||||
}
|
||||
|
||||
TruncatedRangeDelIterator* PopInactiveIter() {
|
||||
auto* iter = inactive_iters_.top();
|
||||
inactive_iters_.pop();
|
||||
return iter;
|
||||
}
|
||||
|
||||
const InternalKeyComparator* icmp_;
|
||||
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>* iters_;
|
||||
size_t unused_idx_;
|
||||
ActiveSeqSet active_seqnums_;
|
||||
BinaryHeap<ActiveSeqSet::const_iterator, StartKeyMaxComparator> active_iters_;
|
||||
BinaryHeap<TruncatedRangeDelIterator*, EndKeyMaxComparator> inactive_iters_;
|
||||
};
|
||||
|
||||
class RangeDelAggregatorV2 {
|
||||
public:
|
||||
RangeDelAggregatorV2(const InternalKeyComparator* icmp,
|
||||
@ -95,9 +256,10 @@ class RangeDelAggregatorV2 {
|
||||
|
||||
bool IsRangeOverlapped(const Slice& start, const Slice& end);
|
||||
|
||||
// TODO: no-op for now, but won't be once ShouldDelete leverages positioning
|
||||
// mode and doesn't re-seek every ShouldDelete
|
||||
void InvalidateRangeDelMapPositions() {}
|
||||
void InvalidateRangeDelMapPositions() {
|
||||
forward_iter_.Invalidate();
|
||||
reverse_iter_.Invalidate();
|
||||
}
|
||||
|
||||
bool IsEmpty() const { return iters_.empty(); }
|
||||
bool AddFile(uint64_t file_number) {
|
||||
@ -127,6 +289,9 @@ class RangeDelAggregatorV2 {
|
||||
std::list<std::unique_ptr<FragmentedRangeTombstoneList>> pinned_fragments_;
|
||||
std::set<uint64_t> files_seen_;
|
||||
|
||||
ForwardRangeDelIterator forward_iter_;
|
||||
ReverseRangeDelIterator reverse_iter_;
|
||||
|
||||
// TODO: remove once V2 supports exposing tombstone iterators
|
||||
std::unique_ptr<RangeDelAggregator> wrapped_range_del_agg;
|
||||
};
|
||||
|
@ -96,6 +96,10 @@ class BinaryHeap {
|
||||
return data_.empty();
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
return data_.size();
|
||||
}
|
||||
|
||||
void reset_root_cmp_cache() { root_cmp_cache_ = port::kMaxSizet; }
|
||||
|
||||
private:
|
||||
|
Loading…
Reference in New Issue
Block a user