Add benchmark for GetMergeOperands() (#9785)
Summary: There's an existing benchmark, "getmergeoperands", but it is unconventional in that it has multiple phases and hardcoded setup parameters. This PR adds a different one, "readrandomoperands", that follows the convention of other benchmarks: it has a single phase and takes its configuration from existing flags.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9785

Test Plan:
```
$ ./db_bench -benchmarks=mergerandom -merge_operator=StringAppendOperator -write_buffer_size=1048576 -max_bytes_for_level_base=4194304 -target_file_size_base=1048576 -compression_type=none -disable_auto_compactions=true
$ ./db_bench -use_existing_db=true -benchmarks=readrandomoperands -merge_operator=StringAppendOperator -disable_auto_compactions=true -duration=10
...
readrandomoperands : 542.082 micros/op 1844 ops/sec; 0.2 MB/s (11980 of 18999 found)
```

Reviewed By: jay-zhuang

Differential Revision: D35290412

Pulled By: ajkr

fbshipit-source-id: fb367ca614b128cef844a75f0e5d9dd7c3328d85
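For context, here is a minimal standalone sketch of the fill step that the `mergerandom` run above performs: opening a DB with an append-style merge operator and accumulating several operands under one key. This is not code from this commit; the `AppendOperator` class and the `/tmp/merge_demo` path are illustrative stand-ins (db_bench selects its real operator via `-merge_operator=StringAppendOperator`).

```cpp
#include <cassert>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"

// A toy append-style merge operator standing in for db_bench's
// StringAppendOperator (hypothetical; for illustration only).
class AppendOperator : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/,
             const rocksdb::Slice* existing_value,
             const rocksdb::Slice& value, std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    new_value->clear();
    if (existing_value != nullptr) {
      new_value->assign(existing_value->data(), existing_value->size());
      new_value->append(",");
    }
    new_value->append(value.data(), value.size());
    return true;
  }
  const char* Name() const override { return "AppendOperator"; }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // A DB must be opened with a merge operator before Merge() can be used.
  options.merge_operator = std::make_shared<AppendOperator>();

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/merge_demo", &db);
  assert(s.ok());

  // Accumulate several operands under one key; until they are merged by a
  // compaction or a point lookup, each remains individually visible to
  // GetMergeOperands().
  for (int i = 0; i < 4; ++i) {
    s = db->Merge(rocksdb::WriteOptions(), "key1",
                  "operand" + std::to_string(i));
    assert(s.ok());
  }
  delete db;
  return 0;
}
```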
```diff
@@ -160,6 +160,7 @@ IF_ROCKSDB_LITE("",
     "randomreplacekeys,"
     "timeseries,"
     "getmergeoperands",
+    "readrandomoperands,"
 
     "Comma-separated list of operations to run in the specified"
     " order. Available benchmarks:\n"
```
```diff
@@ -246,7 +247,11 @@ IF_ROCKSDB_LITE("",
     "key "
     "by doing a Get followed by binary searching in the large sorted list vs "
     "doing a GetMergeOperands and binary searching in the operands which are"
-    "sorted sub-lists. The MergeOperator used is sortlist.h\n");
+    "sorted sub-lists. The MergeOperator used is sortlist.h\n"
+    "\treadrandomoperands -- read random keys using `GetMergeOperands()`. An "
+    "operation includes a rare but possible retry in case it got "
+    "`Status::Incomplete()`. This happens upon encountering more keys than "
+    "have ever been seen by the thread (or eight initially)\n");
 
 DEFINE_int64(num, 1000000, "Number of key/values to place in database");
 
```
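The retry protocol described by this help text can be sketched as a standalone helper against the public `GetMergeOperands()` API. The `ReadAllOperands` name and surrounding structure are illustrative, not code from this commit; the resize-and-retry logic mirrors what `ReadRandom` does in a later hunk.

```cpp
#include <vector>

#include "rocksdb/db.h"

// Hypothetical helper: read all merge operands for `key`, growing the
// buffer and retrying once if GetMergeOperands() reports
// Status::Incomplete().
rocksdb::Status ReadAllOperands(rocksdb::DB* db, const rocksdb::Slice& key,
                                std::vector<rocksdb::PinnableSlice>* operands) {
  // Start with the same small initial capacity the benchmark uses (eight).
  operands->resize(8);
  rocksdb::GetMergeOperandsOptions opts;
  opts.expected_max_number_of_operands = static_cast<int>(operands->size());

  int num_operands = 0;
  rocksdb::Status s = db->GetMergeOperands(
      rocksdb::ReadOptions(), db->DefaultColumnFamily(), key,
      operands->data(), &opts, &num_operands);
  if (s.IsIncomplete()) {
    // The key has more operands than we allotted space for; `num_operands`
    // reports how many there actually are, so a single retry suffices.
    operands->resize(num_operands);
    opts.expected_max_number_of_operands = static_cast<int>(operands->size());
    s = db->GetMergeOperands(rocksdb::ReadOptions(),
                             db->DefaultColumnFamily(), key,
                             operands->data(), &opts, &num_operands);
  }
  if (s.ok()) {
    operands->resize(num_operands);  // trim unused slots
  }
  return s;
}
```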
```diff
@@ -1541,6 +1546,9 @@ DEFINE_bool(persist_stats_to_disk,
 DEFINE_uint64(stats_history_buffer_size,
               ROCKSDB_NAMESPACE::Options().stats_history_buffer_size,
               "Max number of stats snapshots to keep in memory");
+DEFINE_bool(avoid_flush_during_recovery,
+            ROCKSDB_NAMESPACE::Options().avoid_flush_during_recovery,
+            "If true, avoids flushing the recovered WAL data where possible.");
 DEFINE_int64(multiread_stride, 0,
              "Stride length for the keys in a MultiGet batch");
 DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
```
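This new flag simply forwards to the existing `Options::avoid_flush_during_recovery` field (see the assignment in a later hunk). Outside db_bench, an application would set the field directly; a minimal sketch, with the value chosen for illustration:

```cpp
#include "rocksdb/options.h"

// Sketch of what -avoid_flush_during_recovery controls when set from
// application code rather than via db_bench's flag.
rocksdb::Options MakeRecoveryFriendlyOptions() {
  rocksdb::Options options;
  // When true, RocksDB avoids flushing data recovered from the WAL during
  // DB::Open where possible, keeping it in the memtable instead.
  options.avoid_flush_during_recovery = true;
  return options;
}
```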
```diff
@@ -2502,6 +2510,7 @@ class Benchmark {
   int64_t merge_keys_;
   bool report_file_operations_;
   bool use_blob_db_;  // Stacked BlobDB
+  bool read_operands_;  // read via GetMergeOperands()
   std::vector<std::string> keys_;
 
 class ErrorHandlerListener : public EventListener {
```
```diff
@@ -2892,11 +2901,11 @@ class Benchmark {
         merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
         report_file_operations_(FLAGS_report_file_operations),
 #ifndef ROCKSDB_LITE
-        use_blob_db_(FLAGS_use_blob_db)  // Stacked BlobDB
+        use_blob_db_(FLAGS_use_blob_db),  // Stacked BlobDB
 #else
-        use_blob_db_(false)  // Stacked BlobDB
+        use_blob_db_(false),  // Stacked BlobDB
 #endif  // !ROCKSDB_LITE
-  {
+        read_operands_(false) {
     // use simcache instead of cache
     if (FLAGS_simcache_size >= 0) {
       if (FLAGS_cache_numshardbits >= 1) {
```
```diff
@@ -3409,6 +3418,9 @@ class Benchmark {
     } else if (name == "verifyfilechecksums") {
       method = &Benchmark::VerifyFileChecksums;
 #endif  // ROCKSDB_LITE
+    } else if (name == "readrandomoperands") {
+      read_operands_ = true;
+      method = &Benchmark::ReadRandom;
     } else if (!name.empty()) {  // No error message for empty name
       fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
       ErrorExit();
```
```diff
@@ -4294,6 +4306,7 @@ class Benchmark {
     options.persist_stats_to_disk = FLAGS_persist_stats_to_disk;
     options.stats_history_buffer_size =
         static_cast<size_t>(FLAGS_stats_history_buffer_size);
+    options.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
 
     options.compression_opts.level = FLAGS_compression_level;
     options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
```
```diff
@@ -5634,6 +5647,12 @@ class Benchmark {
     std::unique_ptr<const char[]> key_guard;
     Slice key = AllocateKey(&key_guard);
     PinnableSlice pinnable_val;
+    std::vector<PinnableSlice> pinnable_vals;
+    if (read_operands_) {
+      // Start off with a small-ish value that'll be increased later if
+      // `GetMergeOperands()` tells us it is not large enough.
+      pinnable_vals.resize(8);
+    }
     std::unique_ptr<char[]> ts_guard;
     Slice ts;
     if (user_timestamp_size_ > 0) {
```
```diff
@@ -5671,17 +5690,45 @@ class Benchmark {
       }
       Status s;
       pinnable_val.Reset();
-      if (FLAGS_num_column_families > 1) {
-        s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
-                                 &pinnable_val, ts_ptr);
-      } else {
-        s = db_with_cfh->db->Get(options,
-                                 db_with_cfh->db->DefaultColumnFamily(), key,
-                                 &pinnable_val, ts_ptr);
-      }
+      for (size_t i = 0; i < pinnable_vals.size(); ++i) {
+        pinnable_vals[i].Reset();
+      }
+      ColumnFamilyHandle* cfh;
+      if (FLAGS_num_column_families > 1) {
+        cfh = db_with_cfh->GetCfh(key_rand);
+      } else {
+        cfh = db_with_cfh->db->DefaultColumnFamily();
+      }
+      if (read_operands_) {
+        GetMergeOperandsOptions get_merge_operands_options;
+        get_merge_operands_options.expected_max_number_of_operands =
+            static_cast<int>(pinnable_vals.size());
+        int number_of_operands;
+        s = db_with_cfh->db->GetMergeOperands(
+            options, cfh, key, pinnable_vals.data(),
+            &get_merge_operands_options, &number_of_operands);
+        if (s.IsIncomplete()) {
+          // Should only happen a few times when we encounter a key that had
+          // more merge operands than any key seen so far. Production use case
+          // would typically retry in such event to get all the operands so do
+          // that here.
+          pinnable_vals.resize(number_of_operands);
+          get_merge_operands_options.expected_max_number_of_operands =
+              static_cast<int>(pinnable_vals.size());
+          s = db_with_cfh->db->GetMergeOperands(
+              options, cfh, key, pinnable_vals.data(),
+              &get_merge_operands_options, &number_of_operands);
+        }
+      } else {
+        s = db_with_cfh->db->Get(options, cfh, key, &pinnable_val, ts_ptr);
+      }
+
       if (s.ok()) {
         found++;
         bytes += key.size() + pinnable_val.size() + user_timestamp_size_;
+        for (size_t i = 0; i < pinnable_vals.size(); ++i) {
+          bytes += pinnable_vals[i].size();
+        }
       } else if (!s.IsNotFound()) {
         fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
         abort();
```