68a8e6b8fa
Summary: This diff updates the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost. To do that we need a new public API for FullMerge that replaces the std::deque<std::string> with std::vector<Slice>. This diff is stacked on top of D56493 and D56511. In this diff we: - Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput, which will make it easier to add new arguments in the future - Replace std::deque<std::string> with std::vector<Slice> to pass operands - Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187) - Allow FullMergeV2 output to be an existing operand ``` [Everything in Memtable | 10K operands | 10 KB each | 1 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 [FullMergeV2] readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s [master] readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s ``` ``` [Everything in Memtable | 10K operands | 10 KB each | 10 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 
[FullMergeV2] readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s [master] readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 1 operand per key] [FullMergeV2] $ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s [master] readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 10 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions [FullMergeV2] readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s readseq : 1.091 micros/op 
916590 ops/sec; 8965.1 MB/s readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s [master] readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s ``` Test Plan: COMPILE_WITH_ASAN=1 make check -j64 Reviewers: yhchiang, andrewkr, sdong Reviewed By: sdong Subscribers: lovro, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D57075
246 lines
7.6 KiB
C++
246 lines
7.6 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "table/two_level_iterator.h"
|
|
|
|
#include "db/pinned_iterators_manager.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/table.h"
|
|
#include "table/block.h"
|
|
#include "table/format.h"
|
|
#include "util/arena.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
namespace {
|
|
|
|
class TwoLevelIterator : public InternalIterator {
|
|
public:
|
|
explicit TwoLevelIterator(TwoLevelIteratorState* state,
|
|
InternalIterator* first_level_iter,
|
|
bool need_free_iter_and_state);
|
|
|
|
virtual ~TwoLevelIterator() {
|
|
// Assert that the TwoLevelIterator is never deleted while Pinning is
|
|
// Enabled.
|
|
assert(!pinned_iters_mgr_ ||
|
|
(pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
|
|
first_level_iter_.DeleteIter(!need_free_iter_and_state_);
|
|
second_level_iter_.DeleteIter(false);
|
|
if (need_free_iter_and_state_) {
|
|
delete state_;
|
|
} else {
|
|
state_->~TwoLevelIteratorState();
|
|
}
|
|
}
|
|
|
|
virtual void Seek(const Slice& target) override;
|
|
virtual void SeekToFirst() override;
|
|
virtual void SeekToLast() override;
|
|
virtual void Next() override;
|
|
virtual void Prev() override;
|
|
|
|
virtual bool Valid() const override { return second_level_iter_.Valid(); }
|
|
virtual Slice key() const override {
|
|
assert(Valid());
|
|
return second_level_iter_.key();
|
|
}
|
|
virtual Slice value() const override {
|
|
assert(Valid());
|
|
return second_level_iter_.value();
|
|
}
|
|
virtual Status status() const override {
|
|
// It'd be nice if status() returned a const Status& instead of a Status
|
|
if (!first_level_iter_.status().ok()) {
|
|
return first_level_iter_.status();
|
|
} else if (second_level_iter_.iter() != nullptr &&
|
|
!second_level_iter_.status().ok()) {
|
|
return second_level_iter_.status();
|
|
} else {
|
|
return status_;
|
|
}
|
|
}
|
|
virtual void SetPinnedItersMgr(
|
|
PinnedIteratorsManager* pinned_iters_mgr) override {
|
|
pinned_iters_mgr_ = pinned_iters_mgr;
|
|
first_level_iter_.SetPinnedItersMgr(pinned_iters_mgr);
|
|
if (second_level_iter_.iter()) {
|
|
second_level_iter_.SetPinnedItersMgr(pinned_iters_mgr);
|
|
}
|
|
}
|
|
virtual bool IsKeyPinned() const override {
|
|
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
|
|
second_level_iter_.iter() && second_level_iter_.IsKeyPinned();
|
|
}
|
|
virtual bool IsValuePinned() const override {
|
|
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
|
|
second_level_iter_.iter() && second_level_iter_.IsValuePinned();
|
|
}
|
|
|
|
private:
|
|
void SaveError(const Status& s) {
|
|
if (status_.ok() && !s.ok()) status_ = s;
|
|
}
|
|
void SkipEmptyDataBlocksForward();
|
|
void SkipEmptyDataBlocksBackward();
|
|
void SetSecondLevelIterator(InternalIterator* iter);
|
|
void InitDataBlock();
|
|
|
|
TwoLevelIteratorState* state_;
|
|
IteratorWrapper first_level_iter_;
|
|
IteratorWrapper second_level_iter_; // May be nullptr
|
|
bool need_free_iter_and_state_;
|
|
PinnedIteratorsManager* pinned_iters_mgr_;
|
|
Status status_;
|
|
// If second_level_iter is non-nullptr, then "data_block_handle_" holds the
|
|
// "index_value" passed to block_function_ to create the second_level_iter.
|
|
std::string data_block_handle_;
|
|
};
|
|
|
|
// Stores the state and wraps the first-level iterator. No second-level
// iterator exists yet and no PinnedIteratorsManager is attached until
// SetPinnedItersMgr() is called.
TwoLevelIterator::TwoLevelIterator(TwoLevelIteratorState* state,
                                   InternalIterator* first_level_iter,
                                   bool need_free_iter_and_state)
    : state_(state),
      first_level_iter_(first_level_iter),
      need_free_iter_and_state_(need_free_iter_and_state),
      pinned_iters_mgr_(nullptr) {
}
|
|
|
|
void TwoLevelIterator::Seek(const Slice& target) {
|
|
if (state_->check_prefix_may_match &&
|
|
!state_->PrefixMayMatch(target)) {
|
|
SetSecondLevelIterator(nullptr);
|
|
return;
|
|
}
|
|
first_level_iter_.Seek(target);
|
|
|
|
InitDataBlock();
|
|
if (second_level_iter_.iter() != nullptr) {
|
|
second_level_iter_.Seek(target);
|
|
}
|
|
SkipEmptyDataBlocksForward();
|
|
}
|
|
|
|
void TwoLevelIterator::SeekToFirst() {
|
|
first_level_iter_.SeekToFirst();
|
|
InitDataBlock();
|
|
if (second_level_iter_.iter() != nullptr) {
|
|
second_level_iter_.SeekToFirst();
|
|
}
|
|
SkipEmptyDataBlocksForward();
|
|
}
|
|
|
|
void TwoLevelIterator::SeekToLast() {
|
|
first_level_iter_.SeekToLast();
|
|
InitDataBlock();
|
|
if (second_level_iter_.iter() != nullptr) {
|
|
second_level_iter_.SeekToLast();
|
|
}
|
|
SkipEmptyDataBlocksBackward();
|
|
}
|
|
|
|
// Advances within the current second-level iterator; if that exhausts it,
// SkipEmptyDataBlocksForward() moves on to following first-level entries.
// REQUIRES: Valid()
void TwoLevelIterator::Next() {
  assert(Valid());

  second_level_iter_.Next();

  SkipEmptyDataBlocksForward();
}
|
|
|
|
// Steps back within the current second-level iterator; if that exhausts it,
// SkipEmptyDataBlocksBackward() moves on to preceding first-level entries.
// REQUIRES: Valid()
void TwoLevelIterator::Prev() {
  assert(Valid());

  second_level_iter_.Prev();

  SkipEmptyDataBlocksBackward();
}
|
|
|
|
|
|
void TwoLevelIterator::SkipEmptyDataBlocksForward() {
|
|
while (second_level_iter_.iter() == nullptr ||
|
|
(!second_level_iter_.Valid() &&
|
|
!second_level_iter_.status().IsIncomplete())) {
|
|
// Move to next block
|
|
if (!first_level_iter_.Valid()) {
|
|
SetSecondLevelIterator(nullptr);
|
|
return;
|
|
}
|
|
first_level_iter_.Next();
|
|
InitDataBlock();
|
|
if (second_level_iter_.iter() != nullptr) {
|
|
second_level_iter_.SeekToFirst();
|
|
}
|
|
}
|
|
}
|
|
|
|
void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
|
|
while (second_level_iter_.iter() == nullptr ||
|
|
(!second_level_iter_.Valid() &&
|
|
!second_level_iter_.status().IsIncomplete())) {
|
|
// Move to next block
|
|
if (!first_level_iter_.Valid()) {
|
|
SetSecondLevelIterator(nullptr);
|
|
return;
|
|
}
|
|
first_level_iter_.Prev();
|
|
InitDataBlock();
|
|
if (second_level_iter_.iter() != nullptr) {
|
|
second_level_iter_.SeekToLast();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Replaces the second-level iterator with `iter` (which may be nullptr).
// Before the swap, the status of the outgoing iterator is recorded through
// SaveError() and the incoming iterator is given the current
// PinnedIteratorsManager, if any. The outgoing iterator is handed to the
// manager when pinning is enabled; otherwise it is deleted.
void TwoLevelIterator::SetSecondLevelIterator(InternalIterator* iter) {
  const bool had_second_level = second_level_iter_.iter() != nullptr;
  if (had_second_level) {
    SaveError(second_level_iter_.status());
  }

  if (pinned_iters_mgr_ != nullptr && iter != nullptr) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* const previous = second_level_iter_.Set(iter);
  const bool pinning_active =
      pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled();
  if (!pinning_active) {
    delete previous;
    return;
  }
  pinned_iters_mgr_->PinIteratorIfNeeded(previous);
}
|
|
|
|
void TwoLevelIterator::InitDataBlock() {
|
|
if (!first_level_iter_.Valid()) {
|
|
SetSecondLevelIterator(nullptr);
|
|
} else {
|
|
Slice handle = first_level_iter_.value();
|
|
if (second_level_iter_.iter() != nullptr &&
|
|
!second_level_iter_.status().IsIncomplete() &&
|
|
handle.compare(data_block_handle_) == 0) {
|
|
// second_level_iter is already constructed with this iterator, so
|
|
// no need to change anything
|
|
} else {
|
|
InternalIterator* iter = state_->NewSecondaryIterator(handle);
|
|
data_block_handle_.assign(handle.data(), handle.size());
|
|
SetSecondLevelIterator(iter);
|
|
}
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// Creates a TwoLevelIterator over `state` and `first_level_iter`.
// With a non-null `arena` the object is constructed in memory obtained from
// arena->AllocateAligned(); otherwise it is allocated with plain `new`.
// `need_free_iter_and_state` is forwarded unchanged to the constructor.
InternalIterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
                                      InternalIterator* first_level_iter,
                                      Arena* arena,
                                      bool need_free_iter_and_state) {
  if (arena != nullptr) {
    auto* const mem = arena->AllocateAligned(sizeof(TwoLevelIterator));
    return new (mem)
        TwoLevelIterator(state, first_level_iter, need_free_iter_and_state);
  }
  return new TwoLevelIterator(state, first_level_iter,
                              need_free_iter_and_state);
}
|
|
|
|
} // namespace rocksdb
|