DBIter to use IteratorWrapper for inner iterator (#5214)

Summary:
It's hard to get DBIter to directly use InternalIterator::NextAndGetResult() because the code change would be complicated. Instead, use IteratorWrapper, where Next() is already using NextAndGetResult(). Performance number is hard to measure because it is small and ther is variation. I run readseq many times, and there seems to be 1% gain.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5214

Differential Revision: D15003635

Pulled By: siying

fbshipit-source-id: 17af1965c409c2fe90cd85037fbd2c5a1364f82a
This commit is contained in:
Siying Dong 2019-04-23 10:51:50 -07:00 committed by Facebook Github Bot
parent 78a6e07c83
commit 72c8533f2c

View File

@ -22,6 +22,7 @@
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/arena.h"
#include "util/filename.h"
#include "util/logging.h"
@ -149,8 +150,8 @@ class DBIter final: public Iterator {
if (pin_thru_lifetime_) {
pinned_iters_mgr_.StartPinning();
}
if (iter_) {
iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
if (iter_.iter()) {
iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
}
}
~DBIter() override {
@ -161,16 +162,12 @@ class DBIter final: public Iterator {
RecordTick(statistics_, NO_ITERATOR_DELETED);
ResetInternalKeysSkippedCounter();
local_stats_.BumpGlobalStatistics(statistics_);
if (!arena_mode_) {
delete iter_;
} else {
iter_->~InternalIterator();
}
iter_.DeleteIter(arena_mode_);
}
virtual void SetIter(InternalIterator* iter) {
assert(iter_ == nullptr);
iter_ = iter;
iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
assert(iter_.iter() == nullptr);
iter_.Set(iter);
iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
}
virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
return &range_del_agg_;
@ -194,12 +191,12 @@ class DBIter final: public Iterator {
} else if (direction_ == kReverse) {
return pinned_value_;
} else {
return iter_->value();
return iter_.value();
}
}
Status status() const override {
if (status_.ok()) {
return iter_->status();
return iter_.status();
} else {
assert(!valid_);
return status_;
@ -216,7 +213,7 @@ class DBIter final: public Iterator {
}
if (prop_name == "rocksdb.iterator.super-version-number") {
// First try to pass the value returned from inner iterator.
return iter_->GetProperty(prop_name, prop);
return iter_.iter()->GetProperty(prop_name, prop);
} else if (prop_name == "rocksdb.iterator.is-key-pinned") {
if (valid_) {
*prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
@ -308,7 +305,7 @@ class DBIter final: public Iterator {
Logger* logger_;
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
InternalIterator* iter_;
IteratorWrapper iter_;
ReadCallback* read_callback_;
// Max visible sequence number. It is normally the snapshot seq unless we have
// uncommitted data in db as in WriteUnCommitted.
@ -361,11 +358,11 @@ class DBIter final: public Iterator {
};
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
if (!ParseInternalKey(iter_->key(), ikey)) {
if (!ParseInternalKey(iter_.key(), ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter");
valid_ = false;
ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
iter_->key().ToString(true).c_str());
iter_.key().ToString(true).c_str());
return false;
} else {
return true;
@ -393,13 +390,13 @@ void DBIter::Next() {
// Next() without checking the current key.
// If the current key is a merge, very likely iter already points
// to the next internal position.
assert(iter_->Valid());
iter_->Next();
assert(iter_.Valid());
iter_.Next();
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
local_stats_.next_count_++;
if (ok && iter_->Valid()) {
if (ok && iter_.Valid()) {
FindNextUserEntry(true /* skipping the current user key */,
prefix_same_as_start_);
} else {
@ -433,7 +430,7 @@ inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
// Actual implementation of DBIter::FindNextUserEntry()
inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// Loop until we hit an acceptable entry to yield
assert(iter_->Valid());
assert(iter_.Valid());
assert(status_.ok());
assert(direction_ == kForward);
current_entry_is_merged_ = false;
@ -495,8 +492,8 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
return true;
} else {
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
ikey_.user_key, !pin_thru_lifetime_ ||
!iter_.iter()->IsKeyPinned() /* copy */);
skipping = true;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
}
@ -516,14 +513,16 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
} else {
// this key and all previous versions shouldn't be included,
// skipping
saved_key_.SetUserKey(ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ ||
!iter_.iter()->IsKeyPinned() /* copy */);
skipping = true;
}
} else {
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
ikey_.user_key, !pin_thru_lifetime_ ||
!iter_.iter()->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
@ -553,7 +552,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
case kTypeMerge:
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
!pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
ikey_, RangeDelPositioningMode::kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
@ -587,7 +586,7 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
} else {
saved_key_.SetUserKey(
ikey_.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
!iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
skipping = false;
num_skipped = 0;
}
@ -616,20 +615,20 @@ inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check)
ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
kValueTypeForSeek));
}
iter_->Seek(last_key);
iter_.Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
} else {
iter_->Next();
iter_.Next();
}
} while (iter_->Valid());
} while (iter_.Valid());
valid_ = false;
return iter_->status().ok();
return iter_.status().ok();
}
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
// PRE: iter_.key() points to the first merge type entry
// saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
// iter_ points to the next entry (or invalid)
@ -645,13 +644,13 @@ bool DBIter::MergeValuesNewToOld() {
TempPinData();
merge_context_.Clear();
// Start the merge process by pushing the first operand
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
ParsedInternalKey ikey;
Status s;
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
for (iter_.Next(); iter_.Valid(); iter_.Next()) {
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
if (!ParseKey(&ikey)) {
return false;
@ -665,12 +664,12 @@ bool DBIter::MergeValuesNewToOld() {
ikey, RangeDelPositioningMode::kForwardTraversal)) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_->Next();
iter_.Next();
break;
} else if (kTypeValue == ikey.type) {
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
const Slice val = iter_->value();
const Slice val = iter_.value();
s = MergeHelper::TimedFullMerge(
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_, &pinned_value_, true);
@ -680,8 +679,8 @@ bool DBIter::MergeValuesNewToOld() {
return false;
}
// iter_ is positioned after put
iter_->Next();
if (!iter_->status().ok()) {
iter_.Next();
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
@ -689,8 +688,8 @@ bool DBIter::MergeValuesNewToOld() {
} else if (kTypeMerge == ikey.type) {
// hit a merge, add the value as an operand and run associative merge.
// when complete, add result to operands and continue.
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
if (!allow_blob_) {
@ -709,7 +708,7 @@ bool DBIter::MergeValuesNewToOld() {
}
}
if (!iter_->status().ok()) {
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
@ -758,21 +757,21 @@ void DBIter::Prev() {
}
bool DBIter::ReverseToForward() {
assert(iter_->status().ok());
assert(iter_.status().ok());
// When moving backwards, iter_ is positioned on _previous_ key, which may
// not exist or may have different prefix than the current key().
// If that's the case, seek iter_ to current key.
if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) {
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
iter_->Seek(last_key.GetInternalKey());
iter_.Seek(last_key.GetInternalKey());
}
direction_ = kForward;
// Skip keys less than the current key() (a.k.a. saved_key_).
while (iter_->Valid()) {
while (iter_.Valid()) {
ParsedInternalKey ikey;
if (!ParseKey(&ikey)) {
return false;
@ -780,10 +779,10 @@ bool DBIter::ReverseToForward() {
if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
return true;
}
iter_->Next();
iter_.Next();
}
if (!iter_->status().ok()) {
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
@ -793,14 +792,14 @@ bool DBIter::ReverseToForward() {
// Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
assert(iter_->status().ok());
assert(iter_.status().ok());
// When current_entry_is_merged_ is true, iter_ may be positioned on the next
// key, which may not exist or may have prefix different from current.
// If that's the case, seek to saved_key_.
if (current_entry_is_merged_ &&
((prefix_extractor_ != nullptr && !total_order_seek_) ||
!iter_->Valid())) {
!iter_.Valid())) {
IterKey last_key;
// Using kMaxSequenceNumber and kValueTypeForSeek
// (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
@ -808,15 +807,15 @@ bool DBIter::ReverseToBackward() {
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
if (prefix_extractor_ != nullptr && !total_order_seek_) {
iter_->SeekForPrev(last_key.GetInternalKey());
iter_.SeekForPrev(last_key.GetInternalKey());
} else {
// Some iterators may not support SeekForPrev(), so we avoid using it
// when prefix seek mode is disabled. This is somewhat expensive
// (an extra Prev(), as well as an extra change of direction of iter_),
// so we may need to reconsider it later.
iter_->Seek(last_key.GetInternalKey());
if (!iter_->Valid() && iter_->status().ok()) {
iter_->SeekToLast();
iter_.Seek(last_key.GetInternalKey());
if (!iter_.Valid() && iter_.status().ok()) {
iter_.SeekToLast();
}
}
}
@ -826,10 +825,10 @@ bool DBIter::ReverseToBackward() {
}
void DBIter::PrevInternal() {
while (iter_->Valid()) {
while (iter_.Valid()) {
saved_key_.SetUserKey(
ExtractUserKey(iter_->key()),
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
ExtractUserKey(iter_.key()),
!iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
if (prefix_extractor_ && prefix_same_as_start_ &&
prefix_extractor_->Transform(saved_key_.GetUserKey())
@ -883,7 +882,7 @@ void DBIter::PrevInternal() {
// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
// the entry just before them, or on the entry just after them.
bool DBIter::FindValueForCurrentKey() {
assert(iter_->Valid());
assert(iter_.Valid());
merge_context_.Clear();
current_entry_is_merged_ = false;
// last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
@ -895,7 +894,7 @@ bool DBIter::FindValueForCurrentKey() {
ReleaseTempPinnedData();
TempPinData();
size_t num_skipped = 0;
while (iter_->Valid()) {
while (iter_.Valid()) {
ParsedInternalKey ikey;
if (!ParseKey(&ikey)) {
return false;
@ -925,8 +924,8 @@ bool DBIter::FindValueForCurrentKey() {
last_key_entry_type = kTypeRangeDeletion;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
assert(iter_->IsValuePinned());
pinned_value_ = iter_->value();
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
}
merge_context_.Clear();
last_not_merge_type = last_key_entry_type;
@ -947,7 +946,8 @@ bool DBIter::FindValueForCurrentKey() {
} else {
assert(merge_operator_ != nullptr);
merge_context_.PushOperandBack(
iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
iter_.value(),
iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
}
break;
@ -956,11 +956,11 @@ bool DBIter::FindValueForCurrentKey() {
}
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
iter_->Prev();
iter_.Prev();
++num_skipped;
}
if (!iter_->status().ok()) {
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
@ -1040,16 +1040,16 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
std::string last_key;
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
sequence_, kValueTypeForSeek));
iter_->Seek(last_key);
iter_.Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
// In case read_callback presents, the value we seek to may not be visible.
// Find the next value that's visible.
ParsedInternalKey ikey;
while (true) {
if (!iter_->Valid()) {
if (!iter_.Valid()) {
valid_ = false;
return iter_->status().ok();
return iter_.status().ok();
}
if (!ParseKey(&ikey)) {
@ -1067,7 +1067,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
break;
}
iter_->Next();
iter_.Next();
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
@ -1085,8 +1085,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return false;
}
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
assert(iter_->IsValuePinned());
pinned_value_ = iter_->value();
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
valid_ = true;
return true;
}
@ -1096,13 +1096,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
assert(ikey.type == kTypeMerge);
current_entry_is_merged_ = true;
merge_context_.Clear();
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
while (true) {
iter_->Next();
iter_.Next();
if (!iter_->Valid()) {
if (!iter_->status().ok()) {
if (!iter_.Valid()) {
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
@ -1120,7 +1120,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
ikey, RangeDelPositioningMode::kForwardTraversal)) {
break;
} else if (ikey.type == kTypeValue) {
const Slice val = iter_->value();
const Slice val = iter_.value();
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), &val,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
@ -1133,8 +1133,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
valid_ = true;
return true;
} else if (ikey.type == kTypeMerge) {
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
if (!allow_blob_) {
@ -1166,13 +1166,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
// Make sure we leave iter_ in a good state. If it's valid and we don't care
// about prefixes, that's already good enough. Otherwise it needs to be
// seeked to the current key.
if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) {
if (prefix_extractor_ != nullptr && !total_order_seek_) {
iter_->SeekForPrev(last_key);
iter_.SeekForPrev(last_key);
} else {
iter_->Seek(last_key);
if (!iter_->Valid() && iter_->status().ok()) {
iter_->SeekToLast();
iter_.Seek(last_key);
if (!iter_.Valid() && iter_.status().ok()) {
iter_.SeekToLast();
}
}
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
@ -1187,7 +1187,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
bool DBIter::FindUserKeyBeforeSavedKey() {
assert(status_.ok());
size_t num_skipped = 0;
while (iter_->Valid()) {
while (iter_.Valid()) {
ParsedInternalKey ikey;
if (!ParseKey(&ikey)) {
return false;
@ -1215,19 +1215,19 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
// It would be more efficient to use SeekForPrev() here, but some
// iterators may not support it.
iter_->Seek(last_key.GetInternalKey());
iter_.Seek(last_key.GetInternalKey());
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
if (!iter_->Valid()) {
if (!iter_.Valid()) {
break;
}
} else {
++num_skipped;
}
iter_->Prev();
iter_.Prev();
}
if (!iter_->status().ok()) {
if (!iter_.status().ok()) {
valid_ = false;
return false;
}
@ -1285,11 +1285,11 @@ void DBIter::Seek(const Slice& target) {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->Seek(saved_key_.GetInternalKey());
iter_.Seek(saved_key_.GetInternalKey());
range_del_agg_.InvalidateRangeDelMapPositions();
}
RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
if (iter_.Valid()) {
if (prefix_extractor_ && prefix_same_as_start_) {
prefix_start_key_ = prefix_extractor_->Transform(target);
}
@ -1337,7 +1337,7 @@ void DBIter::SeekForPrev(const Slice& target) {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekForPrev(saved_key_.GetInternalKey());
iter_.SeekForPrev(saved_key_.GetInternalKey());
range_del_agg_.InvalidateRangeDelMapPositions();
}
@ -1348,7 +1348,7 @@ void DBIter::SeekForPrev(const Slice& target) {
#endif // ROCKSDB_LITE
RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
if (iter_.Valid()) {
if (prefix_extractor_ && prefix_same_as_start_) {
prefix_start_key_ = prefix_extractor_->Transform(target);
}
@ -1393,15 +1393,15 @@ void DBIter::SeekToFirst() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekToFirst();
iter_.SeekToFirst();
range_del_agg_.InvalidateRangeDelMapPositions();
}
RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
if (iter_.Valid()) {
saved_key_.SetUserKey(
ExtractUserKey(iter_->key()),
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
ExtractUserKey(iter_.key()),
!iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
if (statistics_ != nullptr) {
if (valid_) {
@ -1445,7 +1445,7 @@ void DBIter::SeekToLast() {
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->SeekToLast();
iter_.SeekToLast();
range_del_agg_.InvalidateRangeDelMapPositions();
}
PrevInternal();