rocksdb/db/merge_helper.h
Islam AbdelRahman 68a8e6b8fa Introduce FullMergeV2 (eliminate memcpy from merge operators)
Summary:
This diff updates the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost. To do that we need a new public API for FullMerge that replaces the std::deque<std::string> with std::vector<Slice>.

This diff is stacked on top of D56493 and D56511

In this diff we
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand

```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq      :       0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq      :       0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq      :       0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq      :       0.247 micros/op 4043927 ops/sec; 39553.2 MB/s

[master]
readseq      :       3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq      :       3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq      :       3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq      :       3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq      :       4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```

```
[Everything in Memtable | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq      :       2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq      :       1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq      :       1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq      :       1.250 micros/op 800000 ops/sec; 7824.7 MB/s

[master]
readseq      :      24.025 micros/op 41623 ops/sec;  407.1 MB/s
readseq      :      18.489 micros/op 54086 ops/sec;  529.0 MB/s
readseq      :      18.693 micros/op 53495 ops/sec;  523.2 MB/s
readseq      :      23.621 micros/op 42335 ops/sec;  414.1 MB/s
readseq      :      18.775 micros/op 53262 ops/sec;  521.0 MB/s

```

```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]

[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq      :      14.741 micros/op 67837 ops/sec;  663.5 MB/s
readseq      :       1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq      :       0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq      :       0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq      :       0.943 micros/op 1060657 ops/sec; 10374.2 MB/s

[master]
readseq      :      16.735 micros/op 59755 ops/sec;  584.5 MB/s
readseq      :       3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq      :       3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq      :       3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq      :       3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```

```
[Everything in Block cache | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions

[FullMergeV2]
readseq      :      24.325 micros/op 41109 ops/sec;  402.1 MB/s
readseq      :       1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq      :       1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq      :       1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq      :       1.109 micros/op 901713 ops/sec; 8819.6 MB/s

[master]
readseq      :      27.257 micros/op 36687 ops/sec;  358.8 MB/s
readseq      :       4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq      :       5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq      :       4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq      :       4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```

Test Plan: COMPILE_WITH_ASAN=1 make check -j64

Reviewers: yhchiang, andrewkr, sdong

Reviewed By: sdong

Subscribers: lovro, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D57075
2016-07-20 09:49:03 -07:00

175 lines
6.8 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef MERGE_HELPER_H
#define MERGE_HELPER_H
#include <deque>
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "db/merge_context.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "util/stop_watch.h"
namespace rocksdb {
// Forward declarations. Where this header uses these types, it does so only
// through pointers, so the complete definitions are not needed here.
class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class Statistics;
class InternalIterator;
class MergeHelper {
 public:
  // Captures the collaborators and settings used by later merge calls.
  // Only user_comparator is asserted to be non-null; use HasOperator() to
  // find out whether a merge operator was supplied. The filter stopwatch is
  // built from env and the accumulated filter time starts at zero.
  MergeHelper(Env* env, const Comparator* user_comparator,
              const MergeOperator* user_merge_operator,
              const CompactionFilter* compaction_filter, Logger* logger,
              unsigned min_partial_merge_operands,
              bool assert_valid_internal_key, SequenceNumber latest_snapshot,
              int level = 0, Statistics* stats = nullptr)
      : env_(env),
        user_comparator_(user_comparator),
        user_merge_operator_(user_merge_operator),
        compaction_filter_(compaction_filter),
        logger_(logger),
        min_partial_merge_operands_(min_partial_merge_operands),
        assert_valid_internal_key_(assert_valid_internal_key),
        latest_snapshot_(latest_snapshot),
        level_(level),
        filter_timer_(env_),
        total_filter_time_(0U),
        stats_(stats) {
    assert(user_comparator_ != nullptr);
  }

  // Invokes MergeOperator::FullMergeV2() and records perf statistics for the
  // call. When the returned status is OK, the merged value has been written
  // to result. With an empty operands list the value is simply copied to
  // result.
  //
  // Possible statuses:
  //   - OK:         the entries were merged successfully.
  //   - Corruption: the merge operator reported an unsuccessful merge.
  //
  // NOTE(review): result_operand is not described by this header; presumably
  // it lets the output refer to an existing operand -- confirm against the
  // definition.
  static Status TimedFullMerge(const MergeOperator* merge_operator,
                               const Slice& key, const Slice* value,
                               const std::vector<Slice>& operands,
                               std::string* result, Logger* logger,
                               Statistics* statistics, Env* env,
                               Slice* result_operand = nullptr);

