[CF] NewIterators

Summary: Adding the last missing function -- NewIterators(). Pretty simple implementation

Test Plan: added a unit test

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16689
This commit is contained in:
Igor Canadi 2014-03-07 16:12:34 -08:00
parent 80a207fc90
commit 9f15092ebd
5 changed files with 115 additions and 14 deletions

View File

@ -788,6 +788,63 @@ TEST(ColumnFamilyTest, DifferentCompactionStyles) {
Close();
}
namespace {
std::string IterStatus(Iterator* iter) {
std::string result;
if (iter->Valid()) {
result = iter->key().ToString() + "->" + iter->value().ToString();
} else {
result = "(invalid)";
}
return result;
}
} // namespace anonymous
TEST(ColumnFamilyTest, NewIteratorsTest) {
// iter == 0 -- no tailing
// iter == 2 -- tailing
for (int iter = 0; iter < 2; ++iter) {
Open();
CreateColumnFamiliesAndReopen({"one", "two"});
ASSERT_OK(Put(0, "a", "b"));
ASSERT_OK(Put(1, "b", "a"));
ASSERT_OK(Put(2, "c", "m"));
ASSERT_OK(Put(2, "v", "t"));
std::vector<Iterator*> iterators;
ReadOptions options;
options.tailing = (iter == 1);
ASSERT_OK(db_->NewIterators(options, handles_, &iterators));
for (auto it : iterators) {
it->SeekToFirst();
}
ASSERT_EQ(IterStatus(iterators[0]), "a->b");
ASSERT_EQ(IterStatus(iterators[1]), "b->a");
ASSERT_EQ(IterStatus(iterators[2]), "c->m");
ASSERT_OK(Put(1, "x", "x"));
for (auto it : iterators) {
it->Next();
}
ASSERT_EQ(IterStatus(iterators[0]), "(invalid)");
if (iter == 0) {
// no tailing
ASSERT_EQ(IterStatus(iterators[1]), "(invalid)");
} else {
// tailing
ASSERT_EQ(IterStatus(iterators[1]), "x->x");
}
ASSERT_EQ(IterStatus(iterators[2]), "v->t");
for (auto it : iterators) {
delete it;
}
Destroy();
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -3214,12 +3214,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
SuperVersion* super_version = nullptr;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
mutex_.Lock();
if (!options.tailing) {
mutex_.Lock();
super_version = cfd->GetSuperVersion()->Ref();
latest_snapshot = versions_->LastSequence();
mutex_.Unlock();
}
mutex_.Unlock();
Iterator* iter;
if (options.tailing) {
@ -3227,11 +3227,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
} else {
iter = NewInternalIterator(options, cfd, super_version);
iter = NewDBIterator(
&dbname_, env_, *cfd->full_options(), cfd->user_comparator(), iter,
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
auto snapshot =
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot;
iter = NewDBIterator(&dbname_, env_, *cfd->full_options(),
cfd->user_comparator(), iter, snapshot);
}
if (options.prefix) {
@ -3245,10 +3246,53 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
Status DBImpl::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
// TODO(icanadi)
return Status::NotSupported("Not yet!");
if (options.prefix) {
return Status::NotSupported(
"NewIterators doesn't support ReadOptions::prefix");
}
iterators->clear();
iterators->reserve(column_families.size());
SequenceNumber latest_snapshot = 0;
std::vector<SuperVersion*> super_versions;
super_versions.reserve(column_families.size());
if (!options.tailing) {
mutex_.Lock();
latest_snapshot = versions_->LastSequence();
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
super_versions.push_back(cfd->GetSuperVersion()->Ref());
}
mutex_.Unlock();
}
if (options.tailing) {
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
iterators->push_back(new TailingIterator(this, options, cfd));
}
} else {
for (size_t i = 0; i < column_families.size(); ++i) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i]);
auto cfd = cfh->cfd();
auto snapshot =
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot;
auto iter = NewInternalIterator(options, cfd, super_versions[i]);
iter = NewDBIterator(&dbname_, env_, *cfd->full_options(),
cfd->user_comparator(), iter, snapshot);
iterators->push_back(iter);
}
}
return Status::OK();
}
const Snapshot* DBImpl::GetSnapshot() {

View File

@ -86,7 +86,7 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family);
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators);
virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot);

View File

@ -255,7 +255,7 @@ class DB {
// before the db is deleted
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) = 0;
// Return a handle to the current DB state. Iterators created with

View File

@ -80,9 +80,9 @@ class StackableDB : public DB {
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
return db_->NewIterators(options, column_family, iterators);
return db_->NewIterators(options, column_families, iterators);
}