diff --git a/table/iter_heap.h b/table/iter_heap.h
index 9569d3638..5343175c3 100644
--- a/table/iter_heap.h
+++ b/table/iter_heap.h
@@ -5,36 +5,34 @@
 //
 
 #pragma once
-#include <queue>
 
 #include "rocksdb/comparator.h"
 #include "table/iterator_wrapper.h"
 
 namespace rocksdb {
 
-// Return the max of two keys.
+// When used with std::priority_queue, this comparison functor puts the
+// iterator with the max/largest key on top.
 class MaxIteratorComparator {
  public:
   MaxIteratorComparator(const Comparator* comparator) :
     comparator_(comparator) {}
 
-  bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
-    return comparator_->Compare(a->key(), b->key()) <= 0;
+  bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
+    return comparator_->Compare(a->key(), b->key()) < 0;
   }
  private:
   const Comparator* comparator_;
 };
 
-// Return the max of two keys.
+// When used with std::priority_queue, this comparison functor puts the
+// iterator with the min/smallest key on top.
 class MinIteratorComparator {
  public:
-  // if maxHeap is set comparator returns the max value.
-  // else returns the min Value.
-  // Can use to create a minHeap or a maxHeap.
   MinIteratorComparator(const Comparator* comparator) :
     comparator_(comparator) {}
 
-  bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
+  bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
     return comparator_->Compare(a->key(), b->key()) > 0;
   }
  private:
diff --git a/table/merger.cc b/table/merger.cc
index 9781d0dca..943a360e9 100644
--- a/table/merger.cc
+++ b/table/merger.cc
@@ -10,7 +10,6 @@
 #include "table/merger.h"
 
 #include <vector>
-#include <queue>
 
 #include "rocksdb/comparator.h"
 #include "rocksdb/iterator.h"
@@ -18,6 +17,7 @@
 #include "table/iter_heap.h"
 #include "table/iterator_wrapper.h"
 #include "util/arena.h"
+#include "util/heap.h"
 #include "util/stop_watch.h"
 #include "util/perf_context_imp.h"
 #include "util/autovector.h"
@@ -25,21 +25,8 @@
 namespace rocksdb {
 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
 namespace {
-typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>,
-                            MaxIteratorComparator> MergerMaxIterHeap;
-
-typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>,
-                            MinIteratorComparator> MergerMinIterHeap;
-
-// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator.
-MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) {
-  return MergerMaxIterHeap(MaxIteratorComparator(comparator));
-}
-
-// Return's a new MinHeap of IteratorWrapper's using the provided Comparator.
-MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) {
-  return MergerMinIterHeap(MinIteratorComparator(comparator));
-}
+typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
+typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
 }  // namespace
 
 const size_t kNumIterReserve = 4;
