rocksdb/db/managed_iterator.cc
Venkatesh Radhakrishnan 7d817268b9 Managed iterator
Summary:
This is a diff for managed iterator. A managed iterator
is a wrapper around an iterator which saves the options for that
iterator as well as the current key/value so that the underlying iterator
and its associated memory can be released when it is aged out
automatically or on the request of the user. Will provide the automatic release as a follow-up diff.

Test Plan: Managed* tests in db_test and XF tests for managed iterator

Reviewers: igor, yhchiang, anthony, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D31401
2015-02-18 11:49:31 -08:00

256 lines
6.6 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include <limits>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/managed_iterator.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "util/xfunc.h"
namespace rocksdb {
namespace {
// Helper class that locks a mutex on construction and unlocks the mutex when
// the destructor of the MutexLock object is invoked.
//
// Typical usage:
//
// void MyClass::MyMethod() {
// MILock l(&mu_); // mu_ is an instance variable
// ... some complex code, possibly with multiple return paths ...
// }
class MILock {
public:
explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) {
this->mu_->lock();
}
~MILock() {
this->mu_->unlock();
XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1,
xf_manage_release, mi_);
}
private:
std::mutex* const mu_;
ManagedIterator* mi_;
// No copying allowed
MILock(const MILock&) = delete;
void operator=(const MILock&) = delete;
};
} // anonymous namespace
//
// Synchronization between modifiers, releasers, creators
// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use
// if modifying mutable_iter, atomically exchange in_use:
// return if in_use set / otherwise set in use,
// atomically replace new iter with old , reset in use
// The releaser is the new operation and it holds a lock for a very short time
// The existing non-const iterator operations are supposed to be single
// threaded and hold the lock for the duration of the operation
// The existing const iterator operations use the cached key/values
// and don't do any locking.
ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd)
: db_(db),
read_options_(read_options),
cfd_(cfd),
svnum_(cfd->GetSuperVersionNumber()),
mutable_iter_(nullptr),
valid_(false),
snapshot_created_(false),
release_supported_(true) {
read_options_.managed = false;
if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) {
assert(read_options_.snapshot = db_->GetSnapshot());
snapshot_created_ = true;
}
cfh_.SetCFD(cfd);
mutable_iter_ = unique_ptr<Iterator>(db->NewIterator(read_options_, &cfh_));
XFUNC_TEST("managed_xftest_dropold", "managed_create", xf_managed_create1,
xf_manage_create, this);
}
ManagedIterator::~ManagedIterator() {
Lock();
if (snapshot_created_) {
db_->ReleaseSnapshot(read_options_.snapshot);
snapshot_created_ = false;
read_options_.snapshot = nullptr;
}
}
bool ManagedIterator::Valid() const { return valid_; }
void ManagedIterator::SeekToLast() {
MILock l(&in_use_, this);
if (NeedToRebuild()) {
RebuildIterator();
}
assert(mutable_iter_ != nullptr);
mutable_iter_->SeekToLast();
if (mutable_iter_->status().ok()) {
UpdateCurrent();
}
}
void ManagedIterator::SeekToFirst() {
MILock l(&in_use_, this);
SeekInternal(Slice(), true);
}
void ManagedIterator::Seek(const Slice& user_key) {
MILock l(&in_use_, this);
SeekInternal(user_key, false);
}
void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) {
if (NeedToRebuild()) {
RebuildIterator();
}
assert(mutable_iter_ != nullptr);
if (seek_to_first) {
mutable_iter_->SeekToFirst();
} else {
mutable_iter_->Seek(user_key);
}
UpdateCurrent();
}
void ManagedIterator::Prev() {
if (!valid_) {
status_ = Status::InvalidArgument("Iterator value invalid");
return;
}
MILock l(&in_use_, this);
if (NeedToRebuild()) {
std::string current_key = key().ToString();
Slice old_key(current_key);
RebuildIterator();
SeekInternal(old_key, false);
UpdateCurrent();
if (!valid_) {
return;
}
if (key().compare(old_key) != 0) {
valid_ = false;
status_ = Status::Incomplete("Cannot do Prev now");
return;
}
}
mutable_iter_->Prev();
if (mutable_iter_->status().ok()) {
UpdateCurrent();
status_ = Status::OK();
} else {
status_ = mutable_iter_->status();
}
}
void ManagedIterator::Next() {
if (!valid_) {
status_ = Status::InvalidArgument("Iterator value invalid");
return;
}
MILock l(&in_use_, this);
if (NeedToRebuild()) {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), cached_key_.Size());
RebuildIterator();
SeekInternal(old_key, false);
UpdateCurrent();
if (!valid_) {
return;
}
if (key().compare(old_key) != 0) {
valid_ = false;
status_ = Status::Incomplete("Cannot do Next now");
return;
}
}
mutable_iter_->Next();
UpdateCurrent();
}
Slice ManagedIterator::key() const {
assert(valid_);
return cached_key_.GetKey();
}
Slice ManagedIterator::value() const {
assert(valid_);
return cached_value_.GetKey();
}
Status ManagedIterator::status() const { return status_; }
void ManagedIterator::RebuildIterator() {
svnum_ = cfd_->GetSuperVersionNumber();
mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
}
void ManagedIterator::UpdateCurrent() {
assert(mutable_iter_ != nullptr);
if (!(valid_ = mutable_iter_->Valid())) {
status_ = mutable_iter_->status();
return;
}
status_ = Status::OK();
cached_key_.SetKey(mutable_iter_->key());
cached_value_.SetKey(mutable_iter_->value());
}
void ManagedIterator::ReleaseIter(bool only_old) {
if ((mutable_iter_ == nullptr) || (!release_supported_)) {
return;
}
if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) {
if (!TryLock()) { // Don't release iter if in use
return;
}
mutable_iter_ = nullptr; // in_use for a very short time
UnLock();
}
}
bool ManagedIterator::NeedToRebuild() {
if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) ||
(!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) {
return true;
}
return false;
}
void ManagedIterator::Lock() {
in_use_.lock();
return;
}
bool ManagedIterator::TryLock() { return in_use_.try_lock(); }
void ManagedIterator::UnLock() {
in_use_.unlock();
XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1,
xf_manage_release, this);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE