rocksdb/table/merger.cc
Abhishek Kona 3f7af03a2d Use a priority queue to merge files.
Summary:
Use a std::priority_queue in merger.cc instead of doing an O(n) search
every time.
Currently only the ForwardIteration uses a Priority Queue.

Test Plan: make all check

Reviewers: dhruba

Reviewed By: dhruba

CC: emayanke, zshao

Differential Revision: https://reviews.facebook.net/D7629
2013-01-02 13:52:25 -08:00

230 lines
5.5 KiB
C++

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/merger.h"
#include "leveldb/comparator.h"
#include "leveldb/iterator.h"
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
namespace leveldb {
namespace {
class MergingIterator : public Iterator {
public:
MergingIterator(const Comparator* comparator, Iterator** children, int n)
: comparator_(comparator),
children_(new IteratorWrapper[n]),
n_(n),
current_(NULL),
direction_(kForward),
maxHeap_(NewMaxIterHeap(comparator_)),
minHeap_ (NewMinIterHeap(comparator_)) {
for (int i = 0; i < n; i++) {
children_[i].Set(children[i]);
}
for (int i = 0; i < n; ++i) {
if (children_[i].Valid()) {
minHeap_.push(&children_[i]);
}
}
}
virtual ~MergingIterator() {
delete[] children_;
}
virtual bool Valid() const {
return (current_ != NULL);
}
virtual void SeekToFirst() {
ClearHeaps();
for (int i = 0; i < n_; i++) {
children_[i].SeekToFirst();
if (children_[i].Valid()) {
minHeap_.push(&children_[i]);
}
}
FindSmallest();
direction_ = kForward;
}
virtual void SeekToLast() {
ClearHeaps();
for (int i = 0; i < n_; i++) {
children_[i].SeekToLast();
if (children_[i].Valid()) {
maxHeap_.push(&children_[i]);
}
}
FindLargest();
direction_ = kReverse;
}
virtual void Seek(const Slice& target) {
ClearHeaps();
for (int i = 0; i < n_; i++) {
children_[i].Seek(target);
if (children_[i].Valid()) {
minHeap_.push(&children_[i]);
}
}
FindSmallest();
direction_ = kForward;
}
virtual void Next() {
assert(Valid());
// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
ClearHeaps();
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid() &&
comparator_->Compare(key(), child->key()) == 0) {
child->Next();
}
if (child->Valid()) {
minHeap_.push(child);
}
}
}
direction_ = kForward;
}
// as the current points to the current record. move the iterator forward.
// and if it is valid add it to the heap.
current_->Next();
if (current_->Valid()){
minHeap_.push(current_);
}
FindSmallest();
}
virtual void Prev() {
assert(Valid());
// Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already
// true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kReverse) {
ClearHeaps();
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid()) {
// Child is at first entry >= key(). Step back one to be < key()
child->Prev();
} else {
// Child has no entries >= key(). Position at last entry.
child->SeekToLast();
}
if (child->Valid()) {
maxHeap_.push(child);
}
}
}
direction_ = kReverse;
}
current_->Prev();
if (current_->Valid()) {
maxHeap_.push(current_);
}
FindLargest();
}
virtual Slice key() const {
assert(Valid());
return current_->key();
}
virtual Slice value() const {
assert(Valid());
return current_->value();
}
virtual Status status() const {
Status status;
for (int i = 0; i < n_; i++) {
status = children_[i].status();
if (!status.ok()) {
break;
}
}
return status;
}
private:
void FindSmallest();
void FindLargest();
void ClearHeaps();
const Comparator* comparator_;
IteratorWrapper* children_;
int n_;
IteratorWrapper* current_;
// Which direction is the iterator moving?
enum Direction {
kForward,
kReverse
};
Direction direction_;
MaxIterHeap maxHeap_;
MinIterHeap minHeap_;
};
void MergingIterator::FindSmallest() {
assert (direction_ == kForward);
if (minHeap_.empty()) {
current_ = NULL;
} else {
current_ = minHeap_.top();
assert(current_->Valid());
minHeap_.pop();
}
}
void MergingIterator::FindLargest() {
assert(direction_ == kReverse);
if (maxHeap_.empty()) {
current_ = NULL;
} else {
current_ = maxHeap_.top();
assert(current_->Valid());
maxHeap_.pop();
}
}
// Drops every child from both heaps. Each heap is replaced by a freshly
// constructed empty heap that orders by comparator_. The heap type
// (presumably std::priority_queue) offers no clear() of its own.
void MergingIterator::ClearHeaps() {
  minHeap_ = NewMinIterHeap(comparator_);
  maxHeap_ = NewMaxIterHeap(comparator_);
}
} // namespace
// Returns an iterator over the merged contents of list[0..n-1].
// The degenerate cases avoid the merging machinery entirely:
//   n == 0 -> an empty iterator,
//   n == 1 -> the single child itself, returned as-is,
//   n >= 2 -> a MergingIterator built over all n children.
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
  assert(n >= 0);
  switch (n) {
    case 0:
      return NewEmptyIterator();
    case 1:
      return list[0];
    default:
      return new MergingIterator(cmp, list, n);
  }
}
} // namespace leveldb