// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "table/merger.h" #include "leveldb/comparator.h" #include "leveldb/iterator.h" #include "table/iter_heap.h" #include "table/iterator_wrapper.h" namespace leveldb { namespace { class MergingIterator : public Iterator { public: MergingIterator(const Comparator* comparator, Iterator** children, int n) : comparator_(comparator), children_(new IteratorWrapper[n]), n_(n), current_(NULL), direction_(kForward), maxHeap_(NewMaxIterHeap(comparator_)), minHeap_ (NewMinIterHeap(comparator_)) { for (int i = 0; i < n; i++) { children_[i].Set(children[i]); } for (int i = 0; i < n; ++i) { if (children_[i].Valid()) { minHeap_.push(&children_[i]); } } } virtual ~MergingIterator() { delete[] children_; } virtual bool Valid() const { return (current_ != NULL); } virtual void SeekToFirst() { ClearHeaps(); for (int i = 0; i < n_; i++) { children_[i].SeekToFirst(); if (children_[i].Valid()) { minHeap_.push(&children_[i]); } } FindSmallest(); direction_ = kForward; } virtual void SeekToLast() { ClearHeaps(); for (int i = 0; i < n_; i++) { children_[i].SeekToLast(); if (children_[i].Valid()) { maxHeap_.push(&children_[i]); } } FindLargest(); direction_ = kReverse; } virtual void Seek(const Slice& target) { ClearHeaps(); for (int i = 0; i < n_; i++) { children_[i].Seek(target); if (children_[i].Valid()) { minHeap_.push(&children_[i]); } } FindSmallest(); direction_ = kForward; } virtual void Next() { assert(Valid()); // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already // true for all of the non-current_ children since current_ is // the smallest child and key() == current_->key(). Otherwise, // we explicitly position the non-current_ children. if (direction_ != kForward) { ClearHeaps(); for (int i = 0; i < n_; i++) { IteratorWrapper* child = &children_[i]; if (child != current_) { child->Seek(key()); if (child->Valid() && comparator_->Compare(key(), child->key()) == 0) { child->Next(); } if (child->Valid()) { minHeap_.push(child); } } } direction_ = kForward; } // as the current points to the current record. move the iterator forward. // and if it is valid add it to the heap. current_->Next(); if (current_->Valid()){ minHeap_.push(current_); } FindSmallest(); } virtual void Prev() { assert(Valid()); // Ensure that all children are positioned before key(). // If we are moving in the reverse direction, it is already // true for all of the non-current_ children since current_ is // the largest child and key() == current_->key(). Otherwise, // we explicitly position the non-current_ children. if (direction_ != kReverse) { ClearHeaps(); for (int i = 0; i < n_; i++) { IteratorWrapper* child = &children_[i]; if (child != current_) { child->Seek(key()); if (child->Valid()) { // Child is at first entry >= key(). Step back one to be < key() child->Prev(); } else { // Child has no entries >= key(). Position at last entry. child->SeekToLast(); } if (child->Valid()) { maxHeap_.push(child); } } } direction_ = kReverse; } current_->Prev(); if (current_->Valid()) { maxHeap_.push(current_); } FindLargest(); } virtual Slice key() const { assert(Valid()); return current_->key(); } virtual Slice value() const { assert(Valid()); return current_->value(); } virtual Status status() const { Status status; for (int i = 0; i < n_; i++) { status = children_[i].status(); if (!status.ok()) { break; } } return status; } private: void FindSmallest(); void FindLargest(); void ClearHeaps(); const Comparator* comparator_; IteratorWrapper* children_; int n_; IteratorWrapper* current_; // Which direction is the iterator moving? enum Direction { kForward, kReverse }; Direction direction_; MaxIterHeap maxHeap_; MinIterHeap minHeap_; }; void MergingIterator::FindSmallest() { assert (direction_ == kForward); if (minHeap_.empty()) { current_ = NULL; } else { current_ = minHeap_.top(); assert(current_->Valid()); minHeap_.pop(); } } void MergingIterator::FindLargest() { assert(direction_ == kReverse); if (maxHeap_.empty()) { current_ = NULL; } else { current_ = maxHeap_.top(); assert(current_->Valid()); maxHeap_.pop(); } } void MergingIterator::ClearHeaps() { maxHeap_ = NewMaxIterHeap(comparator_); minHeap_ = NewMinIterHeap(comparator_); } } // namespace Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) { assert(n >= 0); if (n == 0) { return NewEmptyIterator(); } else if (n == 1) { return list[0]; } else { return new MergingIterator(cmp, list, n); } } } // namespace leveldb