SkipListRep::LookaheadIterator
Summary: This diff introduces the `lookahead` argument to `SkipListFactory()`. This is an optimization for the tailing use case, that includes many seeks. E.g. consider the following operations on a skip list iterator: Seek(x), Next(), Next(), Seek(x+2), Next(), Seek(x+3), Next(), Next(), ... If `lookahead` is positive, `SkipListRep` will return an iterator which also keeps track of the previously visited node. Seek() then first does a linear search starting from that node (up to `lookahead` steps). In access patterns like the tailing example above, this may require fewer comparisons than the ~log(n) needed by a regular skip list search. Test Plan: Added a new benchmark (`fillseekseq`) which simulates the usage pattern. It first writes N records (with consecutive keys), then measures how much time it takes to read them by calling `Seek()` and `Next()`. $ time ./db_bench -num 10000000 -benchmarks fillseekseq -prefix_size 1 \ -key_size 8 -write_buffer_size $[1024*1024*1024] -value_size 50 \ -seekseq_next 2 -skip_list_lookahead=0 [...] DB path: [/dev/shm/rocksdbtest/dbbench] fillseekseq : 0.389 micros/op 2569047 ops/sec; real 0m21.806s user 0m12.106s sys 0m9.672s $ time ./db_bench [...] -skip_list_lookahead=2 [...] DB path: [/dev/shm/rocksdbtest/dbbench] fillseekseq : 0.153 micros/op 6540684 ops/sec; real 0m19.469s user 0m10.192s sys 0m9.252s Reviewers: ljin, sdong, igor Reviewed By: igor Subscribers: dhruba, leveldb, march, lovro Differential Revision: https://reviews.facebook.net/D23997
This commit is contained in:
parent
6a443309d8
commit
88edfd90ae
@ -86,7 +86,8 @@ DEFINE_string(benchmarks,
|
|||||||
"xxhash,"
|
"xxhash,"
|
||||||
"compress,"
|
"compress,"
|
||||||
"uncompress,"
|
"uncompress,"
|
||||||
"acquireload,",
|
"acquireload,"
|
||||||
|
"fillseekseq,",
|
||||||
|
|
||||||
"Comma-separated list of operations to run in the specified order"
|
"Comma-separated list of operations to run in the specified order"
|
||||||
"Actual benchmarks:\n"
|
"Actual benchmarks:\n"
|
||||||
@ -129,6 +130,8 @@ DEFINE_string(benchmarks,
|
|||||||
"\tcrc32c -- repeated crc32c of 4K of data\n"
|
"\tcrc32c -- repeated crc32c of 4K of data\n"
|
||||||
"\txxhash -- repeated xxHash of 4K of data\n"
|
"\txxhash -- repeated xxHash of 4K of data\n"
|
||||||
"\tacquireload -- load N*1000 times\n"
|
"\tacquireload -- load N*1000 times\n"
|
||||||
|
"\tfillseekseq -- write N values in sequential key, then read "
|
||||||
|
"them by seeking to each key\n"
|
||||||
"Meta operations:\n"
|
"Meta operations:\n"
|
||||||
"\tcompact -- Compact the entire DB\n"
|
"\tcompact -- Compact the entire DB\n"
|
||||||
"\tstats -- Print DB stats\n"
|
"\tstats -- Print DB stats\n"
|
||||||
@ -165,6 +168,9 @@ DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
|
|||||||
|
|
||||||
DEFINE_int32(value_size, 100, "Size of each value");
|
DEFINE_int32(value_size, 100, "Size of each value");
|
||||||
|
|
||||||
|
DEFINE_int32(seekseq_next, 0, "How many times to call Next() after Seek() in "
|
||||||
|
"fillseekseq");
|
||||||
|
|
||||||
DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
|
DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
|
||||||
|
|
||||||
DEFINE_int64(batch_size, 1, "Batch size");
|
DEFINE_int64(batch_size, 1, "Batch size");
|
||||||
@ -565,6 +571,9 @@ DEFINE_string(merge_operator, "", "The merge operator to use with the database."
|
|||||||
"If a new merge operator is specified, be sure to use fresh"
|
"If a new merge operator is specified, be sure to use fresh"
|
||||||
" database The possible merge operators are defined in"
|
" database The possible merge operators are defined in"
|
||||||
" utilities/merge_operators.h");
|
" utilities/merge_operators.h");
|
||||||
|
DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
|
||||||
|
"linear search first for this many steps from the previous "
|
||||||
|
"position");
|
||||||
|
|
||||||
static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
|
static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
|
||||||
RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
|
RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
|
||||||
@ -1326,6 +1335,8 @@ class Benchmark {
|
|||||||
method = &Benchmark::MergeRandom;
|
method = &Benchmark::MergeRandom;
|
||||||
} else if (name == Slice("randomwithverify")) {
|
} else if (name == Slice("randomwithverify")) {
|
||||||
method = &Benchmark::RandomWithVerify;
|
method = &Benchmark::RandomWithVerify;
|
||||||
|
} else if (name == Slice("fillseekseq")) {
|
||||||
|
method = &Benchmark::WriteSeqSeekSeq;
|
||||||
} else if (name == Slice("compact")) {
|
} else if (name == Slice("compact")) {
|
||||||
method = &Benchmark::Compact;
|
method = &Benchmark::Compact;
|
||||||
} else if (name == Slice("crc32c")) {
|
} else if (name == Slice("crc32c")) {
|
||||||
@ -1717,7 +1728,8 @@ class Benchmark {
|
|||||||
FLAGS_hash_bucket_count));
|
FLAGS_hash_bucket_count));
|
||||||
break;
|
break;
|
||||||
case kSkipList:
|
case kSkipList:
|
||||||
// no need to do anything
|
options.memtable_factory.reset(new SkipListFactory(
|
||||||
|
FLAGS_skip_list_lookahead));
|
||||||
break;
|
break;
|
||||||
case kHashLinkedList:
|
case kHashLinkedList:
|
||||||
options.memtable_factory.reset(NewHashLinkListRepFactory(
|
options.memtable_factory.reset(NewHashLinkListRepFactory(
|
||||||
@ -2791,6 +2803,36 @@ class Benchmark {
|
|||||||
thread->stats.AddMessage(msg);
|
thread->stats.AddMessage(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WriteSeqSeekSeq(ThreadState* thread) {
|
||||||
|
writes_ = FLAGS_num;
|
||||||
|
DoWrite(thread, SEQUENTIAL);
|
||||||
|
// exclude writes from the ops/sec calculation
|
||||||
|
thread->stats.Start(thread->tid);
|
||||||
|
|
||||||
|
DB* db = SelectDB(thread);
|
||||||
|
std::unique_ptr<Iterator> iter(
|
||||||
|
db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));
|
||||||
|
|
||||||
|
Slice key = AllocateKey();
|
||||||
|
for (int64_t i = 0; i < FLAGS_num; ++i) {
|
||||||
|
GenerateKeyFromInt(i, FLAGS_num, &key);
|
||||||
|
iter->Seek(key);
|
||||||
|
assert(iter->Valid() && iter->key() == key);
|
||||||
|
thread->stats.FinishedOps(nullptr, db, 1);
|
||||||
|
|
||||||
|
for (int j = 0; j < FLAGS_seekseq_next && i+1 < FLAGS_num; ++j) {
|
||||||
|
iter->Next();
|
||||||
|
GenerateKeyFromInt(++i, FLAGS_num, &key);
|
||||||
|
assert(iter->Valid() && iter->key() == key);
|
||||||
|
thread->stats.FinishedOps(nullptr, db, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->Seek(key);
|
||||||
|
assert(iter->Valid() && iter->key() == key);
|
||||||
|
thread->stats.FinishedOps(nullptr, db, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void Compact(ThreadState* thread) {
|
void Compact(ThreadState* thread) {
|
||||||
DB* db = SelectDB(thread);
|
DB* db = SelectDB(thread);
|
||||||
db->CompactRange(nullptr, nullptr);
|
db->CompactRange(nullptr, nullptr);
|
||||||
|
@ -186,12 +186,23 @@ class MemTableRepFactory {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// This uses a skip list to store keys. It is the default.
|
// This uses a skip list to store keys. It is the default.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// lookahead: If non-zero, each iterator's seek operation will start the
|
||||||
|
// search from the previously visited record (doing at most 'lookahead'
|
||||||
|
// steps). This is an optimization for the access pattern including many
|
||||||
|
// seeks with consecutive keys.
|
||||||
class SkipListFactory : public MemTableRepFactory {
|
class SkipListFactory : public MemTableRepFactory {
|
||||||
public:
|
public:
|
||||||
|
explicit SkipListFactory(size_t lookahead = 0) : lookahead_(lookahead) {}
|
||||||
|
|
||||||
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
|
||||||
Arena*, const SliceTransform*,
|
Arena*, const SliceTransform*,
|
||||||
Logger* logger) override;
|
Logger* logger) override;
|
||||||
virtual const char* Name() const override { return "SkipListFactory"; }
|
virtual const char* Name() const override { return "SkipListFactory"; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
const size_t lookahead_;
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
|
@ -12,9 +12,16 @@ namespace rocksdb {
|
|||||||
namespace {
|
namespace {
|
||||||
class SkipListRep : public MemTableRep {
|
class SkipListRep : public MemTableRep {
|
||||||
SkipList<const char*, const MemTableRep::KeyComparator&> skip_list_;
|
SkipList<const char*, const MemTableRep::KeyComparator&> skip_list_;
|
||||||
|
const MemTableRep::KeyComparator& cmp_;
|
||||||
|
const SliceTransform* transform_;
|
||||||
|
const size_t lookahead_;
|
||||||
|
|
||||||
|
friend class LookaheadIterator;
|
||||||
public:
|
public:
|
||||||
explicit SkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena)
|
explicit SkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
|
||||||
: MemTableRep(arena), skip_list_(compare, arena) {
|
const SliceTransform* transform, const size_t lookahead)
|
||||||
|
: MemTableRep(arena), skip_list_(compare, arena), cmp_(compare),
|
||||||
|
transform_(transform), lookahead_(lookahead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert key into the list.
|
// Insert key into the list.
|
||||||
@ -106,11 +113,110 @@ public:
|
|||||||
std::string tmp_; // For passing to EncodeKey
|
std::string tmp_; // For passing to EncodeKey
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Iterator over the contents of a skip list which also keeps track of the
|
||||||
|
// previously visited node. In Seek(), it examines a few nodes after it
|
||||||
|
// first, falling back to O(log n) search from the head of the list only if
|
||||||
|
// the target key hasn't been found.
|
||||||
|
class LookaheadIterator : public MemTableRep::Iterator {
|
||||||
|
public:
|
||||||
|
explicit LookaheadIterator(const SkipListRep& rep) :
|
||||||
|
rep_(rep), iter_(&rep_.skip_list_), prev_(iter_) {}
|
||||||
|
|
||||||
|
virtual ~LookaheadIterator() override {}
|
||||||
|
|
||||||
|
virtual bool Valid() const override {
|
||||||
|
return iter_.Valid();
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char *key() const override {
|
||||||
|
assert(Valid());
|
||||||
|
return iter_.key();
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void Next() override {
|
||||||
|
assert(Valid());
|
||||||
|
|
||||||
|
bool advance_prev = true;
|
||||||
|
if (prev_.Valid()) {
|
||||||
|
auto k1 = rep_.UserKey(prev_.key());
|
||||||
|
auto k2 = rep_.UserKey(iter_.key());
|
||||||
|
|
||||||
|
if (k1.compare(k2) == 0) {
|
||||||
|
// same user key, don't move prev_
|
||||||
|
advance_prev = false;
|
||||||
|
} else if (rep_.transform_) {
|
||||||
|
// only advance prev_ if it has the same prefix as iter_
|
||||||
|
auto t1 = rep_.transform_->Transform(k1);
|
||||||
|
auto t2 = rep_.transform_->Transform(k2);
|
||||||
|
advance_prev = t1.compare(t2) == 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (advance_prev) {
|
||||||
|
prev_ = iter_;
|
||||||
|
}
|
||||||
|
iter_.Next();
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void Prev() override {
|
||||||
|
assert(Valid());
|
||||||
|
iter_.Prev();
|
||||||
|
prev_ = iter_;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void Seek(const Slice& internal_key, const char *memtable_key)
|
||||||
|
override {
|
||||||
|
const char *encoded_key =
|
||||||
|
(memtable_key != nullptr) ?
|
||||||
|
memtable_key : EncodeKey(&tmp_, internal_key);
|
||||||
|
|
||||||
|
if (prev_.Valid() && rep_.cmp_(encoded_key, prev_.key()) >= 0) {
|
||||||
|
// prev_.key() is smaller or equal to our target key; do a quick
|
||||||
|
// linear search (at most lookahead_ steps) starting from prev_
|
||||||
|
iter_ = prev_;
|
||||||
|
|
||||||
|
size_t cur = 0;
|
||||||
|
while (cur++ <= rep_.lookahead_ && iter_.Valid()) {
|
||||||
|
if (rep_.cmp_(encoded_key, iter_.key()) <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
iter_.Seek(encoded_key);
|
||||||
|
prev_ = iter_;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void SeekToFirst() override {
|
||||||
|
iter_.SeekToFirst();
|
||||||
|
prev_ = iter_;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void SeekToLast() override {
|
||||||
|
iter_.SeekToLast();
|
||||||
|
prev_ = iter_;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
std::string tmp_; // For passing to EncodeKey
|
||||||
|
|
||||||
|
private:
|
||||||
|
const SkipListRep& rep_;
|
||||||
|
SkipList<const char*, const MemTableRep::KeyComparator&>::Iterator iter_;
|
||||||
|
SkipList<const char*, const MemTableRep::KeyComparator&>::Iterator prev_;
|
||||||
|
};
|
||||||
|
|
||||||
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
|
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
|
||||||
if (arena == nullptr) {
|
if (lookahead_ > 0) {
|
||||||
return new SkipListRep::Iterator(&skip_list_);
|
void *mem =
|
||||||
|
arena ? arena->AllocateAligned(sizeof(SkipListRep::LookaheadIterator))
|
||||||
|
: operator new(sizeof(SkipListRep::LookaheadIterator));
|
||||||
|
return new (mem) SkipListRep::LookaheadIterator(*this);
|
||||||
} else {
|
} else {
|
||||||
auto mem = arena->AllocateAligned(sizeof(SkipListRep::Iterator));
|
void *mem =
|
||||||
|
arena ? arena->AllocateAligned(sizeof(SkipListRep::Iterator))
|
||||||
|
: operator new(sizeof(SkipListRep::Iterator));
|
||||||
return new (mem) SkipListRep::Iterator(&skip_list_);
|
return new (mem) SkipListRep::Iterator(&skip_list_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -119,8 +225,8 @@ public:
|
|||||||
|
|
||||||
MemTableRep* SkipListFactory::CreateMemTableRep(
|
MemTableRep* SkipListFactory::CreateMemTableRep(
|
||||||
const MemTableRep::KeyComparator& compare, Arena* arena,
|
const MemTableRep::KeyComparator& compare, Arena* arena,
|
||||||
const SliceTransform*, Logger* logger) {
|
const SliceTransform* transform, Logger* logger) {
|
||||||
return new SkipListRep(compare, arena);
|
return new SkipListRep(compare, arena, transform, lookahead_);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
Loading…
x
Reference in New Issue
Block a user