Fix a comparison in DBIter::FindPrevUserKey()

Summary:
When seek target is a merge key (`kTypeMerge`), `DBIter::FindNextUserEntry()`
advances the underlying iterator _past_ the current key (`saved_key_`); see
`MergeValuesNewToOld()`. However, `FindPrevUserKey()` assumes that `iter_`
points to an entry with the same user key as `saved_key_`. As a result,
`it->Seek(key) && it->Prev()` can cause the iterator to be positioned at the
_next_, instead of the previous, entry (new test, written by @lovro, reproduces
the bug).

This diff changes `FindPrevUserKey()` to also skip keys that are _greater_ than
`saved_key_`.

Test Plan: db_test

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: leveldb, dhruba, lovro

Differential Revision: https://reviews.facebook.net/D40791
This commit is contained in:
Tomislav Novak 2015-06-26 13:18:27 -07:00
parent 501591c423
commit ec70fea4c4
3 changed files with 103 additions and 2 deletions

View File

@ -350,6 +350,9 @@ void DBIter::MergeValuesNewToOld() {
void DBIter::Prev() {
assert(valid_);
if (direction_ == kForward) {
if (!iter_->Valid()) {
iter_->SeekToLast();
}
FindPrevUserKey();
direction_ = kReverse;
}
@ -553,7 +556,7 @@ void DBIter::FindNextUserKey() {
ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) {
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
iter_->Next();
FindParseableKey(&ikey, kForward);
}
@ -568,7 +571,7 @@ void DBIter::FindPrevUserKey() {
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) >= 0) {
if (num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;

View File

@ -1668,6 +1668,81 @@ TEST_F(DBIteratorTest, DBIterator8) {
ASSERT_EQ(db_iter->value().ToString(), "0");
}
TEST_F(DBIteratorTest, DBIterator9) {
Options options;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
{
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddMerge("a", "merge_1");
internal_iter->AddMerge("a", "merge_2");
internal_iter->AddMerge("b", "merge_3");
internal_iter->AddMerge("b", "merge_4");
internal_iter->AddMerge("d", "merge_5");
internal_iter->AddMerge("d", "merge_6");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4");
db_iter->Next();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "d");
ASSERT_EQ(db_iter->value().ToString(), "merge_5,merge_6");
db_iter->Seek("b");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4");
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
ASSERT_EQ(db_iter->value().ToString(), "merge_1,merge_2");
db_iter->Seek("c");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "d");
ASSERT_EQ(db_iter->value().ToString(), "merge_5,merge_6");
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4");
}
}
TEST_F(DBIteratorTest, DBIterator10) {
Options options;
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "1");
internal_iter->AddPut("b", "2");
internal_iter->AddPut("c", "3");
internal_iter->AddPut("d", "4");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter,
10, options.max_sequential_skip_in_iterations));
db_iter->Seek("c");
ASSERT_TRUE(db_iter->Valid());
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
ASSERT_EQ(db_iter->value().ToString(), "2");
db_iter->Next();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
ASSERT_EQ(db_iter->value().ToString(), "3");
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -14109,6 +14109,29 @@ TEST_F(DBTest, RowCache) {
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
}
TEST_F(DBTest, PrevAfterMerge) {
Options options;
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreatePutOperator();
DestroyAndReopen(options);
// write three entries with different keys using Merge()
WriteOptions wopts;
db_->Merge(wopts, "1", "data1");
db_->Merge(wopts, "2", "data2");
db_->Merge(wopts, "3", "data3");
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
it->Seek("2");
ASSERT_TRUE(it->Valid());
ASSERT_EQ("2", it->key().ToString());
it->Prev();
ASSERT_TRUE(it->Valid());
ASSERT_EQ("1", it->key().ToString());
}
} // namespace rocksdb
int main(int argc, char** argv) {