  // Consumes merge entries from iter until one of these is reached:
  //   - a corrupted key,
  //   - a Put/Delete,
  //   - a different user key,
  //   - a specific sequence number (snapshot boundary),
  //   - the end of iteration.
  //
  // iter:        (IN)  positioned at the first merge-type entry.
  //              (OUT) positioned at the first entry that was not included
  //                    in the merge process.
  // stop_before: (IN)  sequence number the merge must not cross;
  //                    0 means no restriction.
  // at_bottom:   (IN)  true if the iterator covers the bottom level, which
  //                    means the start of this user key's history could be
  //                    reached.
  //
  // Possible statuses:
  //   - OK:              the entries were merged successfully.
  //   - MergeInProgress: no Put/Delete was encountered and the operands
  //                      could not be merged.
  //   - Corruption:      the merge operator reported an unsuccessful merge,
  //                      or an unexpected corrupted key was encountered
  //                      (the latter applies only when compiling with
  //                      asserts removed).
  //
  // REQUIRED: The first key in the input is not corrupted.
  Status MergeUntil(InternalIterator* iter,
                    const SequenceNumber stop_before = 0,
                    const bool at_bottom = false);

  // Applies the compaction filter given to the constructor to one merge
  // operand. Returns true if the operand should be filtered out.
  bool FilterMerge(const Slice& user_key, const Slice& value_slice);

  // Accessors for the merge result. What they return stays valid until the
  // next MergeUntil call.
  //
  // After a successful merge:
  //   - keys() holds a single element carrying the latest sequence number of
  //     the merges. Its type is Put or Merge (see IMPORTANT 1).
  //   - values() holds a single element: the result of merging all the
  //     operands together.
  //
  // IMPORTANT 1: the key type could change after the MergeUntil call.
  //     Put/Delete + Merge + ... + Merge => Put
  //     Merge + ... + Merge => Merge
  //
  // If the merge operator is not associative and no Put/Delete is found, the
  // merge is unsuccessful. In that case:
  //   - keys() holds the internal keys seen, in order of iteration.
  //   - values() holds the values (merges) seen, in the same order.
  //     values() is parallel to keys(): the first entry of keys() is the key
  //     belonging to the first entry of values(), and so on, so both lists
  //     have the same length. Every pair is a merge over the same user key
  //     (see IMPORTANT 2).
  //
  // IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
  //     So keys().back() was the first key seen by iterator.
  // TODO: Re-style this comment to be like the first one
  const std::deque<std::string>& keys() const { return keys_; }
  const std::vector<Slice>& values() const {
    return merge_context_.GetOperands();
  }

  // Value of the accumulated filter-time counter (initialized to zero).
  uint64_t TotalFilterTime() const { return total_filter_time_; }

  // True when a merge operator was passed to the constructor.
  bool HasOperator() const { return user_merge_operator_ != nullptr; }

 private:
  Env* env_;
  const Comparator* user_comparator_;
  const MergeOperator* user_merge_operator_;
  const CompactionFilter* compaction_filter_;
  Logger* logger_;
  unsigned min_partial_merge_operands_;
  bool assert_valid_internal_key_;  // enforce no internal key corruption?
  SequenceNumber latest_snapshot_;
  int level_;

  // Scratch area holding the outcome of MergeUntil; its contents are valid
  // only until the next MergeUntil call.
  // Sequence of keys seen during the merge.
  std::deque<std::string> keys_;
  // Operands, kept parallel with keys_. Declared mutable because the const
  // accessor values() calls GetOperands() on it.
  mutable MergeContext merge_context_;

  StopWatchNano filter_timer_;
  uint64_t total_filter_time_;
  Statistics* stats_;
};
// MergeOutputIterator can be used to iterate over the result of a merge.
//
// It reads MergeHelper::keys() and MergeHelper::values() through reverse
// iterators. The helper is held by raw pointer and is not owned.
class MergeOutputIterator {
 public:
  // The MergeOutputIterator is bound to a MergeHelper instance.
  explicit MergeOutputIterator(const MergeHelper* merge_helper);

  // Seeks to the first record in the output.
  void SeekToFirst();
  // Advances to the next record in the output.
  void Next();

  // Read-only accessors. They are const-qualified so they can be called
  // through a const reference or pointer to the iterator.
  //
  // key() and value() dereference the current position, so they should only
  // be called while Valid() is true.
  Slice key() const { return Slice(*it_keys_); }
  Slice value() const { return Slice(*it_values_); }
  // True until the key position reaches the reverse-end of the helper's
  // keys().
  bool Valid() const { return it_keys_ != merge_helper_->keys().rend(); }

 private:
  const MergeHelper* merge_helper_;
  // Current positions in the helper's keys() and values().
  std::deque<std::string>::const_reverse_iterator it_keys_;
  std::vector<Slice>::const_reverse_iterator it_values_;
};
} // namespace rocksdb
#endif