diff --git a/db/db_iter.cc b/db/db_iter.cc
index 3b407daf3..08aaef333 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -131,9 +131,8 @@ class DBIter: public Iterator {
     }
   }
   virtual ~DBIter() {
-    if (pin_thru_lifetime_) {
-      pinned_iters_mgr_.ReleasePinnedIterators();
-    }
+    // Release pinned data if any
+    pinned_iters_mgr_.ReleasePinnedIterators();
     RecordTick(statistics_, NO_ITERATORS, -1);
     local_stats_.BumpGlobalStatistics(statistics_);
     if (!arena_mode_) {
@@ -154,8 +153,13 @@ class DBIter: public Iterator {
   }
   virtual Slice value() const override {
     assert(valid_);
-    return (direction_ == kForward && !current_entry_is_merged_) ?
-      iter_->value() : saved_value_;
+    if (current_entry_is_merged_) {
+      return saved_value_;
+    } else if (direction_ == kReverse) {
+      return pinned_value_;
+    } else {
+      return iter_->value();
+    }
   }
   virtual Status status() const override {
     if (status_.ok()) {
@@ -206,6 +210,21 @@ class DBIter: public Iterator {
   bool ParseKey(ParsedInternalKey* key);
   void MergeValuesNewToOld();
 
+  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
+  // is called
+  void TempPinData() {
+    if (!pin_thru_lifetime_) {
+      pinned_iters_mgr_.StartPinning();
+    }
+  }
+
+  // Release blocks pinned by TempPinData()
+  void ReleaseTempPinnedData() {
+    if (!pin_thru_lifetime_) {
+      pinned_iters_mgr_.ReleasePinnedIterators();
+    }
+  }
+
   inline void ClearSavedValue() {
     if (saved_value_.capacity() > 1048576) {
       std::string empty;
@@ -227,6 +246,7 @@ class DBIter: public Iterator {
   Status status_;
   IterKey saved_key_;
   std::string saved_value_;
+  Slice pinned_value_;
   Direction direction_;
   bool valid_;
   bool current_entry_is_merged_;
@@ -266,6 +286,8 @@ void DBIter::Next() {
   assert(valid_);
 
   if (direction_ == kReverse) {
+    // We only pin blocks when doing kReverse
+    ReleaseTempPinnedData();
     FindNextUserKey();
     direction_ = kForward;
     if (!iter_->Valid()) {
@@ -472,6 +494,7 @@ void DBIter::Prev() {
   if (direction_ == kForward) {
     ReverseToBackward();
   }
+  ReleaseTempPinnedData();
   PrevInternal();
   if (statistics_ != nullptr) {
     local_stats_.prev_count_++;
@@ -555,6 +578,7 @@ void DBIter::PrevInternal() {
 bool DBIter::FindValueForCurrentKey() {
   assert(iter_->Valid());
   merge_context_.Clear();
+  current_entry_is_merged_ = false;
   // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
   // kTypeValue)
   ValueType last_not_merge_type = kTypeDeletion;
@@ -575,7 +599,9 @@ bool DBIter::FindValueForCurrentKey() {
   switch (last_key_entry_type) {
     case kTypeValue:
       merge_context_.Clear();
-      saved_value_ = iter_->value().ToString();
+      ReleaseTempPinnedData();
+      TempPinData();
+      pinned_value_ = iter_->value();
       last_not_merge_type = kTypeValue;
       break;
     case kTypeDeletion:
@@ -605,6 +631,7 @@
       valid_ = false;
       return false;
     case kTypeMerge:
+      current_entry_is_merged_ = true;
       if (last_not_merge_type == kTypeDeletion) {
         StopWatchNano timer(env_, statistics_ != nullptr);
         PERF_TIMER_GUARD(merge_operator_time_nanos);
@@ -615,12 +642,10 @@
                    timer.ElapsedNanos());
       } else {
         assert(last_not_merge_type == kTypeValue);
-        std::string last_put_value = saved_value_;
-        Slice temp_slice(last_put_value);
         {
           StopWatchNano timer(env_, statistics_ != nullptr);
           PERF_TIMER_GUARD(merge_operator_time_nanos);
-          user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
+          user_merge_operator_->FullMerge(saved_key_.GetKey(), &pinned_value_,
                                           merge_context_.GetOperands(),
                                           &saved_value_, logger_);
           RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
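Before the next hunk, it is worth spelling out the invariant that the rewritten value() at the top of this diff encodes, since every pinning call exists to keep it true: a merged entry is always materialized into saved_value_, a plain entry found during backward iteration is served from pinned_value_ (a Slice into a block that TempPinData() keeps alive), and forward iteration still reads straight from the child iterator. A standalone restatement follows; the member names in the strings mirror the diff, but the function itself is illustrative, not RocksDB code.

#include <cstdio>

enum Direction { kForward, kReverse };

// Illustrative restatement of the new DBIter::value() dispatch.
const char* ValueSource(bool current_entry_is_merged, Direction direction) {
  if (current_entry_is_merged) {
    return "saved_value_: owned std::string holding the FullMerge result";
  } else if (direction == kReverse) {
    return "pinned_value_: Slice into a block kept alive by TempPinData()";
  } else {
    return "iter_->value(): Slice into the child iterator's current block";
  }
}

int main() {
  std::printf("%s\n", ValueSource(false, kReverse));
  return 0;
}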
@@ -655,7 +680,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
       ikey.type == kTypeSingleDeletion) {
     if (ikey.type == kTypeValue) {
-      saved_value_ = iter_->value().ToString();
+      ReleaseTempPinnedData();
+      TempPinData();
+      pinned_value_ = iter_->value();
       valid_ = true;
       return true;
     }
@@ -665,6 +692,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
 
   // kTypeMerge. We need to collect all kTypeMerge values and save them
   // in operands
+  current_entry_is_merged_ = true;
   merge_context_.Clear();
   while (iter_->Valid() &&
          user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
@@ -767,6 +795,7 @@ void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
 
 void DBIter::Seek(const Slice& target) {
   StopWatch sw(env_, statistics_, DB_SEEK);
+  ReleaseTempPinnedData();
   saved_key_.Clear();
   // now saved_key is used to store internal key.
   saved_key_.SetInternalKey(target, sequence_);
@@ -809,6 +838,7 @@ void DBIter::SeekToFirst() {
     max_skip_ = std::numeric_limits<uint64_t>::max();
   }
   direction_ = kForward;
+  ReleaseTempPinnedData();
   ClearSavedValue();
 
   {
@@ -841,6 +871,7 @@ void DBIter::SeekToLast() {
     max_skip_ = std::numeric_limits<uint64_t>::max();
   }
   direction_ = kReverse;
+  ReleaseTempPinnedData();
   ClearSavedValue();
 
   {
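One remark on lifetime before the test file: every entry point that can move the iterator away from the current entry (Next() when switching out of kReverse, Prev(), Seek(), SeekToFirst(), SeekToLast()) calls ReleaseTempPinnedData() first, because pinned_value_ dangles the moment its block is unpinned. The toy model below sketches that contract under the assumption that pinned blocks are simply kept alive until released; only StartPinning() and ReleasePinnedIterators() mirror names from the diff, while ToyPinningModel and PinBlock() are invented for illustration.

#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Toy model of the temporary-pinning contract; not the real
// PinnedIteratorsManager.
class ToyPinningModel {
 public:
  void StartPinning() { pinning_active_ = true; }

  // Conceptually called when a child block iterator moves to a new block:
  // while pinning is active the old block is kept alive instead of being
  // released, so Slices into it (like pinned_value_) remain valid.
  void PinBlock(std::string block_contents) {
    if (pinning_active_) {
      pinned_blocks_.push_back(std::move(block_contents));
    }
  }

  // Counterpart of ReleaseTempPinnedData(): after this call any pointer into
  // a pinned block dangles, which is why DBIter releases pins only when it
  // is about to move off the current entry.
  void ReleasePinnedIterators() {
    pinned_blocks_.clear();
    pinning_active_ = false;
  }

  size_t pinned_count() const { return pinned_blocks_.size(); }

 private:
  bool pinning_active_ = false;
  std::vector<std::string> pinned_blocks_;
};

int main() {
  ToyPinningModel mgr;
  mgr.StartPinning();
  mgr.PinBlock("block holding the current value");  // pinned_value_ points here
  assert(mgr.pinned_count() == 1);                  // value() stays readable
  mgr.ReleasePinnedIterators();                     // pinned_value_ now dangles
  assert(mgr.pinned_count() == 0);
  return 0;
}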
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc
index e76f09e85..3a00e5db6 100644
--- a/db/db_iterator_test.cc
+++ b/db/db_iterator_test.cc
@@ -1228,6 +1228,164 @@ TEST_F(DBIteratorTest, PinnedDataIteratorReadAfterUpdate) {
   delete iter;
 }
 
+TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocks) {
+  Options options = CurrentOptions();
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 1;  // every block will contain one entry
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
+  options.disable_auto_compactions = true;
+  options.max_sequential_skip_in_iterations = 8;
+
+  DestroyAndReopen(options);
+
+  // Putting such deletes will force DBIter::Prev() to fall back to a Seek
+  for (int file_num = 0; file_num < 10; file_num++) {
+    ASSERT_OK(Delete("key4"));
+    ASSERT_OK(Flush());
+  }
+
+  // First file containing 5 blocks of puts
+  ASSERT_OK(Put("key1", "val1.0"));
+  ASSERT_OK(Put("key2", "val2.0"));
+  ASSERT_OK(Put("key3", "val3.0"));
+  ASSERT_OK(Put("key4", "val4.0"));
+  ASSERT_OK(Put("key5", "val5.0"));
+  ASSERT_OK(Flush());
+
+  // Second file containing 9 blocks of merge operands
+  ASSERT_OK(db_->Merge(WriteOptions(), "key1", "val1.1"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "key1", "val1.2"));
+
+  ASSERT_OK(db_->Merge(WriteOptions(), "key2", "val2.1"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "key2", "val2.2"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "key2", "val2.3"));
+
+  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.1"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.2"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.3"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "key3", "val3.4"));
+  ASSERT_OK(Flush());
+
+  {
+    ReadOptions ro;
+    ro.fill_cache = false;
+    Iterator* iter = db_->NewIterator(ro);
+
+    iter->SeekToLast();
+    ASSERT_EQ(iter->key().ToString(), "key5");
+    ASSERT_EQ(iter->value().ToString(), "val5.0");
+
+    iter->Prev();
+    ASSERT_EQ(iter->key().ToString(), "key4");
+    ASSERT_EQ(iter->value().ToString(), "val4.0");
+
+    iter->Prev();
+    ASSERT_EQ(iter->key().ToString(), "key3");
+    ASSERT_EQ(iter->value().ToString(), "val3.0,val3.1,val3.2,val3.3,val3.4");
+
+    iter->Prev();
+    ASSERT_EQ(iter->key().ToString(), "key2");
+    ASSERT_EQ(iter->value().ToString(), "val2.0,val2.1,val2.2,val2.3");
+
+    iter->Prev();
+    ASSERT_EQ(iter->key().ToString(), "key1");
+    ASSERT_EQ(iter->value().ToString(), "val1.0,val1.1,val1.2");
+
+    delete iter;
+  }
+}
+
+TEST_F(DBIteratorTest, IterPrevKeyCrossingBlocksRandomized) {
+  Options options = CurrentOptions();
+  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
+  options.disable_auto_compactions = true;
+  options.level0_slowdown_writes_trigger = (1 << 30);
+  options.level0_stop_writes_trigger = (1 << 30);
+  options.max_sequential_skip_in_iterations = 8;
+  DestroyAndReopen(options);
+
+  const int kNumKeys = 500;
+  // Small number of merge operands to make sure that DBIter::Prev() doesn't
+  // fall back to Seek()
+  const int kNumMergeOperands = 3;
+  // Use a value size that will make sure that every block contains 1 key
+  const int kValSize =
+      static_cast<int>(BlockBasedTableOptions().block_size) * 4;
+  // Percentage of keys that won't get merge operations
+  const int kNoMergeOpPercentage = 20;
+  // Percentage of keys that will be deleted
+  const int kDeletePercentage = 10;
+
+  // For half of the key range we will write multiple deletes first to
+  // force DBIter::Prev() to fall back to Seek()
+  for (int file_num = 0; file_num < 10; file_num++) {
+    for (int i = 0; i < kNumKeys; i += 2) {
+      ASSERT_OK(Delete(Key(i)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  Random rnd(301);
+  std::map<std::string, std::string> true_data;
+  std::string gen_key;
+  std::string gen_val;
+
+  for (int i = 0; i < kNumKeys; i++) {
+    gen_key = Key(i);
+    gen_val = RandomString(&rnd, kValSize);
+
+    ASSERT_OK(Put(gen_key, gen_val));
+    true_data[gen_key] = gen_val;
+  }
+  ASSERT_OK(Flush());
+
+  // Separate values and merge operands into different files so that we
+  // make sure that we don't merge them while flushing but actually
+  // merge them in the read path
+  for (int i = 0; i < kNumKeys; i++) {
+    if (rnd.OneIn(static_cast<int>(100.0 / kNoMergeOpPercentage))) {
+      // Don't give merge operations to some keys
+      continue;
+    }
+
+    for (int j = 0; j < kNumMergeOperands; j++) {
+      gen_key = Key(i);
+      gen_val = RandomString(&rnd, kValSize);
+
+      ASSERT_OK(db_->Merge(WriteOptions(), gen_key, gen_val));
+      true_data[gen_key] += "," + gen_val;
+    }
+  }
+  ASSERT_OK(Flush());
+
+  for (int i = 0; i < kNumKeys; i++) {
+    if (rnd.OneIn(static_cast<int>(100.0 / kDeletePercentage))) {
+      gen_key = Key(i);
+
+      ASSERT_OK(Delete(gen_key));
+      true_data.erase(gen_key);
+    }
+  }
+  ASSERT_OK(Flush());
+
+  {
+    ReadOptions ro;
+    ro.fill_cache = false;
+    Iterator* iter = db_->NewIterator(ro);
+    auto data_iter = true_data.rbegin();
+
+    for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
+      ASSERT_EQ(iter->key().ToString(), data_iter->first);
+      ASSERT_EQ(iter->value().ToString(), data_iter->second);
+      data_iter++;
+    }
+    ASSERT_EQ(data_iter, true_data.rend());
+
+    delete iter;
+  }
+}
+
 TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
   Options options = CurrentOptions();
   options.statistics = rocksdb::CreateDBStatistics();
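Finally, a minimal sketch of what the patch buys from the public API side, assuming an already-populated DB (ReverseScanChecksum is a hypothetical helper, not part of this patch): before this change, every Prev() over a plain put copied the value into saved_value_ via ToString(); with it, value() during reverse iteration returns pinned_value_, a Slice into a pinned block, so the loop below performs no per-key value copy. The usual Iterator contract still applies: the Slice is valid only until the next operation that moves the iterator.

#include <memory>

#include "rocksdb/db.h"

// Hypothetical helper: scan the DB backwards and fold the raw value bytes
// into a checksum. With the change above, v points into a temporarily pinned
// data block for non-merged entries instead of a freshly copied std::string.
size_t ReverseScanChecksum(rocksdb::DB* db) {
  rocksdb::ReadOptions ro;
  ro.fill_cache = false;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  size_t checksum = 0;
  for (it->SeekToLast(); it->Valid(); it->Prev()) {
    rocksdb::Slice v = it->value();  // valid only until the iterator moves
    for (size_t i = 0; i < v.size(); i++) {
      checksum = checksum * 131 + static_cast<unsigned char>(v[i]);
    }
  }
  return checksum;
}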