Replace std::priority_queue in MergingIterator with custom heap, take 2
Summary: Repeat ofb6655a679d
(reverted inb7a2369fb2
) with a proper fix for the issue that57d216ea65
was trying to fix. Test Plan: make check for i in $(seq 100); do ./db_stress --test_batches_snapshots=1 --threads=32 --write_buffer_size=4194304 --destroy_db_initially=0 --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 --delpercent=5 --iterpercent=10 --db=/tmp/rocksdb_crashtest_KdCI5F --max_key=100000000 --mmap_read=0 --block_size=16384 --cache_size=1048576 --open_files=500000 --verify_checksum=1 --sync=0 --progress_reports=0 --disable_wal=0 --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=0 --memtablerep=prefix_hash --prefix_size=7 --ops_per_thread=200 || break; done Reviewers: anthony, sdong, igor, yhchiang Reviewed By: igor, yhchiang Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D41391
This commit is contained in:
parent
9a6a0bd8c9
commit
e1c99e10c1
6
Makefile
6
Makefile
@ -294,7 +294,8 @@ TESTS = \
|
|||||||
perf_context_test \
|
perf_context_test \
|
||||||
optimistic_transaction_test \
|
optimistic_transaction_test \
|
||||||
write_callback_test \
|
write_callback_test \
|
||||||
compaction_job_stats_test
|
compaction_job_stats_test \
|
||||||
|
heap_test
|
||||||
|
|
||||||
SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
|
SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
|
||||||
|
|
||||||
@ -873,6 +874,9 @@ memtable_list_test: db/memtable_list_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
|||||||
write_callback_test: db/write_callback_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
write_callback_test: db/write_callback_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
$(AM_LINK)
|
$(AM_LINK)
|
||||||
|
|
||||||
|
heap_test: util/heap_test.o $(GTEST)
|
||||||
|
$(AM_LINK)
|
||||||
|
|
||||||
sst_dump: tools/sst_dump.o $(LIBOBJECTS)
|
sst_dump: tools/sst_dump.o $(LIBOBJECTS)
|
||||||
$(AM_LINK)
|
$(AM_LINK)
|
||||||
|
|
||||||
|
@ -5,36 +5,34 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
#include <queue>
|
|
||||||
|
|
||||||
#include "rocksdb/comparator.h"
|
#include "rocksdb/comparator.h"
|
||||||
#include "table/iterator_wrapper.h"
|
#include "table/iterator_wrapper.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
// Return the max of two keys.
|
// When used with std::priority_queue, this comparison functor puts the
|
||||||
|
// iterator with the max/largest key on top.
|
||||||
class MaxIteratorComparator {
|
class MaxIteratorComparator {
|
||||||
public:
|
public:
|
||||||
MaxIteratorComparator(const Comparator* comparator) :
|
MaxIteratorComparator(const Comparator* comparator) :
|
||||||
comparator_(comparator) {}
|
comparator_(comparator) {}
|
||||||
|
|
||||||
bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
|
bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
|
||||||
return comparator_->Compare(a->key(), b->key()) <= 0;
|
return comparator_->Compare(a->key(), b->key()) < 0;
|
||||||
}
|
}
|
||||||
private:
|
private:
|
||||||
const Comparator* comparator_;
|
const Comparator* comparator_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Return the max of two keys.
|
// When used with std::priority_queue, this comparison functor puts the
|
||||||
|
// iterator with the min/smallest key on top.
|
||||||
class MinIteratorComparator {
|
class MinIteratorComparator {
|
||||||
public:
|
public:
|
||||||
// if maxHeap is set comparator returns the max value.
|
|
||||||
// else returns the min Value.
|
|
||||||
// Can use to create a minHeap or a maxHeap.
|
|
||||||
MinIteratorComparator(const Comparator* comparator) :
|
MinIteratorComparator(const Comparator* comparator) :
|
||||||
comparator_(comparator) {}
|
comparator_(comparator) {}
|
||||||
|
|
||||||
bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
|
bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
|
||||||
return comparator_->Compare(a->key(), b->key()) > 0;
|
return comparator_->Compare(a->key(), b->key()) > 0;
|
||||||
}
|
}
|
||||||
private:
|
private:
|
||||||
|
204
table/merger.cc
204
table/merger.cc
@ -9,7 +9,6 @@
|
|||||||
|
|
||||||
#include "table/merger.h"
|
#include "table/merger.h"
|
||||||
|
|
||||||
#include <queue>
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "rocksdb/comparator.h"
|
#include "rocksdb/comparator.h"
|
||||||
@ -18,6 +17,7 @@
|
|||||||
#include "table/iter_heap.h"
|
#include "table/iter_heap.h"
|
||||||
#include "table/iterator_wrapper.h"
|
#include "table/iterator_wrapper.h"
|
||||||
#include "util/arena.h"
|
#include "util/arena.h"
|
||||||
|
#include "util/heap.h"
|
||||||
#include "util/stop_watch.h"
|
#include "util/stop_watch.h"
|
||||||
#include "util/perf_context_imp.h"
|
#include "util/perf_context_imp.h"
|
||||||
#include "util/autovector.h"
|
#include "util/autovector.h"
|
||||||
@ -25,21 +25,8 @@
|
|||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
|
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
|
||||||
namespace {
|
namespace {
|
||||||
typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>,
|
typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
|
||||||
MaxIteratorComparator> MergerMaxIterHeap;
|
typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
|
||||||
|
|
||||||
typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>,
|
|
||||||
MinIteratorComparator> MergerMinIterHeap;
|
|
||||||
|
|
||||||
// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator.
|
|
||||||
MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) {
|
|
||||||
return MergerMaxIterHeap(MaxIteratorComparator(comparator));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return's a new MinHeap of IteratorWrapper's using the provided Comparator.
|
|
||||||
MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) {
|
|
||||||
return MergerMinIterHeap(MinIteratorComparator(comparator));
|
|
||||||
}
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
const size_t kNumIterReserve = 4;
|
const size_t kNumIterReserve = 4;
|
||||||
@ -51,10 +38,8 @@ class MergingIterator : public Iterator {
|
|||||||
: is_arena_mode_(is_arena_mode),
|
: is_arena_mode_(is_arena_mode),
|
||||||
comparator_(comparator),
|
comparator_(comparator),
|
||||||
current_(nullptr),
|
current_(nullptr),
|
||||||
use_heap_(true),
|
|
||||||
direction_(kForward),
|
direction_(kForward),
|
||||||
maxHeap_(NewMergerMaxIterHeap(comparator_)),
|
minHeap_(comparator_) {
|
||||||
minHeap_(NewMergerMinIterHeap(comparator_)) {
|
|
||||||
children_.resize(n);
|
children_.resize(n);
|
||||||
for (int i = 0; i < n; i++) {
|
for (int i = 0; i < n; i++) {
|
||||||
children_[i].Set(children[i]);
|
children_[i].Set(children[i]);
|
||||||
@ -64,6 +49,7 @@ class MergingIterator : public Iterator {
|
|||||||
minHeap_.push(&child);
|
minHeap_.push(&child);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
current_ = CurrentForward();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void AddIterator(Iterator* iter) {
|
virtual void AddIterator(Iterator* iter) {
|
||||||
@ -72,6 +58,7 @@ class MergingIterator : public Iterator {
|
|||||||
auto new_wrapper = children_.back();
|
auto new_wrapper = children_.back();
|
||||||
if (new_wrapper.Valid()) {
|
if (new_wrapper.Valid()) {
|
||||||
minHeap_.push(&new_wrapper);
|
minHeap_.push(&new_wrapper);
|
||||||
|
current_ = CurrentForward();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,27 +78,25 @@ class MergingIterator : public Iterator {
|
|||||||
minHeap_.push(&child);
|
minHeap_.push(&child);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FindSmallest();
|
|
||||||
direction_ = kForward;
|
direction_ = kForward;
|
||||||
|
current_ = CurrentForward();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void SeekToLast() override {
|
virtual void SeekToLast() override {
|
||||||
ClearHeaps();
|
ClearHeaps();
|
||||||
|
InitMaxHeap();
|
||||||
for (auto& child : children_) {
|
for (auto& child : children_) {
|
||||||
child.SeekToLast();
|
child.SeekToLast();
|
||||||
if (child.Valid()) {
|
if (child.Valid()) {
|
||||||
maxHeap_.push(&child);
|
maxHeap_->push(&child);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FindLargest();
|
|
||||||
direction_ = kReverse;
|
direction_ = kReverse;
|
||||||
|
current_ = CurrentReverse();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void Seek(const Slice& target) override {
|
virtual void Seek(const Slice& target) override {
|
||||||
// Invalidate the heap.
|
ClearHeaps();
|
||||||
use_heap_ = false;
|
|
||||||
IteratorWrapper* first_child = nullptr;
|
|
||||||
|
|
||||||
for (auto& child : children_) {
|
for (auto& child : children_) {
|
||||||
{
|
{
|
||||||
PERF_TIMER_GUARD(seek_child_seek_time);
|
PERF_TIMER_GUARD(seek_child_seek_time);
|
||||||
@ -120,36 +105,15 @@ class MergingIterator : public Iterator {
|
|||||||
PERF_COUNTER_ADD(seek_child_seek_count, 1);
|
PERF_COUNTER_ADD(seek_child_seek_count, 1);
|
||||||
|
|
||||||
if (child.Valid()) {
|
if (child.Valid()) {
|
||||||
// This child has valid key
|
PERF_TIMER_GUARD(seek_min_heap_time);
|
||||||
if (!use_heap_) {
|
minHeap_.push(&child);
|
||||||
if (first_child == nullptr) {
|
|
||||||
// It's the first child has valid key. Only put it int
|
|
||||||
// current_. Now the values in the heap should be invalid.
|
|
||||||
first_child = &child;
|
|
||||||
} else {
|
|
||||||
// We have more than one children with valid keys. Initialize
|
|
||||||
// the heap and put the first child into the heap.
|
|
||||||
PERF_TIMER_GUARD(seek_min_heap_time);
|
|
||||||
ClearHeaps();
|
|
||||||
minHeap_.push(first_child);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (use_heap_) {
|
|
||||||
PERF_TIMER_GUARD(seek_min_heap_time);
|
|
||||||
minHeap_.push(&child);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (use_heap_) {
|
|
||||||
// If heap is valid, need to put the smallest key to curent_.
|
|
||||||
PERF_TIMER_GUARD(seek_min_heap_time);
|
|
||||||
FindSmallest();
|
|
||||||
} else {
|
|
||||||
// The heap is not valid, then the current_ iterator is the first
|
|
||||||
// one, or null if there is no first child.
|
|
||||||
current_ = first_child;
|
|
||||||
}
|
|
||||||
direction_ = kForward;
|
direction_ = kForward;
|
||||||
|
{
|
||||||
|
PERF_TIMER_GUARD(seek_min_heap_time);
|
||||||
|
current_ = CurrentForward();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void Next() override {
|
virtual void Next() override {
|
||||||
@ -157,10 +121,11 @@ class MergingIterator : public Iterator {
|
|||||||
|
|
||||||
// Ensure that all children are positioned after key().
|
// Ensure that all children are positioned after key().
|
||||||
// If we are moving in the forward direction, it is already
|
// If we are moving in the forward direction, it is already
|
||||||
// true for all of the non-current_ children since current_ is
|
// true for all of the non-current children since current_ is
|
||||||
// the smallest child and key() == current_->key(). Otherwise,
|
// the smallest child and key() == current_->key().
|
||||||
// we explicitly position the non-current_ children.
|
|
||||||
if (direction_ != kForward) {
|
if (direction_ != kForward) {
|
||||||
|
// Otherwise, advance the non-current children. We advance current_
|
||||||
|
// just after the if-block.
|
||||||
ClearHeaps();
|
ClearHeaps();
|
||||||
for (auto& child : children_) {
|
for (auto& child : children_) {
|
||||||
if (&child != current_) {
|
if (&child != current_) {
|
||||||
@ -169,36 +134,46 @@ class MergingIterator : public Iterator {
|
|||||||
comparator_->Compare(key(), child.key()) == 0) {
|
comparator_->Compare(key(), child.key()) == 0) {
|
||||||
child.Next();
|
child.Next();
|
||||||
}
|
}
|
||||||
if (child.Valid()) {
|
}
|
||||||
minHeap_.push(&child);
|
if (child.Valid()) {
|
||||||
}
|
minHeap_.push(&child);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
direction_ = kForward;
|
direction_ = kForward;
|
||||||
|
// The loop advanced all non-current children to be > key() so current_
|
||||||
|
// should still be strictly the smallest key.
|
||||||
|
assert(current_ == CurrentForward());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For the heap modifications below to be correct, current_ must be the
|
||||||
|
// current top of the heap.
|
||||||
|
assert(current_ == CurrentForward());
|
||||||
|
|
||||||
// as the current points to the current record. move the iterator forward.
|
// as the current points to the current record. move the iterator forward.
|
||||||
// and if it is valid add it to the heap.
|
|
||||||
current_->Next();
|
current_->Next();
|
||||||
if (use_heap_) {
|
if (current_->Valid()) {
|
||||||
if (current_->Valid()) {
|
// current is still valid after the Next() call above. Call
|
||||||
minHeap_.push(current_);
|
// replace_top() to restore the heap property. When the same child
|
||||||
}
|
// iterator yields a sequence of keys, this is cheap.
|
||||||
FindSmallest();
|
minHeap_.replace_top(current_);
|
||||||
} else if (!current_->Valid()) {
|
} else {
|
||||||
current_ = nullptr;
|
// current stopped being valid, remove it from the heap.
|
||||||
|
minHeap_.pop();
|
||||||
}
|
}
|
||||||
|
current_ = CurrentForward();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void Prev() override {
|
virtual void Prev() override {
|
||||||
assert(Valid());
|
assert(Valid());
|
||||||
// Ensure that all children are positioned before key().
|
// Ensure that all children are positioned before key().
|
||||||
// If we are moving in the reverse direction, it is already
|
// If we are moving in the reverse direction, it is already
|
||||||
// true for all of the non-current_ children since current_ is
|
// true for all of the non-current children since current_ is
|
||||||
// the largest child and key() == current_->key(). Otherwise,
|
// the largest child and key() == current_->key().
|
||||||
// we explicitly position the non-current_ children.
|
|
||||||
if (direction_ != kReverse) {
|
if (direction_ != kReverse) {
|
||||||
|
// Otherwise, retreat the non-current children. We retreat current_
|
||||||
|
// just after the if-block.
|
||||||
ClearHeaps();
|
ClearHeaps();
|
||||||
|
InitMaxHeap();
|
||||||
for (auto& child : children_) {
|
for (auto& child : children_) {
|
||||||
if (&child != current_) {
|
if (&child != current_) {
|
||||||
child.Seek(key());
|
child.Seek(key());
|
||||||
@ -209,9 +184,9 @@ class MergingIterator : public Iterator {
|
|||||||
// Child has no entries >= key(). Position at last entry.
|
// Child has no entries >= key(). Position at last entry.
|
||||||
child.SeekToLast();
|
child.SeekToLast();
|
||||||
}
|
}
|
||||||
if (child.Valid()) {
|
}
|
||||||
maxHeap_.push(&child);
|
if (child.Valid()) {
|
||||||
}
|
maxHeap_->push(&child);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
direction_ = kReverse;
|
direction_ = kReverse;
|
||||||
@ -219,15 +194,24 @@ class MergingIterator : public Iterator {
|
|||||||
// because it is possible to have some keys larger than the seek-key
|
// because it is possible to have some keys larger than the seek-key
|
||||||
// inserted between Seek() and SeekToLast(), which makes current_ not
|
// inserted between Seek() and SeekToLast(), which makes current_ not
|
||||||
// equal to CurrentReverse().
|
// equal to CurrentReverse().
|
||||||
//
|
current_ = CurrentReverse();
|
||||||
// assert(current_ == CurrentReverse());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For the heap modifications below to be correct, current_ must be the
|
||||||
|
// current top of the heap.
|
||||||
|
assert(current_ == CurrentReverse());
|
||||||
|
|
||||||
current_->Prev();
|
current_->Prev();
|
||||||
if (current_->Valid()) {
|
if (current_->Valid()) {
|
||||||
maxHeap_.push(current_);
|
// current is still valid after the Prev() call above. Call
|
||||||
|
// replace_top() to restore the heap property. When the same child
|
||||||
|
// iterator yields a sequence of keys, this is cheap.
|
||||||
|
maxHeap_->replace_top(current_);
|
||||||
|
} else {
|
||||||
|
// current stopped being valid, remove it from the heap.
|
||||||
|
maxHeap_->pop();
|
||||||
}
|
}
|
||||||
FindLargest();
|
current_ = CurrentReverse();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Slice key() const override {
|
virtual Slice key() const override {
|
||||||
@ -252,56 +236,54 @@ class MergingIterator : public Iterator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void FindSmallest();
|
// Clears heaps for both directions, used when changing direction or seeking
|
||||||
void FindLargest();
|
|
||||||
void ClearHeaps();
|
void ClearHeaps();
|
||||||
|
// Ensures that maxHeap_ is initialized when starting to go in the reverse
|
||||||
|
// direction
|
||||||
|
void InitMaxHeap();
|
||||||
|
|
||||||
bool is_arena_mode_;
|
bool is_arena_mode_;
|
||||||
const Comparator* comparator_;
|
const Comparator* comparator_;
|
||||||
autovector<IteratorWrapper, kNumIterReserve> children_;
|
autovector<IteratorWrapper, kNumIterReserve> children_;
|
||||||
|
|
||||||
|
// Cached pointer to child iterator with the current key, or nullptr if no
|
||||||
|
// child iterators are valid. This is the top of minHeap_ or maxHeap_
|
||||||
|
// depending on the direction.
|
||||||
IteratorWrapper* current_;
|
IteratorWrapper* current_;
|
||||||
// If the value is true, both of iterators in the heap and current_
|
|
||||||
// contain valid rows. If it is false, only current_ can possibly contain
|
|
||||||
// valid rows.
|
|
||||||
// This flag is always true for reverse direction, as we always use heap for
|
|
||||||
// the reverse iterating case.
|
|
||||||
bool use_heap_;
|
|
||||||
// Which direction is the iterator moving?
|
// Which direction is the iterator moving?
|
||||||
enum Direction {
|
enum Direction {
|
||||||
kForward,
|
kForward,
|
||||||
kReverse
|
kReverse
|
||||||
};
|
};
|
||||||
Direction direction_;
|
Direction direction_;
|
||||||
MergerMaxIterHeap maxHeap_;
|
|
||||||
MergerMinIterHeap minHeap_;
|
MergerMinIterHeap minHeap_;
|
||||||
|
// Max heap is used for reverse iteration, which is way less common than
|
||||||
|
// forward. Lazily initialize it to save memory.
|
||||||
|
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
|
||||||
|
|
||||||
|
IteratorWrapper* CurrentForward() const {
|
||||||
|
assert(direction_ == kForward);
|
||||||
|
return !minHeap_.empty() ? minHeap_.top() : nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
IteratorWrapper* CurrentReverse() const {
|
||||||
|
assert(direction_ == kReverse);
|
||||||
|
assert(maxHeap_);
|
||||||
|
return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
void MergingIterator::FindSmallest() {
|
|
||||||
assert(use_heap_);
|
|
||||||
if (minHeap_.empty()) {
|
|
||||||
current_ = nullptr;
|
|
||||||
} else {
|
|
||||||
current_ = minHeap_.top();
|
|
||||||
assert(current_->Valid());
|
|
||||||
minHeap_.pop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void MergingIterator::FindLargest() {
|
|
||||||
assert(use_heap_);
|
|
||||||
if (maxHeap_.empty()) {
|
|
||||||
current_ = nullptr;
|
|
||||||
} else {
|
|
||||||
current_ = maxHeap_.top();
|
|
||||||
assert(current_->Valid());
|
|
||||||
maxHeap_.pop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void MergingIterator::ClearHeaps() {
|
void MergingIterator::ClearHeaps() {
|
||||||
use_heap_ = true;
|
minHeap_.clear();
|
||||||
maxHeap_ = NewMergerMaxIterHeap(comparator_);
|
if (maxHeap_) {
|
||||||
minHeap_ = NewMergerMinIterHeap(comparator_);
|
maxHeap_->clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MergingIterator::InitMaxHeap() {
|
||||||
|
if (!maxHeap_) {
|
||||||
|
maxHeap_.reset(new MergerMaxIterHeap(comparator_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n,
|
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n,
|
||||||
|
140
util/heap.h
Normal file
140
util/heap.h
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <functional>
|
||||||
|
#include "util/autovector.h"
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
// Binary heap implementation optimized for use in multi-way merge sort.
|
||||||
|
// Comparison to std::priority_queue:
|
||||||
|
// - In libstdc++, std::priority_queue::pop() usually performs just over logN
|
||||||
|
// comparisons but never fewer.
|
||||||
|
// - std::priority_queue does not have a replace-top operation, requiring a
|
||||||
|
// pop+push. If the replacement element is the new top, this requires
|
||||||
|
// around 2logN comparisons.
|
||||||
|
// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN
|
||||||
|
// comparisons.
|
||||||
|
// - This heap provides a replace_top() operation which requires [1, 2logN]
|
||||||
|
// comparisons. When the replacement element is also the new top, this
|
||||||
|
// takes just 1 or 2 comparisons.
|
||||||
|
//
|
||||||
|
// The last property can yield an order-of-magnitude performance improvement
|
||||||
|
// when merge-sorting real-world non-random data. If the merge operation is
|
||||||
|
// likely to take chunks of elements from the same input stream, only 1
|
||||||
|
// comparison per element is needed. In RocksDB-land, this happens when
|
||||||
|
// compacting a database where keys are not randomly distributed across L0
|
||||||
|
// files but nearby keys are likely to be in the same L0 file.
|
||||||
|
//
|
||||||
|
// The container uses the same counterintuitive ordering as
|
||||||
|
// std::priority_queue: the comparison operator is expected to provide the
|
||||||
|
// less-than relation, but top() will return the maximum.
|
||||||
|
|
||||||
|
template<typename T, typename Compare = std::less<T>>
|
||||||
|
class BinaryHeap {
|
||||||
|
public:
|
||||||
|
BinaryHeap() { }
|
||||||
|
explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { }
|
||||||
|
|
||||||
|
void push(const T& value) {
|
||||||
|
data_.push_back(value);
|
||||||
|
upheap(data_.size() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void push(T&& value) {
|
||||||
|
data_.push_back(std::move(value));
|
||||||
|
upheap(data_.size() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const T& top() const {
|
||||||
|
assert(!empty());
|
||||||
|
return data_.front();
|
||||||
|
}
|
||||||
|
|
||||||
|
void replace_top(const T& value) {
|
||||||
|
assert(!empty());
|
||||||
|
data_.front() = value;
|
||||||
|
downheap(get_root());
|
||||||
|
}
|
||||||
|
|
||||||
|
void replace_top(T&& value) {
|
||||||
|
assert(!empty());
|
||||||
|
data_.front() = std::move(value);
|
||||||
|
downheap(get_root());
|
||||||
|
}
|
||||||
|
|
||||||
|
void pop() {
|
||||||
|
assert(!empty());
|
||||||
|
data_.front() = std::move(data_.back());
|
||||||
|
data_.pop_back();
|
||||||
|
if (!empty()) {
|
||||||
|
downheap(get_root());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void swap(BinaryHeap &other) {
|
||||||
|
std::swap(cmp_, other.cmp_);
|
||||||
|
data_.swap(other.data_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void clear() {
|
||||||
|
data_.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool empty() const {
|
||||||
|
return data_.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
static inline size_t get_root() { return 0; }
|
||||||
|
static inline size_t get_parent(size_t index) { return (index - 1) / 2; }
|
||||||
|
static inline size_t get_left(size_t index) { return 2 * index + 1; }
|
||||||
|
static inline size_t get_right(size_t index) { return 2 * index + 2; }
|
||||||
|
|
||||||
|
void upheap(size_t index) {
|
||||||
|
T v = std::move(data_[index]);
|
||||||
|
while (index > get_root()) {
|
||||||
|
const size_t parent = get_parent(index);
|
||||||
|
if (!cmp_(data_[parent], v)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
data_[index] = std::move(data_[parent]);
|
||||||
|
index = parent;
|
||||||
|
}
|
||||||
|
data_[index] = std::move(v);
|
||||||
|
}
|
||||||
|
|
||||||
|
void downheap(size_t index) {
|
||||||
|
T v = std::move(data_[index]);
|
||||||
|
while (1) {
|
||||||
|
const size_t left_child = get_left(index);
|
||||||
|
if (get_left(index) >= data_.size()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
const size_t right_child = left_child + 1;
|
||||||
|
assert(right_child == get_right(index));
|
||||||
|
size_t picked_child = left_child;
|
||||||
|
if (right_child < data_.size() &&
|
||||||
|
cmp_(data_[left_child], data_[right_child])) {
|
||||||
|
picked_child = right_child;
|
||||||
|
}
|
||||||
|
if (!cmp_(v, data_[picked_child])) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
data_[index] = std::move(data_[picked_child]);
|
||||||
|
index = picked_child;
|
||||||
|
}
|
||||||
|
data_[index] = std::move(v);
|
||||||
|
}
|
||||||
|
|
||||||
|
Compare cmp_;
|
||||||
|
autovector<T> data_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
138
util/heap_test.cc
Normal file
138
util/heap_test.cc
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <climits>
|
||||||
|
|
||||||
|
#include <queue>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include "util/heap.h"
|
||||||
|
|
||||||
|
#ifndef GFLAGS
|
||||||
|
const int64_t FLAGS_iters = 100000;
|
||||||
|
#else
|
||||||
|
#include <gflags/gflags.h>
|
||||||
|
DEFINE_int64(iters, 100000, "number of pseudo-random operations in each test");
|
||||||
|
#endif // GFLAGS
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Compares the custom heap implementation in util/heap.h against
|
||||||
|
* std::priority_queue on a pseudo-random sequence of operations.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
using HeapTestValue = uint64_t;
|
||||||
|
using Params = std::tuple<size_t, HeapTestValue, int64_t>;
|
||||||
|
|
||||||
|
class HeapTest : public ::testing::TestWithParam<Params> {
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_P(HeapTest, Test) {
|
||||||
|
// This test performs the same pseudorandom sequence of operations on a
|
||||||
|
// BinaryHeap and an std::priority_queue, comparing output. The three
|
||||||
|
// possible operations are insert, replace top and pop.
|
||||||
|
//
|
||||||
|
// Insert is chosen slightly more often than the others so that the size of
|
||||||
|
// the heap slowly grows. Once the size heats the MAX_HEAP_SIZE limit, we
|
||||||
|
// disallow inserting until the heap becomes empty, testing the "draining"
|
||||||
|
// scenario.
|
||||||
|
|
||||||
|
const auto MAX_HEAP_SIZE = std::get<0>(GetParam());
|
||||||
|
const auto MAX_VALUE = std::get<1>(GetParam());
|
||||||
|
const auto RNG_SEED = std::get<2>(GetParam());
|
||||||
|
|
||||||
|
BinaryHeap<HeapTestValue> heap;
|
||||||
|
std::priority_queue<HeapTestValue> ref;
|
||||||
|
|
||||||
|
std::mt19937 rng(RNG_SEED);
|
||||||
|
std::uniform_int_distribution<HeapTestValue> value_dist(0, MAX_VALUE);
|
||||||
|
int ndrains = 0;
|
||||||
|
bool draining = false; // hit max size, draining until we empty the heap
|
||||||
|
size_t size = 0;
|
||||||
|
for (int64_t i = 0; i < FLAGS_iters; ++i) {
|
||||||
|
if (size == 0) {
|
||||||
|
draining = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!draining &&
|
||||||
|
(size == 0 || std::bernoulli_distribution(0.4)(rng))) {
|
||||||
|
// insert
|
||||||
|
HeapTestValue val = value_dist(rng);
|
||||||
|
heap.push(val);
|
||||||
|
ref.push(val);
|
||||||
|
++size;
|
||||||
|
if (size == MAX_HEAP_SIZE) {
|
||||||
|
draining = true;
|
||||||
|
++ndrains;
|
||||||
|
}
|
||||||
|
} else if (std::bernoulli_distribution(0.5)(rng)) {
|
||||||
|
// replace top
|
||||||
|
HeapTestValue val = value_dist(rng);
|
||||||
|
heap.replace_top(val);
|
||||||
|
ref.pop();
|
||||||
|
ref.push(val);
|
||||||
|
} else {
|
||||||
|
// pop
|
||||||
|
assert(size > 0);
|
||||||
|
heap.pop();
|
||||||
|
ref.pop();
|
||||||
|
--size;
|
||||||
|
}
|
||||||
|
|
||||||
|
// After every operation, check that the public methods give the same
|
||||||
|
// results
|
||||||
|
assert((size == 0) == ref.empty());
|
||||||
|
ASSERT_EQ(size == 0, heap.empty());
|
||||||
|
if (size > 0) {
|
||||||
|
ASSERT_EQ(ref.top(), heap.top());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Probabilities should be set up to occasionally hit the max heap size and
|
||||||
|
// drain it
|
||||||
|
assert(ndrains > 0);
|
||||||
|
|
||||||
|
heap.clear();
|
||||||
|
ASSERT_TRUE(heap.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Basic test, MAX_VALUE = 3*MAX_HEAP_SIZE (occasional duplicates)
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
Basic, HeapTest,
|
||||||
|
::testing::Values(Params(1000, 3000, 0x1b575cf05b708945))
|
||||||
|
);
|
||||||
|
// Mid-size heap with small values (many duplicates)
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
SmallValues, HeapTest,
|
||||||
|
::testing::Values(Params(100, 10, 0x5ae213f7bd5dccd0))
|
||||||
|
);
|
||||||
|
// Small heap, large value range (no duplicates)
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
SmallHeap, HeapTest,
|
||||||
|
::testing::Values(Params(10, ULLONG_MAX, 0x3e1fa8f4d01707cf))
|
||||||
|
);
|
||||||
|
// Two-element heap
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
TwoElementHeap, HeapTest,
|
||||||
|
::testing::Values(Params(2, 5, 0x4b5e13ea988c6abc))
|
||||||
|
);
|
||||||
|
// One-element heap
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
OneElementHeap, HeapTest,
|
||||||
|
::testing::Values(Params(1, 3, 0x176a1019ab0b612e))
|
||||||
|
);
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
::testing::InitGoogleTest(&argc, argv);
|
||||||
|
#ifdef GFLAGS
|
||||||
|
GFLAGS::ParseCommandLineFlags(&argc, &argv, true);
|
||||||
|
#endif // GFLAGS
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user