new Prev() prefix support using SeekForPrev()
Summary: 1) The previous solution for Prev() prefix support is not clean. Since I add api SeekForPrev(), now the Prev() can be symmetric to Next(). and we do not need SeekToLast() to be called in Prev() any more. Also, Next() will Seek(prefix_seek_key_) to solve the problem of possible inconsistency between db_iter and merge_iter when there is merge_operator. And prefix_seek_key is only refreshed when change direction to forward. 2) This diff also solves the bug of Iterator::SeekToLast() with iterate_upper_bound_ with prefix extractor. add test cases for the above two cases. There are some tests for the SeekToLast() in Prev(), I will clean them later. Test Plan: make all check Reviewers: IslamAbdelRahman, andrewkr, yiwu, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D63933
This commit is contained in:
parent
991b585ee0
commit
447f17127c
@ -4345,7 +4345,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
|
||||
env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
|
||||
sv->mutable_cf_options.max_sequential_skip_in_iterations,
|
||||
sv->version_number, read_options.iterate_upper_bound,
|
||||
read_options.prefix_same_as_start, read_options.pin_data);
|
||||
read_options.prefix_same_as_start, read_options.pin_data,
|
||||
read_options.total_order_seek);
|
||||
|
||||
InternalIterator* internal_iter =
|
||||
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
|
||||
|
@ -105,7 +105,8 @@ class DBIter: public Iterator {
|
||||
InternalIterator* iter, SequenceNumber s, bool arena_mode,
|
||||
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
|
||||
const Slice* iterate_upper_bound = nullptr,
|
||||
bool prefix_same_as_start = false, bool pin_data = false)
|
||||
bool prefix_same_as_start = false, bool pin_data = false,
|
||||
bool total_order_seek = false)
|
||||
: arena_mode_(arena_mode),
|
||||
env_(env),
|
||||
logger_(ioptions.info_log),
|
||||
@ -120,7 +121,8 @@ class DBIter: public Iterator {
|
||||
version_number_(version_number),
|
||||
iterate_upper_bound_(iterate_upper_bound),
|
||||
prefix_same_as_start_(prefix_same_as_start),
|
||||
pin_thru_lifetime_(pin_data) {
|
||||
pin_thru_lifetime_(pin_data),
|
||||
total_order_seek_(total_order_seek) {
|
||||
RecordTick(statistics_, NO_ITERATORS);
|
||||
prefix_extractor_ = ioptions.prefix_extractor;
|
||||
max_skip_ = max_sequential_skip_in_iterations;
|
||||
@ -204,6 +206,7 @@ class DBIter: public Iterator {
|
||||
virtual void SeekToLast() override;
|
||||
|
||||
private:
|
||||
void ReverseToForward();
|
||||
void ReverseToBackward();
|
||||
void PrevInternal();
|
||||
void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
|
||||
@ -256,6 +259,7 @@ class DBIter: public Iterator {
|
||||
Direction direction_;
|
||||
bool valid_;
|
||||
bool current_entry_is_merged_;
|
||||
// for prefix seek mode to support prev()
|
||||
Statistics* statistics_;
|
||||
uint64_t max_skip_;
|
||||
uint64_t version_number_;
|
||||
@ -266,6 +270,7 @@ class DBIter: public Iterator {
|
||||
// Means that we will pin all data blocks we read as long the Iterator
|
||||
// is not deleted, will be true if ReadOptions::pin_data is true
|
||||
const bool pin_thru_lifetime_;
|
||||
const bool total_order_seek_;
|
||||
// List of operands for merge operator.
|
||||
MergeContext merge_context_;
|
||||
LocalStatistics local_stats_;
|
||||
@ -294,11 +299,7 @@ void DBIter::Next() {
|
||||
// Release temporarily pinned blocks from last operation
|
||||
ReleaseTempPinnedData();
|
||||
if (direction_ == kReverse) {
|
||||
FindNextUserKey();
|
||||
direction_ = kForward;
|
||||
if (!iter_->Valid()) {
|
||||
iter_->SeekToFirst();
|
||||
}
|
||||
ReverseToForward();
|
||||
} else if (iter_->Valid() && !current_entry_is_merged_) {
|
||||
// If the current value is not a merge, the iter position is the
|
||||
// current key, which is already returned. We can safely issue a
|
||||
@ -506,7 +507,29 @@ void DBIter::Prev() {
|
||||
}
|
||||
}
|
||||
|
||||
void DBIter::ReverseToForward() {
|
||||
if (prefix_extractor_ != nullptr && !total_order_seek_) {
|
||||
IterKey last_key;
|
||||
last_key.SetInternalKey(ParsedInternalKey(
|
||||
saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
|
||||
Slice db_iter_key = last_key.GetKey();
|
||||
iter_->ResetPrefixSeekKey(&db_iter_key);
|
||||
}
|
||||
FindNextUserKey();
|
||||
direction_ = kForward;
|
||||
if (!iter_->Valid()) {
|
||||
iter_->SeekToFirst();
|
||||
}
|
||||
}
|
||||
|
||||
void DBIter::ReverseToBackward() {
|
||||
if (prefix_extractor_ != nullptr && !total_order_seek_) {
|
||||
IterKey last_key;
|
||||
last_key.SetInternalKey(
|
||||
ParsedInternalKey(saved_key_.GetKey(), 0, kValueTypeForSeekForPrev));
|
||||
Slice db_iter_key = last_key.GetKey();
|
||||
iter_->ResetPrefixSeekKey(&db_iter_key);
|
||||
}
|
||||
if (current_entry_is_merged_) {
|
||||
// Not placed in the same key. Need to call Prev() until finding the
|
||||
// previous key.
|
||||
@ -794,7 +817,6 @@ void DBIter::Seek(const Slice& target) {
|
||||
PERF_TIMER_GUARD(seek_internal_seek_time);
|
||||
iter_->Seek(saved_key_.GetKey());
|
||||
}
|
||||
|
||||
RecordTick(statistics_, NUMBER_DB_SEEK);
|
||||
if (iter_->Valid()) {
|
||||
if (prefix_extractor_ && prefix_same_as_start_) {
|
||||
@ -815,6 +837,7 @@ void DBIter::Seek(const Slice& target) {
|
||||
} else {
|
||||
valid_ = false;
|
||||
}
|
||||
|
||||
if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
|
||||
prefix_start_buf_.SetKey(prefix_start_key_);
|
||||
prefix_start_key_ = prefix_start_buf_.GetKey();
|
||||
@ -911,25 +934,15 @@ void DBIter::SeekToLast() {
|
||||
// it will seek to the last key before the
|
||||
// ReadOptions.iterate_upper_bound
|
||||
if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
|
||||
saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
|
||||
std::string last_key;
|
||||
AppendInternalKey(&last_key,
|
||||
ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
|
||||
kValueTypeForSeek));
|
||||
|
||||
iter_->Seek(last_key);
|
||||
|
||||
if (!iter_->Valid()) {
|
||||
iter_->SeekToLast();
|
||||
} else {
|
||||
iter_->Prev();
|
||||
if (!iter_->Valid()) {
|
||||
valid_ = false;
|
||||
return;
|
||||
}
|
||||
SeekForPrev(*iterate_upper_bound_);
|
||||
if (!Valid()) {
|
||||
return;
|
||||
} else if (user_comparator_->Equal(*iterate_upper_bound_, key())) {
|
||||
Prev();
|
||||
}
|
||||
} else {
|
||||
PrevInternal();
|
||||
}
|
||||
PrevInternal();
|
||||
if (statistics_ != nullptr) {
|
||||
RecordTick(statistics_, NUMBER_DB_SEEK);
|
||||
if (valid_) {
|
||||
@ -943,18 +956,16 @@ void DBIter::SeekToLast() {
|
||||
}
|
||||
}
|
||||
|
||||
Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
|
||||
const Comparator* user_key_comparator,
|
||||
InternalIterator* internal_iter,
|
||||
const SequenceNumber& sequence,
|
||||
uint64_t max_sequential_skip_in_iterations,
|
||||
uint64_t version_number,
|
||||
const Slice* iterate_upper_bound,
|
||||
bool prefix_same_as_start, bool pin_data) {
|
||||
DBIter* db_iter =
|
||||
new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
|
||||
false, max_sequential_skip_in_iterations, version_number,
|
||||
iterate_upper_bound, prefix_same_as_start, pin_data);
|
||||
Iterator* NewDBIterator(
|
||||
Env* env, const ImmutableCFOptions& ioptions,
|
||||
const Comparator* user_key_comparator, InternalIterator* internal_iter,
|
||||
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
|
||||
uint64_t version_number, const Slice* iterate_upper_bound,
|
||||
bool prefix_same_as_start, bool pin_data, bool total_order_seek) {
|
||||
DBIter* db_iter = new DBIter(
|
||||
env, ioptions, user_key_comparator, internal_iter, sequence, false,
|
||||
max_sequential_skip_in_iterations, version_number, iterate_upper_bound,
|
||||
prefix_same_as_start, pin_data, total_order_seek);
|
||||
return db_iter;
|
||||
}
|
||||
|
||||
@ -993,15 +1004,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
|
||||
Env* env, const ImmutableCFOptions& ioptions,
|
||||
const Comparator* user_key_comparator, const SequenceNumber& sequence,
|
||||
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
|
||||
const Slice* iterate_upper_bound, bool prefix_same_as_start,
|
||||
bool pin_data) {
|
||||
const Slice* iterate_upper_bound, bool prefix_same_as_start, bool pin_data,
|
||||
bool total_order_seek) {
|
||||
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
|
||||
Arena* arena = iter->GetArena();
|
||||
auto mem = arena->AllocateAligned(sizeof(DBIter));
|
||||
DBIter* db_iter =
|
||||
new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
|
||||
true, max_sequential_skip_in_iterations, version_number,
|
||||
iterate_upper_bound, prefix_same_as_start, pin_data);
|
||||
DBIter* db_iter = new (mem) DBIter(
|
||||
env, ioptions, user_key_comparator, nullptr, sequence, true,
|
||||
max_sequential_skip_in_iterations, version_number, iterate_upper_bound,
|
||||
prefix_same_as_start, pin_data, total_order_seek);
|
||||
|
||||
iter->SetDBIter(db_iter);
|
||||
|
||||
|
@ -31,7 +31,8 @@ extern Iterator* NewDBIterator(
|
||||
const Comparator* user_key_comparator, InternalIterator* internal_iter,
|
||||
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
|
||||
uint64_t version_number, const Slice* iterate_upper_bound = nullptr,
|
||||
bool prefix_same_as_start = false, bool pin_data = false);
|
||||
bool prefix_same_as_start = false, bool pin_data = false,
|
||||
bool total_order_seek = false);
|
||||
|
||||
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
|
||||
// iterator is supposed be allocated. This class is used as an entry point of
|
||||
@ -78,6 +79,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
|
||||
const Comparator* user_key_comparator, const SequenceNumber& sequence,
|
||||
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
|
||||
const Slice* iterate_upper_bound = nullptr,
|
||||
bool prefix_same_as_start = false, bool pin_data = false);
|
||||
bool prefix_same_as_start = false, bool pin_data = false,
|
||||
bool total_order_seek = false);
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -357,7 +357,7 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
|
||||
db_iter->SeekToLast();
|
||||
|
||||
ASSERT_TRUE(db_iter->Valid());
|
||||
ASSERT_EQ(static_cast<int>(perf_context.internal_key_skipped_count), 1);
|
||||
ASSERT_EQ(static_cast<int>(perf_context.internal_key_skipped_count), 7);
|
||||
ASSERT_EQ(db_iter->key().ToString(), "b");
|
||||
|
||||
SetPerfLevel(kDisable);
|
||||
@ -480,7 +480,7 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
|
||||
db_iter->SeekToLast();
|
||||
|
||||
ASSERT_TRUE(db_iter->Valid());
|
||||
ASSERT_EQ(static_cast<int>(perf_context.internal_delete_skipped_count), 0);
|
||||
ASSERT_EQ(static_cast<int>(perf_context.internal_delete_skipped_count), 1);
|
||||
ASSERT_EQ(db_iter->key().ToString(), "b");
|
||||
|
||||
SetPerfLevel(kDisable);
|
||||
|
@ -18,6 +18,7 @@ int main() {
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include "db/db_impl.h"
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/filter_policy.h"
|
||||
@ -26,9 +27,11 @@ int main() {
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "rocksdb/table.h"
|
||||
#include "util/histogram.h"
|
||||
#include "util/random.h"
|
||||
#include "util/stop_watch.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/testharness.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
|
||||
using GFLAGS::ParseCommandLineFlags;
|
||||
|
||||
@ -46,6 +49,7 @@ DEFINE_int32(skiplist_height, 4, "");
|
||||
DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, "");
|
||||
DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, "");
|
||||
DEFINE_int32(value_size, 40, "");
|
||||
DEFINE_bool(enable_print, false, "Print options generated to console.");
|
||||
|
||||
// Path to the database on file system
|
||||
const std::string kDbName = rocksdb::test::TmpDir() + "/prefix_test";
|
||||
@ -106,6 +110,10 @@ class TestKeyComparator : public Comparator {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool operator()(const TestKey& a, const TestKey& b) const {
|
||||
return Compare(TestKeyToSlice(a), TestKeyToSlice(b)) < 0;
|
||||
}
|
||||
|
||||
virtual const char* Name() const override {
|
||||
return "TestKeyComparator";
|
||||
}
|
||||
@ -124,6 +132,23 @@ void PutKey(DB* db, WriteOptions write_options, uint64_t prefix,
|
||||
ASSERT_OK(db->Put(write_options, key, value));
|
||||
}
|
||||
|
||||
void PutKey(DB* db, WriteOptions write_options, const TestKey& test_key,
|
||||
const Slice& value) {
|
||||
Slice key = TestKeyToSlice(test_key);
|
||||
ASSERT_OK(db->Put(write_options, key, value));
|
||||
}
|
||||
|
||||
void MergeKey(DB* db, WriteOptions write_options, const TestKey& test_key,
|
||||
const Slice& value) {
|
||||
Slice key = TestKeyToSlice(test_key);
|
||||
ASSERT_OK(db->Merge(write_options, key, value));
|
||||
}
|
||||
|
||||
void DeleteKey(DB* db, WriteOptions write_options, const TestKey& test_key) {
|
||||
Slice key = TestKeyToSlice(test_key);
|
||||
ASSERT_OK(db->Delete(write_options, key));
|
||||
}
|
||||
|
||||
void SeekIterator(Iterator* iter, uint64_t prefix, uint64_t suffix) {
|
||||
TestKey test_key(prefix, suffix);
|
||||
Slice key = TestKeyToSlice(test_key);
|
||||
@ -629,8 +654,206 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(PrefixTest, PrefixSeekModePrev) {
|
||||
// Only for SkipListFactory
|
||||
options.memtable_factory.reset(new SkipListFactory);
|
||||
options.merge_operator = MergeOperators::CreatePutOperator();
|
||||
options.write_buffer_size = 1024 * 1024;
|
||||
Random rnd(1);
|
||||
for (size_t m = 1; m < 100; m++) {
|
||||
std::cout << "[" + std::to_string(m) + "]" + "*** Mem table: "
|
||||
<< options.memtable_factory->Name() << std::endl;
|
||||
DestroyDB(kDbName, Options());
|
||||
auto db = OpenDb();
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
std::map<TestKey, std::string, TestKeyComparator> entry_maps[3], whole_map;
|
||||
for (uint64_t i = 0; i < 10; i++) {
|
||||
int div = i % 3 + 1;
|
||||
for (uint64_t j = 0; j < 10; j++) {
|
||||
whole_map[TestKey(i, j)] = entry_maps[rnd.Uniform(div)][TestKey(i, j)] =
|
||||
'v' + std::to_string(i) + std::to_string(j);
|
||||
}
|
||||
}
|
||||
|
||||
std::map<TestKey, std::string, TestKeyComparator> type_map;
|
||||
for (size_t i = 0; i < 3; i++) {
|
||||
for (auto& kv : entry_maps[i]) {
|
||||
if (rnd.OneIn(3)) {
|
||||
PutKey(db.get(), write_options, kv.first, kv.second);
|
||||
type_map[kv.first] = "value";
|
||||
} else {
|
||||
MergeKey(db.get(), write_options, kv.first, kv.second);
|
||||
type_map[kv.first] = "merge";
|
||||
}
|
||||
}
|
||||
if (i < 2) {
|
||||
db->Flush(FlushOptions());
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < 2; i++) {
|
||||
for (auto& kv : entry_maps[i]) {
|
||||
if (rnd.OneIn(10)) {
|
||||
whole_map.erase(kv.first);
|
||||
DeleteKey(db.get(), write_options, kv.first);
|
||||
entry_maps[2][kv.first] = "delete";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (FLAGS_enable_print) {
|
||||
for (size_t i = 0; i < 3; i++) {
|
||||
for (auto& kv : entry_maps[i]) {
|
||||
std::cout << "[" << i << "]" << kv.first.prefix << kv.first.sorted
|
||||
<< " " << kv.second + " " + type_map[kv.first] << std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
|
||||
for (uint64_t prefix = 0; prefix < 10; prefix++) {
|
||||
uint64_t start_suffix = rnd.Uniform(9);
|
||||
SeekIterator(iter.get(), prefix, start_suffix);
|
||||
auto it = whole_map.find(TestKey(prefix, start_suffix));
|
||||
if (it == whole_map.end()) {
|
||||
continue;
|
||||
}
|
||||
ASSERT_NE(it, whole_map.end());
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
if (FLAGS_enable_print) {
|
||||
std::cout << "round " << prefix
|
||||
<< " iter: " << SliceToTestKey(iter->key())->prefix
|
||||
<< SliceToTestKey(iter->key())->sorted
|
||||
<< " | map: " << it->first.prefix << it->first.sorted << " | "
|
||||
<< iter->value().ToString() << " " << it->second << std::endl;
|
||||
}
|
||||
ASSERT_EQ(iter->value(), it->second);
|
||||
uint64_t stored_prefix = prefix;
|
||||
for (size_t k = 0; k < 9; k++) {
|
||||
if (rnd.OneIn(2) || it == whole_map.begin()) {
|
||||
iter->Next();
|
||||
it++;
|
||||
if (FLAGS_enable_print) {
|
||||
std::cout << "Next >> ";
|
||||
}
|
||||
} else {
|
||||
iter->Prev();
|
||||
it--;
|
||||
if (FLAGS_enable_print) {
|
||||
std::cout << "Prev >> ";
|
||||
}
|
||||
}
|
||||
if (!iter->Valid() ||
|
||||
SliceToTestKey(iter->key())->prefix != stored_prefix) {
|
||||
break;
|
||||
}
|
||||
stored_prefix = SliceToTestKey(iter->key())->prefix;
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_NE(it, whole_map.end());
|
||||
ASSERT_EQ(iter->value(), it->second);
|
||||
if (FLAGS_enable_print) {
|
||||
std::cout << "iter: " << SliceToTestKey(iter->key())->prefix
|
||||
<< SliceToTestKey(iter->key())->sorted
|
||||
<< " | map: " << it->first.prefix << it->first.sorted
|
||||
<< " | " << iter->value().ToString() << " " << it->second
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(PrefixTest, PrefixSeekModePrev2) {
|
||||
// Only for SkipListFactory
|
||||
// test the case
|
||||
// iter1 iter2
|
||||
// | prefix | suffix | | prefix | suffix |
|
||||
// | 1 | 1 | | 1 | 2 |
|
||||
// | 1 | 3 | | 1 | 4 |
|
||||
// | 2 | 1 | | 3 | 3 |
|
||||
// | 2 | 2 | | 3 | 4 |
|
||||
// after seek(15), iter1 will be at 21 and iter2 will be 33.
|
||||
// Then if call Prev() in prefix mode where SeekForPrev(21) gets called,
|
||||
// iter2 should turn to invalid state because of bloom filter.
|
||||
options.memtable_factory.reset(new SkipListFactory);
|
||||
options.write_buffer_size = 1024 * 1024;
|
||||
std::string v13("v13");
|
||||
DestroyDB(kDbName, Options());
|
||||
auto db = OpenDb();
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
PutKey(db.get(), write_options, TestKey(1, 2), "v12");
|
||||
PutKey(db.get(), write_options, TestKey(1, 4), "v14");
|
||||
PutKey(db.get(), write_options, TestKey(3, 3), "v33");
|
||||
PutKey(db.get(), write_options, TestKey(3, 4), "v34");
|
||||
db->Flush(FlushOptions());
|
||||
reinterpret_cast<DBImpl*>(db.get())->TEST_WaitForFlushMemTable();
|
||||
PutKey(db.get(), write_options, TestKey(1, 1), "v11");
|
||||
PutKey(db.get(), write_options, TestKey(1, 3), "v13");
|
||||
PutKey(db.get(), write_options, TestKey(2, 1), "v21");
|
||||
PutKey(db.get(), write_options, TestKey(2, 2), "v22");
|
||||
db->Flush(FlushOptions());
|
||||
reinterpret_cast<DBImpl*>(db.get())->TEST_WaitForFlushMemTable();
|
||||
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
|
||||
SeekIterator(iter.get(), 1, 5);
|
||||
iter->Prev();
|
||||
ASSERT_EQ(iter->value(), v13);
|
||||
}
|
||||
|
||||
TEST_F(PrefixTest, PrefixSeekModePrev3) {
|
||||
// Only for SkipListFactory
|
||||
// test SeekToLast() with iterate_upper_bound_ in prefix_seek_mode
|
||||
options.memtable_factory.reset(new SkipListFactory);
|
||||
options.write_buffer_size = 1024 * 1024;
|
||||
std::string v14("v14");
|
||||
TestKey upper_bound_key = TestKey(1, 5);
|
||||
Slice upper_bound = TestKeyToSlice(upper_bound_key);
|
||||
|
||||
{
|
||||
DestroyDB(kDbName, Options());
|
||||
auto db = OpenDb();
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
read_options.iterate_upper_bound = &upper_bound;
|
||||
PutKey(db.get(), write_options, TestKey(1, 2), "v12");
|
||||
PutKey(db.get(), write_options, TestKey(1, 4), "v14");
|
||||
db->Flush(FlushOptions());
|
||||
reinterpret_cast<DBImpl*>(db.get())->TEST_WaitForFlushMemTable();
|
||||
PutKey(db.get(), write_options, TestKey(1, 1), "v11");
|
||||
PutKey(db.get(), write_options, TestKey(1, 3), "v13");
|
||||
PutKey(db.get(), write_options, TestKey(2, 1), "v21");
|
||||
PutKey(db.get(), write_options, TestKey(2, 2), "v22");
|
||||
db->Flush(FlushOptions());
|
||||
reinterpret_cast<DBImpl*>(db.get())->TEST_WaitForFlushMemTable();
|
||||
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
|
||||
iter->SeekToLast();
|
||||
ASSERT_EQ(iter->value(), v14);
|
||||
}
|
||||
{
|
||||
DestroyDB(kDbName, Options());
|
||||
auto db = OpenDb();
|
||||
WriteOptions write_options;
|
||||
ReadOptions read_options;
|
||||
read_options.iterate_upper_bound = &upper_bound;
|
||||
PutKey(db.get(), write_options, TestKey(1, 2), "v12");
|
||||
PutKey(db.get(), write_options, TestKey(1, 4), "v14");
|
||||
PutKey(db.get(), write_options, TestKey(3, 3), "v33");
|
||||
PutKey(db.get(), write_options, TestKey(3, 4), "v34");
|
||||
db->Flush(FlushOptions());
|
||||
reinterpret_cast<DBImpl*>(db.get())->TEST_WaitForFlushMemTable();
|
||||
PutKey(db.get(), write_options, TestKey(1, 1), "v11");
|
||||
PutKey(db.get(), write_options, TestKey(1, 3), "v13");
|
||||
db->Flush(FlushOptions());
|
||||
reinterpret_cast<DBImpl*>(db.get())->TEST_WaitForFlushMemTable();
|
||||
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
|
||||
iter->SeekToLast();
|
||||
ASSERT_EQ(iter->value(), v14);
|
||||
}
|
||||
}
|
||||
|
||||
} // end namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
@ -94,6 +94,12 @@ class InternalIterator : public Cleanable {
|
||||
virtual Status GetProperty(std::string prop_name, std::string* prop) {
|
||||
return Status::NotSupported("");
|
||||
}
|
||||
// Reset the key used for Seek() in merge iterator, especially for prefix
|
||||
// seek mode
|
||||
// When in prefix_seek_mode, there is inconsistency between db_iterator and
|
||||
// merge iterator. This inconsistency can cause problem when do Seek() in
|
||||
// merge iterator in prefix mode.
|
||||
virtual void ResetPrefixSeekKey(const Slice* prefix_seek_key = nullptr) {}
|
||||
|
||||
protected:
|
||||
void SeekForPrevImpl(const Slice& target, const Comparator* cmp) {
|
||||
|
@ -8,6 +8,7 @@
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "table/merger.h"
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "db/pinned_iterators_manager.h"
|
||||
#include "rocksdb/comparator.h"
|
||||
@ -157,7 +158,7 @@ class MergingIterator : public InternalIterator {
|
||||
ClearHeaps();
|
||||
for (auto& child : children_) {
|
||||
if (&child != current_) {
|
||||
child.Seek(key());
|
||||
child.Seek(prefix_seek_key_ ? *prefix_seek_key_ : key());
|
||||
if (child.Valid() && comparator_->Equal(key(), child.key())) {
|
||||
child.Next();
|
||||
}
|
||||
@ -203,15 +204,9 @@ class MergingIterator : public InternalIterator {
|
||||
InitMaxHeap();
|
||||
for (auto& child : children_) {
|
||||
if (&child != current_) {
|
||||
child.Seek(key());
|
||||
if (child.Valid()) {
|
||||
// Child is at first entry >= key(). Step back one to be < key()
|
||||
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
|
||||
child.SeekForPrev(prefix_seek_key_ ? *prefix_seek_key_ : key());
|
||||
if (child.Valid() && comparator_->Equal(key(), child.key())) {
|
||||
child.Prev();
|
||||
} else {
|
||||
// Child has no entries >= key(). Position at last entry.
|
||||
TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
|
||||
child.SeekToLast();
|
||||
}
|
||||
}
|
||||
if (child.Valid()) {
|
||||
@ -219,11 +214,9 @@ class MergingIterator : public InternalIterator {
|
||||
}
|
||||
}
|
||||
direction_ = kReverse;
|
||||
// Note that we don't do assert(current_ == CurrentReverse()) here
|
||||
// because it is possible to have some keys larger than the seek-key
|
||||
// inserted between Seek() and SeekToLast(), which makes current_ not
|
||||
// equal to CurrentReverse().
|
||||
current_ = CurrentReverse();
|
||||
// The loop advanced all non-current children to be < key() so current_
|
||||
// should still be strictly the smallest key.
|
||||
assert(current_ == CurrentReverse());
|
||||
}
|
||||
|
||||
// For the heap modifications below to be correct, current_ must be the
|
||||
@ -284,6 +277,17 @@ class MergingIterator : public InternalIterator {
|
||||
current_->IsValuePinned();
|
||||
}
|
||||
|
||||
virtual void ResetPrefixSeekKey(const Slice* db_iter_key) override {
|
||||
if (db_iter_key == nullptr) {
|
||||
prefix_seek_key_.reset();
|
||||
return;
|
||||
}
|
||||
if (!prefix_seek_key_) {
|
||||
prefix_seek_key_.reset(new std::string);
|
||||
}
|
||||
*prefix_seek_key_ = db_iter_key->ToString();
|
||||
}
|
||||
|
||||
private:
|
||||
// Clears heaps for both directions, used when changing direction or seeking
|
||||
void ClearHeaps();
|
||||
@ -310,6 +314,7 @@ class MergingIterator : public InternalIterator {
|
||||
// forward. Lazily initialize it to save memory.
|
||||
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
|
||||
PinnedIteratorsManager* pinned_iters_mgr_;
|
||||
std::unique_ptr<std::string> prefix_seek_key_;
|
||||
|
||||
IteratorWrapper* CurrentForward() const {
|
||||
assert(direction_ == kForward);
|
||||
|
Loading…
Reference in New Issue
Block a user