68a8e6b8fa
Summary: This diff updates the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost. To do that we need a new public API for FullMerge that replaces the std::deque<std::string> with std::vector<Slice>. This diff is stacked on top of D56493 and D56511. In this diff we: - Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future - Replace std::deque<std::string> with std::vector<Slice> to pass operands - Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187) - Allow FullMergeV2 output to be an existing operand ``` [Everything in Memtable | 10K operands | 10 KB each | 1 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 [FullMergeV2] readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s [master] readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s ``` ``` [Everything in Memtable | 10K operands | 10 KB each | 10 operands per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 
[FullMergeV2] readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s [master] readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 1 operand per key] [FullMergeV2] $ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s [master] readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 10 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions [FullMergeV2] readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s readseq : 1.091 micros/op 
916590 ops/sec; 8965.1 MB/s readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s [master] readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s ``` Test Plan: COMPILE_WITH_ASAN=1 make check -j64 Reviewers: yhchiang, andrewkr, sdong Reviewed By: sdong Subscribers: lovro, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D57075
118 lines
3.4 KiB
C++
118 lines
3.4 KiB
C++
/**
|
|
* @author Deon Nicholas (dnicholas@fb.com)
|
|
* Copyright 2013 Facebook
|
|
*/
|
|
|
|
#include "stringappend2.h"
|
|
|
|
#include <memory>
|
|
#include <string>
|
|
#include <assert.h>
|
|
|
|
#include "rocksdb/slice.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "utilities/merge_operators.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
// Constructor: also specify the delimiter character.
|
|
StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
|
|
: delim_(delim_char) {
|
|
}
|
|
|
|
// Implementation for the merge operation (concatenates two strings)
|
|
bool StringAppendTESTOperator::FullMergeV2(
|
|
const MergeOperationInput& merge_in,
|
|
MergeOperationOutput* merge_out) const {
|
|
// Clear the *new_value for writing.
|
|
merge_out->new_value.clear();
|
|
|
|
if (merge_in.existing_value == nullptr && merge_in.operand_list.size() == 1) {
|
|
// Only one operand
|
|
merge_out->existing_operand = merge_in.operand_list.back();
|
|
return true;
|
|
}
|
|
|
|
// Compute the space needed for the final result.
|
|
size_t numBytes = 0;
|
|
for (auto it = merge_in.operand_list.begin();
|
|
it != merge_in.operand_list.end(); ++it) {
|
|
numBytes += it->size() + 1; // Plus 1 for the delimiter
|
|
}
|
|
|
|
// Only print the delimiter after the first entry has been printed
|
|
bool printDelim = false;
|
|
|
|
// Prepend the *existing_value if one exists.
|
|
if (merge_in.existing_value) {
|
|
merge_out->new_value.reserve(numBytes + merge_in.existing_value->size());
|
|
merge_out->new_value.append(merge_in.existing_value->data(),
|
|
merge_in.existing_value->size());
|
|
printDelim = true;
|
|
} else if (numBytes) {
|
|
merge_out->new_value.reserve(
|
|
numBytes - 1); // Minus 1 since we have one less delimiter
|
|
}
|
|
|
|
// Concatenate the sequence of strings (and add a delimiter between each)
|
|
for (auto it = merge_in.operand_list.begin();
|
|
it != merge_in.operand_list.end(); ++it) {
|
|
if (printDelim) {
|
|
merge_out->new_value.append(1, delim_);
|
|
}
|
|
merge_out->new_value.append(it->data(), it->size());
|
|
printDelim = true;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool StringAppendTESTOperator::PartialMergeMulti(
|
|
const Slice& key, const std::deque<Slice>& operand_list,
|
|
std::string* new_value, Logger* logger) const {
|
|
return false;
|
|
}
|
|
|
|
// A version of PartialMerge that actually performs "partial merging".
|
|
// Use this to simulate the exact behaviour of the StringAppendOperator.
|
|
bool StringAppendTESTOperator::_AssocPartialMergeMulti(
|
|
const Slice& key, const std::deque<Slice>& operand_list,
|
|
std::string* new_value, Logger* logger) const {
|
|
// Clear the *new_value for writing
|
|
assert(new_value);
|
|
new_value->clear();
|
|
assert(operand_list.size() >= 2);
|
|
|
|
// Generic append
|
|
// Determine and reserve correct size for *new_value.
|
|
size_t size = 0;
|
|
for (const auto& operand : operand_list) {
|
|
size += operand.size();
|
|
}
|
|
size += operand_list.size() - 1; // Delimiters
|
|
new_value->reserve(size);
|
|
|
|
// Apply concatenation
|
|
new_value->assign(operand_list.front().data(), operand_list.front().size());
|
|
|
|
for (std::deque<Slice>::const_iterator it = operand_list.begin() + 1;
|
|
it != operand_list.end(); ++it) {
|
|
new_value->append(1, delim_);
|
|
new_value->append(it->data(), it->size());
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
// Returns the fixed, NUL-terminated name of this merge operator.
const char* StringAppendTESTOperator::Name() const {
  static const char kOperatorName[] = "StringAppendTESTOperator";
  return kOperatorName;
}
|
|
|
|
|
|
std::shared_ptr<MergeOperator>
|
|
MergeOperators::CreateStringAppendTESTOperator() {
|
|
return std::make_shared<StringAppendTESTOperator>(',');
|
|
}
|
|
|
|
} // namespace rocksdb
|