DBIter to out extra keys with higher sequence numbers when changing direction from forward to backward
Summary: When DBIter changes iterating direction from forward to backward, it might see some much larger keys with higher sequence ID. With this commit, these rows will be actively filtered out. It should fix existing disabled tests in db_iter_test. This may not be a perfect fix, but it introduces least impact on existing codes, in order to be safe. Test Plan: Enable existing tests and make sure they pass. Add a new test DBIterWithMergeIterTest.InnerMergeIteratorDataRace8. Also run all existing tests. Reviewers: yhchiang, rven, anthony, IslamAbdelRahman, kradhakrishnan, igor Reviewed By: igor Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D45567
This commit is contained in:
parent
3795449c9d
commit
d286b5df90
@ -595,19 +595,23 @@ void DBIter::FindPrevUserKey() {
|
|||||||
size_t num_skipped = 0;
|
size_t num_skipped = 0;
|
||||||
ParsedInternalKey ikey;
|
ParsedInternalKey ikey;
|
||||||
FindParseableKey(&ikey, kReverse);
|
FindParseableKey(&ikey, kReverse);
|
||||||
while (iter_->Valid() &&
|
int cmp;
|
||||||
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
|
while (iter_->Valid() && ((cmp = user_comparator_->Compare(
|
||||||
if (num_skipped >= max_skip_) {
|
ikey.user_key, saved_key_.GetKey())) == 0 ||
|
||||||
num_skipped = 0;
|
(cmp > 0 && ikey.sequence > sequence_))) {
|
||||||
IterKey last_key;
|
if (cmp == 0) {
|
||||||
last_key.SetInternalKey(ParsedInternalKey(
|
if (num_skipped >= max_skip_) {
|
||||||
saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
|
num_skipped = 0;
|
||||||
iter_->Seek(last_key.GetKey());
|
IterKey last_key;
|
||||||
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
|
last_key.SetInternalKey(ParsedInternalKey(
|
||||||
|
saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
|
||||||
|
iter_->Seek(last_key.GetKey());
|
||||||
|
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
|
||||||
|
} else {
|
||||||
|
++num_skipped;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
iter_->Prev();
|
iter_->Prev();
|
||||||
++num_skipped;
|
|
||||||
FindParseableKey(&ikey, kReverse);
|
FindParseableKey(&ikey, kReverse);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1920,8 +1920,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) {
|
|||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not passing for a bug of not handling the case
|
TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace2) {
|
||||||
TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace2) {
|
|
||||||
// Test Prev() when one child iterator is at its end but more rows
|
// Test Prev() when one child iterator is at its end but more rows
|
||||||
// are added.
|
// are added.
|
||||||
db_iter_->Seek("f");
|
db_iter_->Seek("f");
|
||||||
@ -1959,8 +1958,7 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace2) {
|
|||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not passing for a bug of not handling the case
|
TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace3) {
|
||||||
TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace3) {
|
|
||||||
// Test Prev() when one child iterator is at its end but more rows
|
// Test Prev() when one child iterator is at its end but more rows
|
||||||
// are added and max_skipped is triggered.
|
// are added and max_skipped is triggered.
|
||||||
db_iter_->Seek("f");
|
db_iter_->Seek("f");
|
||||||
@ -2002,7 +2000,7 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace3) {
|
|||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace4) {
|
TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace4) {
|
||||||
// Test Prev() when one child iterator has more rows inserted
|
// Test Prev() when one child iterator has more rows inserted
|
||||||
// between Seek() and Prev() when changing directions.
|
// between Seek() and Prev() when changing directions.
|
||||||
internal_iter2_->Add("z", kTypeValue, "9", 4u);
|
internal_iter2_->Add("z", kTypeValue, "9", 4u);
|
||||||
@ -2053,7 +2051,7 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace4) {
|
|||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace5) {
|
TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace5) {
|
||||||
internal_iter2_->Add("z", kTypeValue, "9", 4u);
|
internal_iter2_->Add("z", kTypeValue, "9", 4u);
|
||||||
|
|
||||||
// Test Prev() when one child iterator has more rows inserted
|
// Test Prev() when one child iterator has more rows inserted
|
||||||
@ -2146,7 +2144,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace6) {
|
|||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace7) {
|
TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace7) {
|
||||||
internal_iter1_->Add("u", kTypeValue, "10", 4u);
|
internal_iter1_->Add("u", kTypeValue, "10", 4u);
|
||||||
internal_iter1_->Add("v", kTypeValue, "11", 4u);
|
internal_iter1_->Add("v", kTypeValue, "11", 4u);
|
||||||
internal_iter1_->Add("w", kTypeValue, "12", 4u);
|
internal_iter1_->Add("w", kTypeValue, "12", 4u);
|
||||||
@ -2200,6 +2198,42 @@ TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace7) {
|
|||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) {
|
||||||
|
// internal_iter1_: a, f, g
|
||||||
|
// internal_iter2_: a, b, c, d, adding (z)
|
||||||
|
internal_iter2_->Add("z", kTypeValue, "9", 4u);
|
||||||
|
|
||||||
|
// Test Prev() when one child iterator has more rows inserted
|
||||||
|
// between Seek() and Prev() when changing directions.
|
||||||
|
db_iter_->Seek("g");
|
||||||
|
ASSERT_TRUE(db_iter_->Valid());
|
||||||
|
ASSERT_EQ(db_iter_->key().ToString(), "g");
|
||||||
|
ASSERT_EQ(db_iter_->value().ToString(), "3");
|
||||||
|
|
||||||
|
// Test call back inserts two keys before "z" in mem table after
|
||||||
|
// MergeIterator::Prev() calls mem table iterator's Seek() and
|
||||||
|
// before calling Prev()
|
||||||
|
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||||
|
"MergeIterator::Prev:BeforePrev", [&](void* arg) {
|
||||||
|
IteratorWrapper* it = reinterpret_cast<IteratorWrapper*>(arg);
|
||||||
|
if (it->key().starts_with("z")) {
|
||||||
|
internal_iter2_->Add("x", kTypeValue, "7", 16u, true);
|
||||||
|
internal_iter2_->Add("y", kTypeValue, "7", 17u, true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
db_iter_->Prev();
|
||||||
|
ASSERT_TRUE(db_iter_->Valid());
|
||||||
|
ASSERT_EQ(db_iter_->key().ToString(), "f");
|
||||||
|
ASSERT_EQ(db_iter_->value().ToString(), "2");
|
||||||
|
db_iter_->Prev();
|
||||||
|
ASSERT_TRUE(db_iter_->Valid());
|
||||||
|
ASSERT_EQ(db_iter_->key().ToString(), "d");
|
||||||
|
ASSERT_EQ(db_iter_->value().ToString(), "7");
|
||||||
|
|
||||||
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
}
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
Loading…
Reference in New Issue
Block a user