Remove managed iterator
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/4124 Differential Revision: D8829910 Pulled By: siying fbshipit-source-id: f3e952ccf3a631071a5d77c48e327046f8abb560
This commit is contained in:
parent
995fcf7573
commit
ddc07b40fc
@ -494,7 +494,6 @@ set(SOURCES
|
||||
db/log_reader.cc
|
||||
db/log_writer.cc
|
||||
db/malloc_stats.cc
|
||||
db/managed_iterator.cc
|
||||
db/memtable.cc
|
||||
db/memtable_list.cc
|
||||
db/merge_helper.cc
|
||||
|
@ -5,6 +5,7 @@
|
||||
* With LRUCache, when high_pri_pool_ratio > 0, the midpoint insertion strategy will be enabled to put low-pri items at the tail of the low-pri list (the midpoint) when they are first inserted into the cache. This makes cache entries that never get hit age out faster, improving cache efficiency when a large background scan is present.
|
||||
* For bottommost_compression, a compatible CompressionOptions is added via `bottommost_compression_opts`. To remain backward compatible, a new boolean `enabled` is added to CompressionOptions. compression_opts is always used, regardless of the value of `enabled`. bottommost_compression_opts is only used when the user sets `enabled=true`; otherwise, compression_opts is used for bottommost_compression by default.
|
||||
* The "rocksdb.num.entries" table property no longer counts range deletion tombstones as entries.
|
||||
* Remove managed iterator. ReadOptions.managed is no longer supported: requesting an iterator with `managed=true` now yields a NotSupported status.
|
||||
|
||||
### New Features
|
||||
* Changes the format of index blocks by storing keys in their raw form rather than converting them to InternalKey. This saves 8 bytes per index key. The feature is backward compatible but not forward compatible. It is disabled by default unless format_version 3 or above is used.
|
||||
|
1
TARGETS
1
TARGETS
@ -106,7 +106,6 @@ cpp_library(
|
||||
"db/log_writer.cc",
|
||||
"db/logs_with_prep_tracker.cc",
|
||||
"db/malloc_stats.cc",
|
||||
"db/managed_iterator.cc",
|
||||
"db/memtable.cc",
|
||||
"db/memtable_list.cc",
|
||||
"db/merge_helper.cc",
|
||||
|
@ -41,7 +41,6 @@
|
||||
#include "db/log_reader.h"
|
||||
#include "db/log_writer.h"
|
||||
#include "db/malloc_stats.h"
|
||||
#include "db/managed_iterator.h"
|
||||
#include "db/memtable.h"
|
||||
#include "db/memtable_list.h"
|
||||
#include "db/merge_context.h"
|
||||
@ -1578,6 +1577,10 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
|
||||
|
||||
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family) {
|
||||
if (read_options.managed) {
|
||||
return NewErrorIterator(
|
||||
Status::NotSupported("Managed iterator is not supported anymore."));
|
||||
}
|
||||
Iterator* result = nullptr;
|
||||
if (read_options.read_tier == kPersistedTier) {
|
||||
return NewErrorIterator(Status::NotSupported(
|
||||
@ -1595,22 +1598,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
|
||||
auto cfd = cfh->cfd();
|
||||
ReadCallback* read_callback = nullptr; // No read callback provided.
|
||||
if (read_options.managed) {
|
||||
#ifdef ROCKSDB_LITE
|
||||
// not supported in lite version
|
||||
result = NewErrorIterator(Status::InvalidArgument(
|
||||
"Managed Iterators not supported in RocksDBLite."));
|
||||
#else
|
||||
if ((read_options.tailing) || (read_options.snapshot != nullptr) ||
|
||||
(is_snapshot_supported_)) {
|
||||
result = new ManagedIterator(this, read_options, cfd);
|
||||
} else {
|
||||
// Managed iter not supported
|
||||
result = NewErrorIterator(Status::InvalidArgument(
|
||||
"Managed Iterators not supported without snapshots."));
|
||||
}
|
||||
#endif
|
||||
} else if (read_options.tailing) {
|
||||
if (read_options.tailing) {
|
||||
#ifdef ROCKSDB_LITE
|
||||
// not supported in lite version
|
||||
result = nullptr;
|
||||
@ -1705,6 +1693,9 @@ Status DBImpl::NewIterators(
|
||||
const ReadOptions& read_options,
|
||||
const std::vector<ColumnFamilyHandle*>& column_families,
|
||||
std::vector<Iterator*>* iterators) {
|
||||
if (read_options.managed) {
|
||||
return Status::NotSupported("Managed iterator is not supported anymore.");
|
||||
}
|
||||
if (read_options.read_tier == kPersistedTier) {
|
||||
return Status::NotSupported(
|
||||
"ReadTier::kPersistedData is not yet supported in iterators.");
|
||||
@ -1712,23 +1703,7 @@ Status DBImpl::NewIterators(
|
||||
ReadCallback* read_callback = nullptr; // No read callback provided.
|
||||
iterators->clear();
|
||||
iterators->reserve(column_families.size());
|
||||
if (read_options.managed) {
|
||||
#ifdef ROCKSDB_LITE
|
||||
return Status::InvalidArgument(
|
||||
"Managed interator not supported in RocksDB lite");
|
||||
#else
|
||||
if ((!read_options.tailing) && (read_options.snapshot == nullptr) &&
|
||||
(!is_snapshot_supported_)) {
|
||||
return Status::InvalidArgument(
|
||||
"Managed interator not supported without snapshots");
|
||||
}
|
||||
for (auto cfh : column_families) {
|
||||
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
|
||||
auto iter = new ManagedIterator(this, read_options, cfd);
|
||||
iterators->push_back(iter);
|
||||
}
|
||||
#endif
|
||||
} else if (read_options.tailing) {
|
||||
if (read_options.tailing) {
|
||||
#ifdef ROCKSDB_LITE
|
||||
return Status::InvalidArgument(
|
||||
"Tailing interator not supported in RocksDB lite");
|
||||
|
@ -184,73 +184,6 @@ TEST_P(DBIteratorTest, NonBlockingIteration) {
|
||||
kSkipMmapReads));
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// Exercises a managed iterator restricted to kBlockCacheTier: it must see
// data that lives in the memtable or block cache, and must report Incomplete
// (without opening files or filling the cache) when the data is only on
// storage.
TEST_P(DBIteratorTest, ManagedNonBlockingIteration) {
  do {
    ReadOptions non_blocking_opts;
    ReadOptions regular_opts;
    non_blocking_opts.read_tier = kBlockCacheTier;
    non_blocking_opts.managed = true;

    Options options = CurrentOptions();
    options.statistics = rocksdb::CreateDBStatistics();
    CreateAndReopenWithCF({"pikachu"}, options);

    // Phase 1: a single key sitting in the memtable is visible.
    ASSERT_OK(Put(1, "a", "b"));

    Iterator* iter = NewIterator(non_blocking_opts, handles_[1]);
    int count = 0;
    iter->SeekToFirst();
    while (iter->Valid()) {
      ASSERT_OK(iter->status());
      count++;
      iter->Next();
    }
    ASSERT_EQ(count, 1);
    delete iter;

    // Phase 2: after a flush the key is neither in the memtable nor in the
    // block cache, so the scan finds nothing and performs no storage IO.
    ASSERT_OK(Flush(1));

    int64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS);
    int64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
    iter = NewIterator(non_blocking_opts, handles_[1]);
    count = 0;
    iter->SeekToFirst();
    while (iter->Valid()) {
      count++;
      iter->Next();
    }
    ASSERT_EQ(count, 0);
    ASSERT_TRUE(iter->status().IsIncomplete());
    ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS));
    ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD));
    delete iter;

    // Phase 3: a regular Get pulls the block into the cache ...
    ASSERT_EQ(Get(1, "a"), "b");

    // ... after which the non-blocking scan finds the key, again without
    // opening files or adding cache entries.
    numopen = TestGetTickerCount(options, NO_FILE_OPENS);
    cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
    iter = NewIterator(non_blocking_opts, handles_[1]);
    count = 0;
    iter->SeekToFirst();
    while (iter->Valid()) {
      ASSERT_OK(iter->status());
      count++;
      iter->Next();
    }
    ASSERT_EQ(count, 1);
    ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS));
    ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD));
    delete iter;

    // This test verifies block cache behaviors, which is not used by plain
    // table format.
    // Exclude kHashCuckoo as it does not support iteration currently
  } while (ChangeOptions(kSkipPlainTable | kSkipNoSeekToLast | kSkipHashCuckoo |
                         kSkipMmapReads));
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
TEST_P(DBIteratorTest, IterSeekBeforePrev) {
|
||||
ASSERT_OK(Put("a", "b"));
|
||||
ASSERT_OK(Put("c", "d"));
|
||||
|
@ -479,275 +479,6 @@ TEST_F(DBTestTailingIterator, TailingIteratorGap) {
|
||||
ASSERT_EQ("40", it->key().ToString());
|
||||
}
|
||||
|
||||
// A managed tailing iterator created on an empty DB must pick up a record
// written after the iterator was created.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorSingle) {
  ReadOptions ropts;
  ropts.tailing = true;
  ropts.managed = true;

  std::unique_ptr<Iterator> it(db_->NewIterator(ropts));

  // Nothing has been written yet.
  it->SeekToFirst();
  ASSERT_FALSE(it->Valid());

  // Write one record; re-seeking the same iterator must expose it.
  ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "mirko");

  // It is the only record, so stepping forward runs off the end.
  it->Next();
  ASSERT_FALSE(it->Valid());
}
|
||||
|
||||
// Interleaves writes with seeks on one long-lived managed tailing iterator:
// every freshly written key must be found immediately by Seek().
TEST_F(DBTestTailingIterator, ManagedTailingIteratorKeepAdding) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions ropts;
  ropts.tailing = true;
  ropts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(ropts, handles_[1]));

  const std::string payload(1024, 'a');
  const int num_records = 10000;

  int i = 0;
  while (i < num_records) {
    // Zero-padded, fixed-width (16 byte) key so keys sort numerically.
    char key_buf[32];
    snprintf(key_buf, sizeof(key_buf), "%016d", i);
    Slice key(key_buf, 16);

    ASSERT_OK(Put(1, key, payload));

    it->Seek(key);
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ(it->key().compare(key), 0);
    ++i;
  }
}
|
||||
|
||||
// Seeks to a target slightly below each newly written key and expects the
// managed tailing iterator to land on that key, first with ascending writes
// and then with descending writes, with periodic flushes in between.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorSeekToNext) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions ropts;
  ropts.tailing = true;
  ropts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(ropts, handles_[1]));

  const std::string payload(1024, 'a');
  const int num_records = 1000;

  // Ascending pass: keys 5, 10, 15, ...; seek target is (key - 2).
  for (int n = 1; n < num_records; ++n) {
    char key_buf[32];
    char target_buf[32];
    snprintf(key_buf, sizeof(key_buf), "00a0%016d", n * 5);
    snprintf(target_buf, sizeof(target_buf), "00a0%016d", n * 5 - 2);
    Slice key(key_buf, 20);
    Slice target(target_buf, 20);

    ASSERT_OK(Put(1, key, payload));
    if (n % 100 == 99) {
      ASSERT_OK(Flush(1));
    }

    it->Seek(target);
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ(it->key().compare(key), 0);
  }

  // Descending pass over twice the range, overwriting and extending the keys
  // written above.
  for (int n = 2 * num_records; n > 0; --n) {
    char key_buf[32];
    char target_buf[32];
    snprintf(key_buf, sizeof(key_buf), "00a0%016d", n * 5);
    snprintf(target_buf, sizeof(target_buf), "00a0%016d", n * 5 - 2);
    Slice key(key_buf, 20);
    Slice target(target_buf, 20);

    ASSERT_OK(Put(1, key, payload));
    if (n % 100 == 99) {
      ASSERT_OK(Flush(1));
    }

    it->Seek(target);
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ(it->key().compare(key), 0);
  }
}
|
||||
|
||||
// Positions a managed tailing iterator on a record, deletes that record,
// writes and flushes many more, and checks the same iterator can step past
// the deleted record and enumerate every new one.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorDeletes) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions ropts;
  ropts.tailing = true;
  ropts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(ropts, handles_[1]));

  // One record: write it, observe it through the iterator, then delete it.
  ASSERT_OK(Put(1, "0test", "test"));
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "0test");
  ASSERT_OK(Delete(1, "0test"));

  // Bulk-load records whose keys all sort after "0test".
  const int num_records = 10000;
  const std::string payload(1024, 'A');
  for (int n = 0; n < num_records; ++n) {
    char key_buf[32];
    snprintf(key_buf, sizeof(key_buf), "1%015d", n);
    Slice key(key_buf, 16);
    ASSERT_OK(Put(1, key, payload));
  }

  // Flush so that none of the new records are served from the memtable.
  ASSERT_OK(Flush(1));

  // Step off "0test".
  it->Next();

  // Everything written above must be reachable from the existing iterator.
  int count = 0;
  while (it->Valid()) {
    it->Next();
    ++count;
  }

  ASSERT_EQ(count, num_records);
}
|
||||
|
||||
// With a 2-byte fixed prefix extractor and a hash skip-list memtable, a seek
// by a managed tailing iterator must not cross into a different prefix.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorPrefixSeek) {
  ReadOptions ropts;
  ropts.tailing = true;
  ropts.managed = true;

  Options opts = CurrentOptions();
  opts.create_if_missing = true;
  opts.disable_auto_compactions = true;
  opts.prefix_extractor.reset(NewFixedPrefixTransform(2));
  opts.memtable_factory.reset(NewHashSkipListRepFactory(16));
  opts.allow_concurrent_memtable_write = false;
  DestroyAndReopen(opts);
  CreateAndReopenWithCF({"pikachu"}, opts);

  std::unique_ptr<Iterator> it(db_->NewIterator(ropts, handles_[1]));

  // "0101" ends up in a flushed file, "0202" stays in the memtable.
  ASSERT_OK(Put(1, "0101", "test"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "0202", "test"));

  // Seek(0102) shouldn't find any records since 0202 has a different prefix
  it->Seek("0102");
  ASSERT_FALSE(it->Valid());

  it->Seek("0202");
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "0202");

  it->Next();
  ASSERT_FALSE(it->Valid());
}
|
||||
|
||||
// A managed tailing iterator limited to kBlockCacheTier must either return
// the entry or report Incomplete, both before and after a full compaction.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorIncomplete) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions ropts;
  ropts.tailing = true;
  ropts.managed = true;
  ropts.read_tier = kBlockCacheTier;

  const std::string key = "key";
  const std::string value = "value";
  ASSERT_OK(db_->Put(WriteOptions(), key, value));

  std::unique_ptr<Iterator> it(db_->NewIterator(ropts));

  // we either see the entry or it's not in cache
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid() || it->status().IsIncomplete());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // should still be true after compaction
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid() || it->status().IsIncomplete());
}
|
||||
|
||||
// Seeking a managed tailing iterator to the key it is already positioned on
// must leave it on that key.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorSeekToSame) {
  Options opts = CurrentOptions();
  opts.compaction_style = kCompactionStyleUniversal;
  opts.write_buffer_size = 1000;
  CreateAndReopenWithCF({"pikachu"}, opts);

  ReadOptions ropts;
  ropts.tailing = true;
  ropts.managed = true;

  const int NROWS = 10000;
  // Write rows with keys 00000, 00002, 00004 etc.
  const std::string row_value("value");
  for (int row = 0; row < NROWS; ++row) {
    char key_buf[100];
    snprintf(key_buf, sizeof(key_buf), "%05d", 2 * row);
    const std::string row_key(key_buf);
    ASSERT_OK(db_->Put(WriteOptions(), row_key, row_value));
  }

  std::unique_ptr<Iterator> it(db_->NewIterator(ropts));

  // Seek to 00001. We expect to find 00002.
  const std::string start_key = "00001";
  it->Seek(start_key);
  ASSERT_TRUE(it->Valid());

  const std::string found = it->key().ToString();
  ASSERT_EQ("00002", found);

  // Now seek to the same key. The iterator should remain in the same
  // position.
  it->Seek(found);
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(found, it->key().ToString());
}
|
||||
|
||||
// Checks the "rocksdb.iterator.super-version-number" property of a tailing
// iterator: the reported number must stay the same after a flush until the
// iterator is re-seeked, must grow once it is re-seeked, and a brand-new
// iterator must report the same number as the re-seeked one.
TEST_F(DBTestTailingIterator, ForwardIteratorVersionProperty) {
  // NOTE(review): `options` is configured here but never passed to a reopen
  // in this test, so write_buffer_size does not appear to take effect --
  // confirm whether a Reopen(options) was intended.
  Options options = CurrentOptions();
  options.write_buffer_size = 1000;

  ReadOptions read_options;
  read_options.tailing = true;

  // Setup writes are checked so that a failed write is reported as such
  // instead of surfacing later as a version-number mismatch.
  ASSERT_OK(Put("foo", "bar"));

  uint64_t v1, v2, v3, v4;
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
    iter->Seek("foo");
    std::string prop_value;
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    v1 = static_cast<uint64_t>(std::atoi(prop_value.c_str()));

    ASSERT_OK(Put("foo1", "bar1"));
    ASSERT_OK(Flush());

    // The flush alone must not change what the positioned iterator reports.
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    v2 = static_cast<uint64_t>(std::atoi(prop_value.c_str()));

    // Re-seeking makes the iterator report a newer number.
    iter->Seek("f");

    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    v3 = static_cast<uint64_t>(std::atoi(prop_value.c_str()));

    ASSERT_EQ(v1, v2);
    ASSERT_GT(v3, v2);
  }

  {
    // A fresh iterator must agree with the re-seeked one.
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
    iter->Seek("foo");
    std::string prop_value;
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    v4 = static_cast<uint64_t>(std::atoi(prop_value.c_str()));
  }
  ASSERT_EQ(v3, v4);
}
|
||||
|
||||
TEST_F(DBTestTailingIterator, SeekWithUpperBoundBug) {
|
||||
ReadOptions read_options;
|
||||
read_options.tailing = true;
|
||||
|
@ -1,254 +0,0 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "db/managed_iterator.h"
|
||||
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "db/column_family.h"
|
||||
#include "db/db_impl.h"
|
||||
#include "db/db_iter.h"
|
||||
#include "db/dbformat.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "table/merging_iterator.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
namespace {
|
||||
// Helper class that locks a mutex on construction and unlocks the mutex when
|
||||
// the destructor of the MutexLock object is invoked.
|
||||
//
|
||||
// Typical usage:
|
||||
//
|
||||
// void MyClass::MyMethod() {
|
||||
// MILock l(&mu_); // mu_ is an instance variable
|
||||
// ... some complex code, possibly with multiple return paths ...
|
||||
// }
|
||||
|
||||
class MILock {
|
||||
public:
|
||||
explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) {
|
||||
this->mu_->lock();
|
||||
}
|
||||
~MILock() {
|
||||
this->mu_->unlock();
|
||||
}
|
||||
ManagedIterator* GetManagedIterator() { return mi_; }
|
||||
|
||||
private:
|
||||
std::mutex* const mu_;
|
||||
ManagedIterator* mi_;
|
||||
// No copying allowed
|
||||
MILock(const MILock&) = delete;
|
||||
void operator=(const MILock&) = delete;
|
||||
};
|
||||
} // anonymous namespace
|
||||
|
||||
//
|
||||
// Synchronization between modifiers, releasers, creators
|
||||
// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use
|
||||
// if modifying mutable_iter, atomically exchange in_use:
|
||||
// return if in_use set / otherwise set in use,
|
||||
// atomically replace new iter with old , reset in use
|
||||
// The releaser is the new operation and it holds a lock for a very short time
|
||||
// The existing non-const iterator operations are supposed to be single
|
||||
// threaded and hold the lock for the duration of the operation
|
||||
// The existing const iterator operations use the cached key/values
|
||||
// and don't do any locking.
|
||||
// Builds a managed iterator over column family `cfd` of `db`.
//
// A private copy of `read_options` is kept with `managed` cleared so that the
// underlying iterator requested from the DB is a regular one. For a
// non-tailing iterator without a caller-supplied snapshot, a snapshot is
// taken here and released again in the destructor.
ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      svnum_(cfd->GetSuperVersionNumber()),
      mutable_iter_(nullptr),
      valid_(false),
      snapshot_created_(false),
      release_supported_(true) {
  read_options_.managed = false;
  if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) {
    // Take the snapshot OUTSIDE of assert(): with NDEBUG the assert
    // expression is not evaluated, which previously meant no snapshot was
    // taken while snapshot_created_ was still set to true.
    read_options_.snapshot = db_->GetSnapshot();
    assert(read_options_.snapshot != nullptr);
    // Only claim ownership of a snapshot that was actually obtained, so the
    // destructor never releases a null snapshot.
    snapshot_created_ = (read_options_.snapshot != nullptr);
  }
  cfh_.SetCFD(cfd);
  mutable_iter_.reset(db->NewIterator(read_options_, &cfh_));
}
|
||||
|
||||
// Releases the snapshot taken by the constructor, if any, while holding the
// in-use mutex.
ManagedIterator::~ManagedIterator() {
  std::lock_guard<std::mutex> guard(in_use_);
  if (!snapshot_created_) {
    return;
  }
  db_->ReleaseSnapshot(read_options_.snapshot);
  read_options_.snapshot = nullptr;
  snapshot_created_ = false;
}
|
||||
|
||||
// Returns the validity flag cached by the last positioning operation; does
// not take the in-use mutex.
bool ManagedIterator::Valid() const {
  return valid_;
}
|
||||
|
||||
void ManagedIterator::SeekToLast() {
|
||||
MILock l(&in_use_, this);
|
||||
if (NeedToRebuild()) {
|
||||
RebuildIterator();
|
||||
}
|
||||
assert(mutable_iter_ != nullptr);
|
||||
mutable_iter_->SeekToLast();
|
||||
UpdateCurrent();
|
||||
}
|
||||
|
||||
void ManagedIterator::SeekToFirst() {
|
||||
MILock l(&in_use_, this);
|
||||
SeekInternal(Slice(), true);
|
||||
}
|
||||
|
||||
// Seeks the underlying iterator to `user_key` via SeekInternal(), holding
// the in-use mutex for the duration.
void ManagedIterator::Seek(const Slice& user_key) {
  MILock guard(&in_use_, this);
  SeekInternal(user_key, /*seek_to_first=*/false);
}
|
||||
|
||||
void ManagedIterator::SeekForPrev(const Slice& user_key) {
|
||||
MILock l(&in_use_, this);
|
||||
if (NeedToRebuild()) {
|
||||
RebuildIterator();
|
||||
}
|
||||
assert(mutable_iter_ != nullptr);
|
||||
mutable_iter_->SeekForPrev(user_key);
|
||||
UpdateCurrent();
|
||||
}
|
||||
|
||||
void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) {
|
||||
if (NeedToRebuild()) {
|
||||
RebuildIterator();
|
||||
}
|
||||
assert(mutable_iter_ != nullptr);
|
||||
if (seek_to_first) {
|
||||
mutable_iter_->SeekToFirst();
|
||||
} else {
|
||||
mutable_iter_->Seek(user_key);
|
||||
}
|
||||
UpdateCurrent();
|
||||
}
|
||||
|
||||
void ManagedIterator::Prev() {
|
||||
if (!valid_) {
|
||||
status_ = Status::InvalidArgument("Iterator value invalid");
|
||||
return;
|
||||
}
|
||||
MILock l(&in_use_, this);
|
||||
if (NeedToRebuild()) {
|
||||
RebuildIterator(true);
|
||||
if (!valid_) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
mutable_iter_->Prev();
|
||||
UpdateCurrent();
|
||||
}
|
||||
|
||||
void ManagedIterator::Next() {
|
||||
if (!valid_) {
|
||||
status_ = Status::InvalidArgument("Iterator value invalid");
|
||||
return;
|
||||
}
|
||||
MILock l(&in_use_, this);
|
||||
if (NeedToRebuild()) {
|
||||
RebuildIterator(true);
|
||||
if (!valid_) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
mutable_iter_->Next();
|
||||
UpdateCurrent();
|
||||
}
|
||||
|
||||
// Returns the key cached by the last UpdateCurrent(). Must only be called
// while the iterator is valid; takes no lock.
Slice ManagedIterator::key() const {
  assert(valid_);
  const Slice cached = cached_key_.GetUserKey();
  return cached;
}
|
||||
|
||||
// Returns the value cached by the last UpdateCurrent(). Must only be called
// while the iterator is valid; takes no lock.
Slice ManagedIterator::value() const {
  assert(valid_);
  // The value is stored in an IterKey through SetUserKey(), so it is read
  // back with the matching GetUserKey() accessor.
  const Slice cached = cached_value_.GetUserKey();
  return cached;
}
|
||||
|
||||
// Returns the status recorded by the last operation; takes no lock.
Status ManagedIterator::status() const {
  return status_;
}
|
||||
|
||||
void ManagedIterator::RebuildIterator(bool reseek) {
|
||||
std::string current_key;
|
||||
if (reseek) {
|
||||
current_key = key().ToString();
|
||||
}
|
||||
|
||||
svnum_ = cfd_->GetSuperVersionNumber();
|
||||
mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
|
||||
|
||||
if (reseek) {
|
||||
Slice old_key(current_key.data(), current_key.size());
|
||||
SeekInternal(old_key, false);
|
||||
UpdateCurrent();
|
||||
if (!valid_ || key().compare(old_key) != 0) {
|
||||
valid_ = false;
|
||||
status_ = Status::Incomplete(
|
||||
"Next/Prev failed because current key has "
|
||||
"been removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Copies the underlying iterator's validity and status into this object and,
// when it is positioned on an entry, caches that entry's key and value so
// they can still be read without touching the underlying iterator.
void ManagedIterator::UpdateCurrent() {
  assert(mutable_iter_ != nullptr);

  valid_ = mutable_iter_->Valid();
  status_ = mutable_iter_->status();

  if (valid_) {
    cached_key_.SetUserKey(mutable_iter_->key());
    cached_value_.SetUserKey(mutable_iter_->value());
  }
}
|
||||
|
||||
void ManagedIterator::ReleaseIter(bool only_old) {
|
||||
if ((mutable_iter_ == nullptr) || (!release_supported_)) {
|
||||
return;
|
||||
}
|
||||
if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) {
|
||||
if (!TryLock()) { // Don't release iter if in use
|
||||
return;
|
||||
}
|
||||
mutable_iter_ = nullptr; // in_use for a very short time
|
||||
UnLock();
|
||||
}
|
||||
}
|
||||
|
||||
bool ManagedIterator::NeedToRebuild() {
|
||||
if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) ||
|
||||
(!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Blocks until the in-use mutex has been acquired.
void ManagedIterator::Lock() { in_use_.lock(); }
|
||||
|
||||
// Attempts to acquire the in-use mutex without blocking; returns true on
// success.
bool ManagedIterator::TryLock() {
  const bool acquired = in_use_.try_lock();
  return acquired;
}
|
||||
|
||||
// Releases the in-use mutex.
void ManagedIterator::UnLock() { in_use_.unlock(); }
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // ROCKSDB_LITE
|
@ -1,85 +0,0 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
#pragma once
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/column_family.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "util/arena.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
class DBImpl;
|
||||
struct SuperVersion;
|
||||
class ColumnFamilyData;
|
||||
|
||||
/**
|
||||
* ManagedIterator is a special type of iterator that supports freeing the
|
||||
* underlying iterator and still being able to access the current key/value
|
||||
* pair. This is done by copying the key/value pair so that clients can
|
||||
* continue to access the data without getting a SIGSEGV.
|
||||
* The underlying iterator can be freed manually through the call to
|
||||
* ReleaseIter or automatically (as needed on space pressure or age.)
|
||||
* The iterator is recreated using the saved original arguments.
|
||||
*/
|
||||
class ManagedIterator : public Iterator {
|
||||
public:
|
||||
ManagedIterator(DBImpl* db, const ReadOptions& read_options,
|
||||
ColumnFamilyData* cfd);
|
||||
virtual ~ManagedIterator();
|
||||
|
||||
virtual void SeekToLast() override;
|
||||
virtual void Prev() override;
|
||||
virtual bool Valid() const override;
|
||||
void SeekToFirst() override;
|
||||
virtual void Seek(const Slice& target) override;
|
||||
virtual void SeekForPrev(const Slice& target) override;
|
||||
virtual void Next() override;
|
||||
virtual Slice key() const override;
|
||||
virtual Slice value() const override;
|
||||
virtual Status status() const override;
|
||||
void ReleaseIter(bool only_old);
|
||||
void SetDropOld(bool only_old) {
|
||||
only_drop_old_ = read_options_.tailing || only_old;
|
||||
}
|
||||
|
||||
private:
|
||||
void RebuildIterator(bool reseek = false);
|
||||
void UpdateCurrent();
|
||||
void SeekInternal(const Slice& user_key, bool seek_to_first);
|
||||
bool NeedToRebuild();
|
||||
void Lock();
|
||||
bool TryLock();
|
||||
void UnLock();
|
||||
DBImpl* const db_;
|
||||
ReadOptions read_options_;
|
||||
ColumnFamilyData* const cfd_;
|
||||
ColumnFamilyHandleInternal cfh_;
|
||||
|
||||
uint64_t svnum_;
|
||||
std::unique_ptr<Iterator> mutable_iter_;
|
||||
// internal iterator status
|
||||
Status status_;
|
||||
bool valid_;
|
||||
|
||||
IterKey cached_key_;
|
||||
IterKey cached_value_;
|
||||
|
||||
bool only_drop_old_ = true;
|
||||
bool snapshot_created_;
|
||||
bool release_supported_;
|
||||
std::mutex in_use_; // is managed iterator in use
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
#endif // !ROCKSDB_LITE
|
@ -1195,6 +1195,7 @@ extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_read_tier(
|
||||
rocksdb_readoptions_t*, int);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_tailing(
|
||||
rocksdb_readoptions_t*, unsigned char);
|
||||
// The functionality that this option controlled has been removed.
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_managed(
|
||||
rocksdb_readoptions_t*, unsigned char);
|
||||
extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_readahead_size(
|
||||
|
@ -1064,11 +1064,8 @@ struct ReadOptions {
|
||||
// Not supported in ROCKSDB_LITE mode!
|
||||
bool tailing;
|
||||
|
||||
// Specify to create a managed iterator -- a special iterator that
|
||||
// uses less resources by having the ability to free its underlying
|
||||
// resources on request.
|
||||
// Default: false
|
||||
// Not supported in ROCKSDB_LITE mode!
|
||||
// This option is not used anymore. It was to turn on a functionality that
|
||||
// has been removed.
|
||||
bool managed;
|
||||
|
||||
// Enable a total order seek regardless of index format (e.g. hash index)
|
||||
|
Loading…
Reference in New Issue
Block a user