From d286b5df90baae159b01262e3f23041afd02d3e3 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 25 Aug 2015 13:40:52 -0700 Subject: [PATCH] DBIter to out extra keys with higher sequence numbers when changing direction from forward to backward Summary: When DBIter changes iterating direction from forward to backward, it might see some much larger keys with higher sequence ID. With this commit, these rows will be actively filtered out. It should fix existing disabled tests in db_iter_test. This may not be a perfect fix, but it introduces least impact on existing codes, in order to be safe. Test Plan: Enable existing tests and make sure they pass. Add a new test DBIterWithMergeIterTest.InnerMergeIteratorDataRace8. Also run all existing tests. Reviewers: yhchiang, rven, anthony, IslamAbdelRahman, kradhakrishnan, igor Reviewed By: igor Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D45567 --- db/db_iter.cc | 26 ++++++++++++++----------- db/db_iter_test.cc | 48 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/db/db_iter.cc b/db/db_iter.cc index 2e17b4dd9..471cadf9d 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -595,19 +595,23 @@ void DBIter::FindPrevUserKey() { size_t num_skipped = 0; ParsedInternalKey ikey; FindParseableKey(&ikey, kReverse); - while (iter_->Valid() && - user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) { - if (num_skipped >= max_skip_) { - num_skipped = 0; - IterKey last_key; - last_key.SetInternalKey(ParsedInternalKey( - saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek)); - iter_->Seek(last_key.GetKey()); - RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); + int cmp; + while (iter_->Valid() && ((cmp = user_comparator_->Compare( + ikey.user_key, saved_key_.GetKey())) == 0 || + (cmp > 0 && ikey.sequence > sequence_))) { + if (cmp == 0) { + if (num_skipped >= max_skip_) { + num_skipped = 0; + IterKey last_key; + last_key.SetInternalKey(ParsedInternalKey( + saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek)); + iter_->Seek(last_key.GetKey()); + RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); + } else { + ++num_skipped; + } } - iter_->Prev(); - ++num_skipped; FindParseableKey(&ikey, kReverse); } } diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 87512edea..73ef91b6a 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -1920,8 +1920,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } -// Not passing for a bug of not handling the case -TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace2) { +TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace2) { // Test Prev() when one child iterator is at its end but more rows // are added. db_iter_->Seek("f"); @@ -1959,8 +1958,7 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace2) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } -// Not passing for a bug of not handling the case -TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace3) { +TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace3) { // Test Prev() when one child iterator is at its end but more rows // are added and max_skipped is triggered. db_iter_->Seek("f"); @@ -2002,7 +2000,7 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace3) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace4) { +TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace4) { // Test Prev() when one child iterator has more rows inserted // between Seek() and Prev() when changing directions. internal_iter2_->Add("z", kTypeValue, "9", 4u); @@ -2053,7 +2051,7 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace4) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace5) { +TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace5) { internal_iter2_->Add("z", kTypeValue, "9", 4u); // Test Prev() when one child iterator has more rows inserted @@ -2146,7 +2144,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace6) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace7) { +TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace7) { internal_iter1_->Add("u", kTypeValue, "10", 4u); internal_iter1_->Add("v", kTypeValue, "11", 4u); internal_iter1_->Add("w", kTypeValue, "12", 4u); @@ -2200,6 +2198,42 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace7) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) { + // internal_iter1_: a, f, g + // internal_iter2_: a, b, c, d, adding (z) + internal_iter2_->Add("z", kTypeValue, "9", 4u); + + // Test Prev() when one child iterator has more rows inserted + // between Seek() and Prev() when changing directions. + db_iter_->Seek("g"); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "g"); + ASSERT_EQ(db_iter_->value().ToString(), "3"); + + // Test call back inserts two keys before "z" in mem table after + // MergeIterator::Prev() calls mem table iterator's Seek() and + // before calling Prev() + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "MergeIterator::Prev:BeforePrev", [&](void* arg) { + IteratorWrapper* it = reinterpret_cast(arg); + if (it->key().starts_with("z")) { + internal_iter2_->Add("x", kTypeValue, "7", 16u, true); + internal_iter2_->Add("y", kTypeValue, "7", 17u, true); + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "f"); + ASSERT_EQ(db_iter_->value().ToString(), "2"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "d"); + ASSERT_EQ(db_iter_->value().ToString(), "7"); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} } // namespace rocksdb int main(int argc, char** argv) {