68a8e6b8fa
Summary: This diff updates the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost. To do that, we need a new public API for FullMerge that replaces the std::deque<std::string> with std::vector<Slice>.

This diff is stacked on top of D56493 and D56511.

In this diff we:
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput, which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand

```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s

[master]
readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```

```
[Everything in Memtable | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s

[master]
readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s
readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s
readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s
readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s
readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s
```

```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]

[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s
readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s

[master]
readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s
readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```

```
[Everything in Block cache | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions

[FullMergeV2]
readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s
readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq : 1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s

[master]
readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s
readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```

Test Plan: COMPILE_WITH_ASAN=1 make check -j64

Reviewers: yhchiang, andrewkr, sdong

Reviewed By: sdong

Subscribers: lovro, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D57075
156 lines
5.8 KiB
C++
156 lines
5.8 KiB
C++
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
#pragma once
|
|
|
|
#include <algorithm>
|
|
#include <deque>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include "db/compaction.h"
|
|
#include "db/merge_helper.h"
|
|
#include "db/pinned_iterators_manager.h"
|
|
#include "rocksdb/compaction_filter.h"
|
|
#include "util/log_buffer.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
// Counters accumulated by CompactionIterator while it processes its input.
// Callers read them through CompactionIterator::iter_stats(). Every counter
// starts at zero.
struct CompactionIteratorStats {
  // ---- Compaction statistics ----
  // NOTE(review): the precise meaning of each "drop" counter (user vs. hidden
  // vs. obsolete) is inferred from its name only; confirm against
  // compaction_iterator.cc before relying on it.
  int64_t num_record_drop_user{0};
  int64_t num_record_drop_hidden{0};
  int64_t num_record_drop_obsolete{0};
  // Presumably accumulated time spent in the compaction filter; the unit is
  // not visible from this header (TODO confirm).
  uint64_t total_filter_time{0};

  // ---- Input statistics ----
  // TODO(noetzli): The stats are incomplete. They are lacking everything
  // consumed by MergeHelper.
  uint64_t num_input_records{0};
  uint64_t num_input_deletion_records{0};
  uint64_t num_input_corrupt_records{0};
  uint64_t total_input_raw_key_bytes{0};
  uint64_t total_input_raw_value_bytes{0};
};
|
|
|
|
class CompactionIterator {
|
|
public:
|
|
CompactionIterator(InternalIterator* input, const Comparator* cmp,
|
|
MergeHelper* merge_helper, SequenceNumber last_sequence,
|
|
std::vector<SequenceNumber>* snapshots,
|
|
SequenceNumber earliest_write_conflict_snapshot, Env* env,
|
|
bool expect_valid_internal_key,
|
|
const Compaction* compaction = nullptr,
|
|
const CompactionFilter* compaction_filter = nullptr,
|
|
LogBuffer* log_buffer = nullptr);
|
|
|
|
~CompactionIterator();
|
|
|
|
void ResetRecordCounts();
|
|
|
|
// Seek to the beginning of the compaction iterator output.
|
|
//
|
|
// REQUIRED: Call only once.
|
|
void SeekToFirst();
|
|
|
|
// Produces the next record in the compaction.
|
|
//
|
|
// REQUIRED: SeekToFirst() has been called.
|
|
void Next();
|
|
|
|
// Getters
|
|
const Slice& key() const { return key_; }
|
|
const Slice& value() const { return value_; }
|
|
const Status& status() const { return status_; }
|
|
const ParsedInternalKey& ikey() const { return ikey_; }
|
|
bool Valid() const { return valid_; }
|
|
const Slice& user_key() const { return current_user_key_; }
|
|
const CompactionIteratorStats& iter_stats() const { return iter_stats_; }
|
|
|
|
private:
|
|
// Processes the input stream to find the next output
|
|
void NextFromInput();
|
|
|
|
// Do last preparations before presenting the output to the callee. At this
|
|
// point this only zeroes out the sequence number if possible for better
|
|
// compression.
|
|
void PrepareOutput();
|
|
|
|
// Given a sequence number, return the sequence number of the
|
|
// earliest snapshot that this sequence number is visible in.
|
|
// The snapshots themselves are arranged in ascending order of
|
|
// sequence numbers.
|
|
// Employ a sequential search because the total number of
|
|
// snapshots are typically small.
|
|
inline SequenceNumber findEarliestVisibleSnapshot(
|
|
SequenceNumber in, SequenceNumber* prev_snapshot);
|
|
|
|
InternalIterator* input_;
|
|
const Comparator* cmp_;
|
|
MergeHelper* merge_helper_;
|
|
const std::vector<SequenceNumber>* snapshots_;
|
|
const SequenceNumber earliest_write_conflict_snapshot_;
|
|
Env* env_;
|
|
bool expect_valid_internal_key_;
|
|
const Compaction* compaction_;
|
|
const CompactionFilter* compaction_filter_;
|
|
LogBuffer* log_buffer_;
|
|
bool bottommost_level_;
|
|
bool valid_ = false;
|
|
SequenceNumber visible_at_tip_;
|
|
SequenceNumber earliest_snapshot_;
|
|
SequenceNumber latest_snapshot_;
|
|
bool ignore_snapshots_;
|
|
|
|
// State
|
|
//
|
|
// Points to a copy of the current compaction iterator output (current_key_)
|
|
// if valid_.
|
|
Slice key_;
|
|
// Points to the value in the underlying iterator that corresponds to the
|
|
// current output.
|
|
Slice value_;
|
|
// The status is OK unless compaction iterator encounters a merge operand
|
|
// while not having a merge operator defined.
|
|
Status status_;
|
|
// Stores the user key, sequence number and type of the current compaction
|
|
// iterator output (or current key in the underlying iterator during
|
|
// NextFromInput()).
|
|
ParsedInternalKey ikey_;
|
|
// Stores whether ikey_.user_key is valid. If set to false, the user key is
|
|
// not compared against the current key in the underlying iterator.
|
|
bool has_current_user_key_ = false;
|
|
bool at_next_ = false; // If false, the iterator
|
|
// Holds a copy of the current compaction iterator output (or current key in
|
|
// the underlying iterator during NextFromInput()).
|
|
IterKey current_key_;
|
|
Slice current_user_key_;
|
|
SequenceNumber current_user_key_sequence_;
|
|
SequenceNumber current_user_key_snapshot_;
|
|
|
|
// True if the iterator has already returned a record for the current key.
|
|
bool has_outputted_key_ = false;
|
|
|
|
// truncated the value of the next key and output it without applying any
|
|
// compaction rules. This is used for outputting a put after a single delete.
|
|
bool clear_and_output_next_key_ = false;
|
|
|
|
MergeOutputIterator merge_out_iter_;
|
|
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
|
|
// merge operands and then releasing them after consuming them.
|
|
PinnedIteratorsManager pinned_iters_mgr_;
|
|
std::string compaction_filter_value_;
|
|
// "level_ptrs" holds indices that remember which file of an associated
|
|
// level we were last checking during the last call to compaction->
|
|
// KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
|
|
// to pick off where it left off since each subcompaction's key range is
|
|
// increasing so a later call to the function must be looking for a key that
|
|
// is in or beyond the last file checked during the previous call
|
|
std::vector<size_t> level_ptrs_;
|
|
CompactionIteratorStats iter_stats_;
|
|
};
|
|
} // namespace rocksdb
|