diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 5fa5c5c82..bc89a6a76 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -8,14 +8,16 @@ #include #include +#include "db/db_iter.h" #include "db/dbformat.h" #include "rocksdb/comparator.h" #include "rocksdb/options.h" #include "rocksdb/perf_context.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" -#include "db/db_iter.h" +#include "table/merger.h" #include "util/string_util.h" +#include "util/sync_point.h" #include "util/testharness.h" #include "utilities/merge_operators.h" @@ -48,8 +50,13 @@ class TestIterator : public Iterator { } void Add(std::string argkey, ValueType type, std::string argvalue) { + Add(argkey, type, argvalue, sequence_number_++); + } + + void Add(std::string argkey, ValueType type, std::string argvalue, + size_t seq_num) { valid_ = true; - ParsedInternalKey internal_key(argkey, sequence_number_++, type); + ParsedInternalKey internal_key(argkey, seq_num, type); data_.push_back( std::pair(std::string(), argvalue)); AppendInternalKey(&data_.back().first, internal_key); @@ -1772,6 +1779,212 @@ TEST_F(DBIteratorTest, SeekToLastOccurrenceSeq0) { ASSERT_FALSE(db_iter->Valid()); } +class DBIterWithMergeIterTest : public testing::Test { + public: + DBIterWithMergeIterTest() + : env_(Env::Default()), icomp_(BytewiseComparator()) { + options_.merge_operator = nullptr; + + internal_iter1_ = new TestIterator(BytewiseComparator()); + internal_iter1_->Add("a", kTypeValue, "1", 3u); + internal_iter1_->Add("f", kTypeValue, "2", 5u); + internal_iter1_->Add("g", kTypeValue, "3", 7u); + internal_iter1_->Finish(); + + internal_iter2_ = new TestIterator(BytewiseComparator()); + internal_iter2_->Add("a", kTypeValue, "4", 6u); + internal_iter2_->Add("b", kTypeValue, "5", 1u); + internal_iter2_->Add("c", kTypeValue, "6", 2u); + internal_iter2_->Add("d", kTypeValue, "7", 3u); + internal_iter2_->Finish(); + + std::vector child_iters; + child_iters.push_back(internal_iter1_); + child_iters.push_back(internal_iter2_); + InternalKeyComparator icomp(BytewiseComparator()); + Iterator* merge_iter = NewMergingIterator(&icomp_, &child_iters[0], 2u); + + db_iter_.reset(NewDBIterator(env_, ImmutableCFOptions(options_), + BytewiseComparator(), merge_iter, + 8 /* read data earlier than seqId 8 */, + 3 /* max iterators before reseek */)); + } + + Env* env_; + Options options_; + TestIterator* internal_iter1_; + TestIterator* internal_iter2_; + InternalKeyComparator icomp_; + Iterator* merge_iter_; + std::unique_ptr db_iter_; +}; + +TEST_F(DBIterWithMergeIterTest, InnerMergeIterator1) { + db_iter_->SeekToFirst(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "a"); + ASSERT_EQ(db_iter_->value().ToString(), "4"); + db_iter_->Next(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "b"); + ASSERT_EQ(db_iter_->value().ToString(), "5"); + db_iter_->Next(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "c"); + ASSERT_EQ(db_iter_->value().ToString(), "6"); + db_iter_->Next(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "d"); + ASSERT_EQ(db_iter_->value().ToString(), "7"); + db_iter_->Next(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "f"); + ASSERT_EQ(db_iter_->value().ToString(), "2"); + db_iter_->Next(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "g"); + ASSERT_EQ(db_iter_->value().ToString(), "3"); + db_iter_->Next(); + ASSERT_FALSE(db_iter_->Valid()); +} + +TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) { + // Test Prev() when one child iterator is at its end. + db_iter_->Seek("g"); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "g"); + ASSERT_EQ(db_iter_->value().ToString(), "3"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "f"); + ASSERT_EQ(db_iter_->value().ToString(), "2"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "d"); + ASSERT_EQ(db_iter_->value().ToString(), "7"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "c"); + ASSERT_EQ(db_iter_->value().ToString(), "6"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "b"); + ASSERT_EQ(db_iter_->value().ToString(), "5"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "a"); + ASSERT_EQ(db_iter_->value().ToString(), "4"); +} + +TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) { + // Test Prev() when one child iterator is at its end but more rows + // are added. + db_iter_->Seek("f"); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "f"); + ASSERT_EQ(db_iter_->value().ToString(), "2"); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "MergeIterator::Prev:BeforeSeekToLast", + [&](void* arg) { internal_iter2_->Add("z", kTypeValue, "7", 12u); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "d"); + ASSERT_EQ(db_iter_->value().ToString(), "7"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "c"); + ASSERT_EQ(db_iter_->value().ToString(), "6"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "b"); + ASSERT_EQ(db_iter_->value().ToString(), "5"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "a"); + ASSERT_EQ(db_iter_->value().ToString(), "4"); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Not passing for a bug of not handling the case +TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace2) { + // Test Prev() when one child iterator is at its end but more rows + // are added. + db_iter_->Seek("f"); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "f"); + ASSERT_EQ(db_iter_->value().ToString(), "2"); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "MergeIterator::Prev:BeforeSeekToLast", [&](void* arg) { + internal_iter2_->Add("z", kTypeValue, "7", 12u); + internal_iter2_->Add("z", kTypeValue, "7", 11u); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "d"); + ASSERT_EQ(db_iter_->value().ToString(), "7"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "c"); + ASSERT_EQ(db_iter_->value().ToString(), "6"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "b"); + ASSERT_EQ(db_iter_->value().ToString(), "5"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "a"); + ASSERT_EQ(db_iter_->value().ToString(), "4"); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Not passing for a bug of not handling the case +TEST_F(DBIterWithMergeIterTest, DISABLED_InnerMergeIteratorDataRace3) { + // Test Prev() when one child iterator is at its end but more rows + // are added and max_skipped is triggered. + db_iter_->Seek("f"); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "f"); + ASSERT_EQ(db_iter_->value().ToString(), "2"); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "MergeIterator::Prev:BeforeSeekToLast", [&](void* arg) { + internal_iter2_->Add("z", kTypeValue, "7", 16u); + internal_iter2_->Add("z", kTypeValue, "7", 15u); + internal_iter2_->Add("z", kTypeValue, "7", 14u); + internal_iter2_->Add("z", kTypeValue, "7", 13u); + internal_iter2_->Add("z", kTypeValue, "7", 12u); + internal_iter2_->Add("z", kTypeValue, "7", 11u); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "d"); + ASSERT_EQ(db_iter_->value().ToString(), "7"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "c"); + ASSERT_EQ(db_iter_->value().ToString(), "6"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "b"); + ASSERT_EQ(db_iter_->value().ToString(), "5"); + db_iter_->Prev(); + ASSERT_TRUE(db_iter_->Valid()); + ASSERT_EQ(db_iter_->key().ToString(), "a"); + ASSERT_EQ(db_iter_->value().ToString(), "4"); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/table/merger.cc b/table/merger.cc index 30931caaa..a496fe468 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -19,6 +19,7 @@ #include "util/arena.h" #include "util/heap.h" #include "util/stop_watch.h" +#include "util/sync_point.h" #include "util/perf_context_imp.h" #include "util/autovector.h" @@ -182,6 +183,7 @@ class MergingIterator : public Iterator { child.Prev(); } else { // Child has no entries >= key(). Position at last entry. + TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast"); child.SeekToLast(); } }