From b7a2369fb2ac8bb762553d8492c401fb80826498 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 7 Jul 2015 14:45:20 -0700 Subject: [PATCH] Revert "Replace std::priority_queue in MergingIterator with custom heap" Summary: This patch reverts "Replace std::priority_queue in MergingIterator with custom heap" (commit commit b6655a679d11f42ce9a4915f54d7995f85b7556a) as it causes db_stress failure. Test Plan: ./db_stress --test_batches_snapshots=1 --threads=32 --write_buffer_size=4194304 --destroy_db_initially=0 --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 --delpercent=5 --iterpercent=10 --db=/tmp/rocksdb_crashtest_KdCI5F --max_key=100000000 --mmap_read=0 --block_size=16384 --cache_size=1048576 --open_files=500000 --verify_checksum=1 --sync=0 --progress_reports=0 --disable_wal=0 --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=0 --memtablerep=prefix_hash --prefix_size=7 --ops_per_thread=200 --kill_random_test=97 Reviewers: igor, anthony, lovro, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41343 --- table/iter_heap.h | 16 ++-- table/merger.cc | 185 ++++++++++++++++++++++++++-------------------- util/heap.h | 140 ----------------------------------- 3 files changed, 114 insertions(+), 227 deletions(-) delete mode 100644 util/heap.h diff --git a/table/iter_heap.h b/table/iter_heap.h index 5343175c3..9569d3638 100644 --- a/table/iter_heap.h +++ b/table/iter_heap.h @@ -5,34 +5,36 @@ // #pragma once +#include #include "rocksdb/comparator.h" #include "table/iterator_wrapper.h" namespace rocksdb { -// When used with std::priority_queue, this comparison functor puts the -// iterator with the max/largest key on top. +// Return the max of two keys. class MaxIteratorComparator { public: MaxIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { - return comparator_->Compare(a->key(), b->key()) < 0; + bool operator()(IteratorWrapper* a, IteratorWrapper* b) { + return comparator_->Compare(a->key(), b->key()) <= 0; } private: const Comparator* comparator_; }; -// When used with std::priority_queue, this comparison functor puts the -// iterator with the min/smallest key on top. +// Return the max of two keys. class MinIteratorComparator { public: + // if maxHeap is set comparator returns the max value. + // else returns the min Value. + // Can use to create a minHeap or a maxHeap. MinIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { + bool operator()(IteratorWrapper* a, IteratorWrapper* b) { return comparator_->Compare(a->key(), b->key()) > 0; } private: diff --git a/table/merger.cc b/table/merger.cc index f380e0137..32220571c 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -9,6 +9,7 @@ #include "table/merger.h" +#include #include #include "rocksdb/comparator.h" @@ -17,7 +18,6 @@ #include "table/iter_heap.h" #include "table/iterator_wrapper.h" #include "util/arena.h" -#include "util/heap.h" #include "util/stop_watch.h" #include "util/perf_context_imp.h" #include "util/autovector.h" @@ -25,8 +25,21 @@ namespace rocksdb { // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { -typedef BinaryHeap MergerMaxIterHeap; -typedef BinaryHeap MergerMinIterHeap; +typedef std::priority_queue, + MaxIteratorComparator> MergerMaxIterHeap; + +typedef std::priority_queue, + MinIteratorComparator> MergerMinIterHeap; + +// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator. +MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) { + return MergerMaxIterHeap(MaxIteratorComparator(comparator)); +} + +// Return's a new MinHeap of IteratorWrapper's using the provided Comparator. +MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) { + return MergerMinIterHeap(MinIteratorComparator(comparator)); +} } // namespace const size_t kNumIterReserve = 4; @@ -38,8 +51,10 @@ class MergingIterator : public Iterator { : is_arena_mode_(is_arena_mode), comparator_(comparator), current_(nullptr), + use_heap_(true), direction_(kForward), - minHeap_(comparator_) { + maxHeap_(NewMergerMaxIterHeap(comparator_)), + minHeap_(NewMergerMinIterHeap(comparator_)) { children_.resize(n); for (int i = 0; i < n; i++) { children_[i].Set(children[i]); @@ -49,7 +64,6 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } - current_ = CurrentForward(); } virtual void AddIterator(Iterator* iter) { @@ -58,7 +72,6 @@ class MergingIterator : public Iterator { auto new_wrapper = children_.back(); if (new_wrapper.Valid()) { minHeap_.push(&new_wrapper); - current_ = CurrentForward(); } } @@ -78,25 +91,27 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } + FindSmallest(); direction_ = kForward; - current_ = CurrentForward(); } virtual void SeekToLast() override { ClearHeaps(); - InitMaxHeap(); for (auto& child : children_) { child.SeekToLast(); if (child.Valid()) { - maxHeap_->push(&child); + maxHeap_.push(&child); } } + FindLargest(); direction_ = kReverse; - current_ = CurrentReverse(); } virtual void Seek(const Slice& target) override { - ClearHeaps(); + // Invalidate the heap. + use_heap_ = false; + IteratorWrapper* first_child = nullptr; + for (auto& child : children_) { { PERF_TIMER_GUARD(seek_child_seek_time); @@ -105,15 +120,36 @@ class MergingIterator : public Iterator { PERF_COUNTER_ADD(seek_child_seek_count, 1); if (child.Valid()) { - PERF_TIMER_GUARD(seek_min_heap_time); - minHeap_.push(&child); + // This child has valid key + if (!use_heap_) { + if (first_child == nullptr) { + // It's the first child has valid key. Only put it int + // current_. Now the values in the heap should be invalid. + first_child = &child; + } else { + // We have more than one children with valid keys. Initialize + // the heap and put the first child into the heap. + PERF_TIMER_GUARD(seek_min_heap_time); + ClearHeaps(); + minHeap_.push(first_child); + } + } + if (use_heap_) { + PERF_TIMER_GUARD(seek_min_heap_time); + minHeap_.push(&child); + } } } - direction_ = kForward; - { + if (use_heap_) { + // If heap is valid, need to put the smallest key to curent_. PERF_TIMER_GUARD(seek_min_heap_time); - current_ = CurrentForward(); + FindSmallest(); + } else { + // The heap is not valid, then the current_ iterator is the first + // one, or null if there is no first child. + current_ = first_child; } + direction_ = kForward; } virtual void Next() override { @@ -121,11 +157,10 @@ class MergingIterator : public Iterator { // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already - // true for all of the non-current children since current_ is - // the smallest child and key() == current_->key(). + // true for all of the non-current_ children since current_ is + // the smallest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. if (direction_ != kForward) { - // Otherwise, advance the non-current children. We advance current_ - // just after the if-block. ClearHeaps(); for (auto& child : children_) { if (&child != current_) { @@ -134,42 +169,36 @@ class MergingIterator : public Iterator { comparator_->Compare(key(), child.key()) == 0) { child.Next(); } - } - if (child.Valid()) { - minHeap_.push(&child); + if (child.Valid()) { + minHeap_.push(&child); + } } } direction_ = kForward; - // The loop advanced all non-current children to be > key() so current_ - // should still be strictly the smallest key. - assert(current_ == CurrentForward()); } // as the current points to the current record. move the iterator forward. + // and if it is valid add it to the heap. current_->Next(); - if (current_->Valid()) { - // current is still valid after the Next() call above. Call - // replace_top() to restore the heap property. When the same child - // iterator yields a sequence of keys, this is cheap. - minHeap_.replace_top(current_); - } else { - // current stopped being valid, remove it from the heap. - minHeap_.pop(); + if (use_heap_) { + if (current_->Valid()) { + minHeap_.push(current_); + } + FindSmallest(); + } else if (!current_->Valid()) { + current_ = nullptr; } - current_ = CurrentForward(); } virtual void Prev() override { assert(Valid()); // Ensure that all children are positioned before key(). // If we are moving in the reverse direction, it is already - // true for all of the non-current children since current_ is - // the largest child and key() == current_->key(). + // true for all of the non-current_ children since current_ is + // the largest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. if (direction_ != kReverse) { - // Otherwise, retreat the non-current children. We retreat current_ - // just after the if-block. ClearHeaps(); - InitMaxHeap(); for (auto& child : children_) { if (&child != current_) { child.Seek(key()); @@ -180,9 +209,9 @@ class MergingIterator : public Iterator { // Child has no entries >= key(). Position at last entry. child.SeekToLast(); } - } - if (child.Valid()) { - maxHeap_->push(&child); + if (child.Valid()) { + maxHeap_.push(&child); + } } } direction_ = kReverse; @@ -196,15 +225,9 @@ class MergingIterator : public Iterator { current_->Prev(); if (current_->Valid()) { - // current is still valid after the Prev() call above. Call - // replace_top() to restore the heap property. When the same child - // iterator yields a sequence of keys, this is cheap. - maxHeap_->replace_top(current_); - } else { - // current stopped being valid, remove it from the heap. - maxHeap_->pop(); + maxHeap_.push(current_); } - current_ = CurrentReverse(); + FindLargest(); } virtual Slice key() const override { @@ -229,56 +252,58 @@ class MergingIterator : public Iterator { } private: - // Clears heaps for both directions, used when changing direction or seeking + void FindSmallest(); + void FindLargest(); void ClearHeaps(); - // Ensures that maxHeap_ is initialized when starting to go in the reverse - // direction - void InitMaxHeap(); bool is_arena_mode_; const Comparator* comparator_; autovector children_; - - // Cached pointer to child iterator with the current key, or nullptr if no - // child iterators are valid. This is the top of minHeap_ or maxHeap_ - // depending on the direction. IteratorWrapper* current_; + // If the value is true, both of iterators in the heap and current_ + // contain valid rows. If it is false, only current_ can possibly contain + // valid rows. + // This flag is always true for reverse direction, as we always use heap for + // the reverse iterating case. + bool use_heap_; // Which direction is the iterator moving? enum Direction { kForward, kReverse }; Direction direction_; + MergerMaxIterHeap maxHeap_; MergerMinIterHeap minHeap_; - // Max heap is used for reverse iteration, which is way less common than - // forward. Lazily initialize it to save memory. - std::unique_ptr maxHeap_; - - IteratorWrapper* CurrentForward() const { - assert(direction_ == kForward); - return !minHeap_.empty() ? minHeap_.top() : nullptr; - } - - IteratorWrapper* CurrentReverse() const { - assert(direction_ == kReverse); - assert(maxHeap_); - return !maxHeap_->empty() ? maxHeap_->top() : nullptr; - } }; -void MergingIterator::ClearHeaps() { - minHeap_.clear(); - if (maxHeap_) { - maxHeap_->clear(); +void MergingIterator::FindSmallest() { + assert(use_heap_); + if (minHeap_.empty()) { + current_ = nullptr; + } else { + current_ = minHeap_.top(); + assert(current_->Valid()); + minHeap_.pop(); } } -void MergingIterator::InitMaxHeap() { - if (!maxHeap_) { - maxHeap_.reset(new MergerMaxIterHeap(comparator_)); +void MergingIterator::FindLargest() { + assert(use_heap_); + if (maxHeap_.empty()) { + current_ = nullptr; + } else { + current_ = maxHeap_.top(); + assert(current_->Valid()); + maxHeap_.pop(); } } +void MergingIterator::ClearHeaps() { + use_heap_ = true; + maxHeap_ = NewMergerMaxIterHeap(comparator_); + minHeap_ = NewMergerMinIterHeap(comparator_); +} + Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, Arena* arena) { assert(n >= 0); diff --git a/util/heap.h b/util/heap.h deleted file mode 100644 index 7d9e11113..000000000 --- a/util/heap.h +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#pragma once - -#include -#include -#include -#include "util/autovector.h" - -namespace rocksdb { - -// Binary heap implementation optimized for use in multi-way merge sort. -// Comparison to std::priority_queue: -// - In libstdc++, std::priority_queue::pop() usually performs just over logN -// comparisons but never fewer. -// - std::priority_queue does not have a replace-top operation, requiring a -// pop+push. If the replacement element is the new top, this requires -// around 2logN comparisons. -// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN -// comparisons. -// - This heap provides a replace_top() operation which requires [1, 2logN] -// comparisons. When the replacement element is also the new top, this -// takes just 1 or 2 comparisons. -// -// The last property can yield an order-of-magnitude performance improvement -// when merge-sorting real-world non-random data. If the merge operation is -// likely to take chunks of elements from the same input stream, only 1 -// comparison per element is needed. In RocksDB-land, this happens when -// compacting a database where keys are not randomly distributed across L0 -// files but nearby keys are likely to be in the same L0 file. -// -// The container uses the same counterintuitive ordering as -// std::priority_queue: the comparison operator is expected to provide the -// less-than relation, but top() will return the maximum. - -template> -class BinaryHeap { - public: - BinaryHeap() { } - explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { } - - void push(const T& value) { - data_.push_back(value); - upheap(data_.size() - 1); - } - - void push(T&& value) { - data_.push_back(std::move(value)); - upheap(data_.size() - 1); - } - - const T& top() const { - assert(!empty()); - return data_.front(); - } - - void replace_top(const T& value) { - assert(!empty()); - data_.front() = value; - downheap(get_root()); - } - - void replace_top(T&& value) { - assert(!empty()); - data_.front() = std::move(value); - downheap(get_root()); - } - - void pop() { - assert(!empty()); - data_.front() = std::move(data_.back()); - data_.pop_back(); - if (!empty()) { - downheap(get_root()); - } - } - - void swap(BinaryHeap &other) { - std::swap(cmp_, other.cmp_); - data_.swap(other.data_); - } - - void clear() { - data_.clear(); - } - - bool empty() const { - return data_.empty(); - } - - private: - static inline size_t get_root() { return 0; } - static inline size_t get_parent(size_t index) { return (index - 1) / 2; } - static inline size_t get_left(size_t index) { return 2 * index + 1; } - static inline size_t get_right(size_t index) { return 2 * index + 2; } - - void upheap(size_t index) { - T v = std::move(data_[index]); - while (index > get_root()) { - const size_t parent = get_parent(index); - if (!cmp_(data_[parent], v)) { - break; - } - data_[index] = std::move(data_[parent]); - index = parent; - } - data_[index] = std::move(v); - } - - void downheap(size_t index) { - T v = std::move(data_[index]); - while (1) { - const size_t left_child = get_left(index); - if (get_left(index) >= data_.size()) { - break; - } - const size_t right_child = left_child + 1; - assert(right_child == get_right(index)); - size_t picked_child = left_child; - if (right_child < data_.size() && - cmp_(data_[left_child], data_[right_child])) { - picked_child = right_child; - } - if (!cmp_(v, data_[picked_child])) { - break; - } - data_[index] = std::move(data_[picked_child]); - index = picked_child; - } - data_[index] = std::move(v); - } - - Compare cmp_; - autovector data_; -}; - -} // namespace rocksdb