@@ -51,10 +38,8 @@ class MergingIterator : public Iterator {
       : is_arena_mode_(is_arena_mode),
         comparator_(comparator),
         current_(nullptr),
-        use_heap_(true),
         direction_(kForward),
-        maxHeap_(NewMergerMaxIterHeap(comparator_)),
-        minHeap_(NewMergerMinIterHeap(comparator_)) {
+        minHeap_(comparator_) {
     children_.resize(n);
     for (int i = 0; i < n; i++) {
       children_[i].Set(children[i]);
@@ -64,6 +49,7 @@ class MergingIterator : public Iterator {
         minHeap_.push(&child);
       }
     }
+    current_ = CurrentForward();
   }
 
   virtual void AddIterator(Iterator* iter) {
@@ -72,6 +58,9 @@ class MergingIterator : public Iterator {
-    auto new_wrapper = children_.back();
+    // Bind by reference: the heap keeps this address, so it has to refer to
+    // the element stored in children_ rather than to a local copy.
+    auto& new_wrapper = children_.back();
     if (new_wrapper.Valid()) {
       minHeap_.push(&new_wrapper);
+      current_ = CurrentForward();
     }
   }
 
@@ -91,27 +80,25 @@ class MergingIterator : public Iterator {
         minHeap_.push(&child);
       }
     }
-    FindSmallest();
     direction_ = kForward;
+    current_ = CurrentForward();
   }
 
   virtual void SeekToLast() override {
     ClearHeaps();
+    InitMaxHeap();
     for (auto& child : children_) {
       child.SeekToLast();
       if (child.Valid()) {
-        maxHeap_.push(&child);
+        maxHeap_->push(&child);
       }
     }
-    FindLargest();
     direction_ = kReverse;
+    current_ = CurrentReverse();
   }
 
   virtual void Seek(const Slice& target) override {
-    // Invalidate the heap.
-    use_heap_ = false;
-    IteratorWrapper* first_child = nullptr;
-
+    ClearHeaps();
     for (auto& child : children_) {
       {
         PERF_TIMER_GUARD(seek_child_seek_time);
@@ -120,36 +107,15 @@ class MergingIterator : public Iterator {
       PERF_COUNTER_ADD(seek_child_seek_count, 1);
 
       if (child.Valid()) {
-        // This child has valid key
-        if (!use_heap_) {
-          if (first_child == nullptr) {
-            // It's the first child has valid key. Only put it int
-            // current_. Now the values in the heap should be invalid.
-            first_child = &child;
-          } else {
-            // We have more than one children with valid keys. Initialize
-            // the heap and put the first child into the heap.
-            PERF_TIMER_GUARD(seek_min_heap_time);
-            ClearHeaps();
-            minHeap_.push(first_child);
-          }
-        }
-        if (use_heap_) {
-          PERF_TIMER_GUARD(seek_min_heap_time);
-          minHeap_.push(&child);
-        }
+        PERF_TIMER_GUARD(seek_min_heap_time);
+        minHeap_.push(&child);
       }
     }
-    if (use_heap_) {
-      // If heap is valid, need to put the smallest key to curent_.
-      PERF_TIMER_GUARD(seek_min_heap_time);
-      FindSmallest();
-    } else {
-      // The heap is not valid, then the current_ iterator is the first
-      // one, or null if there is no first child.
-      current_ = first_child;
-    }
     direction_ = kForward;
+    {
+      PERF_TIMER_GUARD(seek_min_heap_time);
+      current_ = CurrentForward();
+    }
   }
 
   virtual void Next() override {
@@ -157,10 +123,11 @@ class MergingIterator : public Iterator {
 
     // Ensure that all children are positioned after key().
     // If we are moving in the forward direction, it is already
-    // true for all of the non-current_ children since current_ is
-    // the smallest child and key() == current_->key().  Otherwise,
-    // we explicitly position the non-current_ children.
+    // true for all of the non-current children since current_ is
+    // the smallest child and key() == current_->key().
     if (direction_ != kForward) {
+      // Otherwise, advance the non-current children.  We advance current_
+      // just after the if-block.
       ClearHeaps();
       for (auto& child : children_) {
         if (&child != current_) {
@@ -169,36 +136,42 @@ class MergingIterator : public Iterator {
               comparator_->Compare(key(), child.key()) == 0) {
             child.Next();
           }
-          if (child.Valid()) {
-            minHeap_.push(&child);
-          }
+        }
+        if (child.Valid()) {
+          minHeap_.push(&child);
         }
       }
       direction_ = kForward;
+      // The loop advanced all non-current children to be > key() so current_
+      // should still be strictly the smallest key.
+      assert(current_ == CurrentForward());
     }
 
     // as the current points to the current record. move the iterator forward.
-    // and if it is valid add it to the heap.
     current_->Next();
-    if (use_heap_) {
-      if (current_->Valid()) {
-        minHeap_.push(current_);
-      }
-      FindSmallest();
-    } else if (!current_->Valid()) {
-      current_ = nullptr;
+    if (current_->Valid()) {
+      // current is still valid after the Next() call above.  Call
+      // replace_top() to restore the heap property.  When the same child
+      // iterator yields a sequence of keys, this is cheap.
+      minHeap_.replace_top(current_);
+    } else {
+      // current stopped being valid, remove it from the heap.
+      minHeap_.pop();
     }
+    current_ = CurrentForward();
   }
 
   virtual void Prev() override {
     assert(Valid());
     // Ensure that all children are positioned before key().
     // If we are moving in the reverse direction, it is already
-    // true for all of the non-current_ children since current_ is
-    // the largest child and key() == current_->key().  Otherwise,
-    // we explicitly position the non-current_ children.
+    // true for all of the non-current children since current_ is
+    // the largest child and key() == current_->key().
     if (direction_ != kReverse) {
+      // Otherwise, retreat the non-current children.  We retreat current_
+      // just after the if-block.
       ClearHeaps();
+      InitMaxHeap();
       for (auto& child : children_) {
         if (&child != current_) {
           child.Seek(key());
@@ -222,19 +195,28 @@ class MergingIterator : public Iterator {
               continue;
             }
           }
-          if (child.Valid()) {
-            maxHeap_.push(&child);
-          }
+        }
+        if (child.Valid()) {
+          maxHeap_->push(&child);
         }
       }
       direction_ = kReverse;
+      // The loop retreated all non-current children to be < key() so current_
+      // should still be strictly the largest key.
+      assert(current_ == CurrentReverse());
     }
 
     current_->Prev();
     if (current_->Valid()) {
-      maxHeap_.push(current_);
+      // current is still valid after the Prev() call above.  Call
+      // replace_top() to restore the heap property.  When the same child
+      // iterator yields a sequence of keys, this is cheap.
+      maxHeap_->replace_top(current_);
+    } else {
+      // current stopped being valid, remove it from the heap.
+      maxHeap_->pop();
     }
-    FindLargest();
+    current_ = CurrentReverse();
   }
 
   virtual Slice key() const override {
@@ -259,56 +241,54 @@ class MergingIterator : public Iterator {
   }
 
  private:
-  void FindSmallest();
-  void FindLargest();
+  // Clears heaps for both directions, used when changing direction or seeking
   void ClearHeaps();
+  // Ensures that maxHeap_ is initialized when starting to go in the reverse
+  // direction
+  void InitMaxHeap();
 
   bool is_arena_mode_;
   const Comparator* comparator_;
   autovector<IteratorWrapper, kNumIterReserve> children_;
+
+  // Cached pointer to child iterator with the current key, or nullptr if no
+  // child iterators are valid.  This is the top of minHeap_ or maxHeap_
+  // depending on the direction.
   IteratorWrapper* current_;
-  // If the value is true, both of iterators in the heap and current_
-  // contain valid rows. If it is false, only current_ can possibly contain
-  // valid rows.
-  // This flag is always true for reverse direction, as we always use heap for
-  // the reverse iterating case.
-  bool use_heap_;
   // Which direction is the iterator moving?
   enum Direction {
     kForward,
     kReverse
   };
   Direction direction_;
-  MergerMaxIterHeap maxHeap_;
   MergerMinIterHeap minHeap_;
+  // Max heap is used for reverse iteration, which is way less common than
+  // forward.  Lazily initialize it to save memory.
+  std::unique_ptr<MergerMaxIterHeap> maxHeap_;
+
+  IteratorWrapper* CurrentForward() const {
+    assert(direction_ == kForward);
+    return !minHeap_.empty() ? minHeap_.top() : nullptr;
+  }
+
+  IteratorWrapper* CurrentReverse() const {
+    assert(direction_ == kReverse);
+    assert(maxHeap_);
+    return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
+  }
 };
 
-void MergingIterator::FindSmallest() {
-  assert(use_heap_);
-  if (minHeap_.empty()) {
-    current_ = nullptr;
-  } else {
-    current_ = minHeap_.top();
-    assert(current_->Valid());
-    minHeap_.pop();
-  }
-}
-
-void MergingIterator::FindLargest() {
-  assert(use_heap_);
-  if (maxHeap_.empty()) {
-    current_ = nullptr;
-  } else {
-    current_ = maxHeap_.top();
-    assert(current_->Valid());
-    maxHeap_.pop();
-  }
-}
-
 void MergingIterator::ClearHeaps() {
-  use_heap_ = true;
-  maxHeap_ = NewMergerMaxIterHeap(comparator_);
-  minHeap_ = NewMergerMinIterHeap(comparator_);
+  minHeap_.clear();
+  if (maxHeap_) {
+    maxHeap_->clear();
+  }
+}
+
+void MergingIterator::InitMaxHeap() {
+  if (!maxHeap_) {
+    maxHeap_.reset(new MergerMaxIterHeap(comparator_));
+  }
 }
 
 Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n,
diff --git a/util/heap.h b/util/heap.h
new file mode 100644
index 000000000..7d9e11113
--- /dev/null
+++ b/util/heap.h
@@ -0,0 +1,152 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+
+#include <cassert>
+#include <cstddef>
+#include <functional>
+#include <utility>
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+// Binary heap implementation optimized for use in multi-way merge sort.
+// Comparison to std::priority_queue:
+// - In libstdc++, std::priority_queue::pop() usually performs just over logN
+//   comparisons but never fewer.
+// - std::priority_queue does not have a replace-top operation, requiring a
+//   pop+push. If the replacement element is the new top, this requires
+//   around 2logN comparisons.
+// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN
+//   comparisons.
+// - This heap provides a replace_top() operation which requires [1, 2logN]
+//   comparisons. When the replacement element is also the new top, this
+//   takes just 1 or 2 comparisons.
+//
+// The last property can yield an order-of-magnitude performance improvement
+// when merge-sorting real-world non-random data. If the merge operation is
+// likely to take chunks of elements from the same input stream, only 1
+// comparison per element is needed. In RocksDB-land, this happens when
+// compacting a database where keys are not randomly distributed across L0
+// files but nearby keys are likely to be in the same L0 file.
+//
+// The container uses the same counterintuitive ordering as
+// std::priority_queue: the comparison operator is expected to provide the
+// less-than relation, but top() will return the maximum.
+
+template<typename T, typename Compare = std::less<T>>
+class BinaryHeap {
+ public:
+  BinaryHeap() { }
+  explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { }
+
+  void push(const T& value) {
+    data_.push_back(value);
+    upheap(data_.size() - 1);
+  }
+
+  void push(T&& value) {
+    data_.push_back(std::move(value));
+    upheap(data_.size() - 1);
+  }
+
+  const T& top() const {
+    assert(!empty());
+    return data_.front();
+  }
+
+  // Overwrites the top element with value and sifts it down to restore the
+  // heap property. Must not be called on an empty heap.
+  void replace_top(const T& value) {
+    assert(!empty());
+    data_.front() = value;
+    downheap(get_root());
+  }
+
+  void replace_top(T&& value) {
+    assert(!empty());
+    data_.front() = std::move(value);
+    downheap(get_root());
+  }
+
+  // Removes the top element. Must not be called on an empty heap.
+  void pop() {
+    assert(!empty());
+    if (data_.size() > 1) {
+      // With a single element front() and back() are the same object, so
+      // skip the move: self-move-assignment is not safe for every T.
+      data_.front() = std::move(data_.back());
+    }
+    data_.pop_back();
+    if (!empty()) {
+      downheap(get_root());
+    }
+  }
+
+  void swap(BinaryHeap &other) {
+    std::swap(cmp_, other.cmp_);
+    data_.swap(other.data_);
+  }
+
+  void clear() {
+    data_.clear();
+  }
+
+  bool empty() const {
+    return data_.empty();
+  }
+
+ private:
+  static inline size_t get_root() { return 0; }
+  static inline size_t get_parent(size_t index) { return (index - 1) / 2; }
+  static inline size_t get_left(size_t index) { return 2 * index + 1; }
+  static inline size_t get_right(size_t index) { return 2 * index + 2; }
+
+  // Moves the element at index towards the root until its parent no longer
+  // compares less than it.
+  void upheap(size_t index) {
+    T v = std::move(data_[index]);
+    while (index > get_root()) {
+      const size_t parent = get_parent(index);
+      if (!cmp_(data_[parent], v)) {
+        break;
+      }
+      data_[index] = std::move(data_[parent]);
+      index = parent;
+    }
+    data_[index] = std::move(v);
+  }
+
+  // Moves the element at index towards the leaves, promoting the larger (per
+  // cmp_) child each step, until it no longer compares less than that child.
+  void downheap(size_t index) {
+    T v = std::move(data_[index]);
+    while (true) {
+      const size_t left_child = get_left(index);
+      if (left_child >= data_.size()) {
+        break;
+      }
+      const size_t right_child = left_child + 1;
+      assert(right_child == get_right(index));
+      size_t picked_child = left_child;
+      if (right_child < data_.size() &&
+          cmp_(data_[left_child], data_[right_child])) {
+        picked_child = right_child;
+      }
+      if (!cmp_(v, data_[picked_child])) {
+        break;
+      }
+      data_[index] = std::move(data_[picked_child]);
+      index = picked_child;
+    }
+    data_[index] = std::move(v);
+  }
+
+  Compare cmp_;
+  autovector<T> data_;
+};
+
+}  // namespace rocksdb