68a8e6b8fa
Summary:
This diff updates the code to pin the merge operator operands while the
merge operation is in progress, so that we can eliminate the memcpy cost.
To do that we need a new public API for FullMerge that replaces
std::deque<std::string> with std::vector<Slice>.

This diff is stacked on top of D56493 and D56511.

In this diff we
- Update the FullMergeV2 arguments to be encapsulated in MergeOperationInput
  and MergeOperationOutput, which will make it easier to add new arguments in
  the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace the MergeContext std::deque with std::vector (based on a simple
  benchmark I ran: https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow the FullMergeV2 output to be an existing operand

```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq      :       0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq      :       0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq      :       0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq      :       0.247 micros/op 4043927 ops/sec; 39553.2 MB/s

[master]
readseq      :       3.935 micros/op  254140 ops/sec;  2485.7 MB/s
readseq      :       3.722 micros/op  268657 ops/sec;  2627.7 MB/s
readseq      :       3.149 micros/op  317605 ops/sec;  3106.5 MB/s
readseq      :       3.125 micros/op  320024 ops/sec;  3130.1 MB/s
readseq      :       4.075 micros/op  245374 ops/sec;  2400.0 MB/s
```

```
[Everything in Memtable | 10K operands | 10 KB each | 10 operands per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       3.472 micros/op  288018 ops/sec;  2817.1 MB/s
readseq      :       2.304 micros/op  434027 ops/sec;  4245.2 MB/s
readseq      :       1.163 micros/op  859845 ops/sec;  8410.0 MB/s
readseq      :       1.192 micros/op  838926 ops/sec;  8205.4 MB/s
readseq      :       1.250 micros/op  800000 ops/sec;  7824.7 MB/s

[master]
readseq      :      24.025 micros/op   41623 ops/sec;   407.1 MB/s
readseq      :      18.489 micros/op   54086 ops/sec;   529.0 MB/s
readseq      :      18.693 micros/op   53495 ops/sec;   523.2 MB/s
readseq      :      23.621 micros/op   42335 ops/sec;   414.1 MB/s
readseq      :      18.775 micros/op   53262 ops/sec;   521.0 MB/s
```

```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]

[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq      :      14.741 micros/op   67837 ops/sec;   663.5 MB/s
readseq      :       1.029 micros/op  971446 ops/sec;  9501.6 MB/s
readseq      :       0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq      :       0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq      :       0.943 micros/op 1060657 ops/sec; 10374.2 MB/s

[master]
readseq      :      16.735 micros/op   59755 ops/sec;   584.5 MB/s
readseq      :       3.029 micros/op  330151 ops/sec;  3229.2 MB/s
readseq      :       3.136 micros/op  318883 ops/sec;  3119.0 MB/s
readseq      :       3.065 micros/op  326245 ops/sec;  3191.0 MB/s
readseq      :       3.014 micros/op  331813 ops/sec;  3245.4 MB/s
```

```
[Everything in Block cache | 10K operands | 10 KB each | 10 operands per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions

[FullMergeV2]
readseq      :      24.325 micros/op   41109 ops/sec;   402.1 MB/s
readseq      :       1.470 micros/op  680272 ops/sec;  6653.7 MB/s
readseq      :       1.231 micros/op  812347 ops/sec;  7945.5 MB/s
readseq      :       1.091 micros/op  916590 ops/sec;  8965.1 MB/s
readseq      :       1.109 micros/op  901713 ops/sec;  8819.6 MB/s

[master]
readseq      :      27.257 micros/op   36687 ops/sec;   358.8 MB/s
readseq      :       4.443 micros/op  225073 ops/sec;  2201.4 MB/s
readseq      :       5.830 micros/op  171526 ops/sec;  1677.7 MB/s
readseq      :       4.173 micros/op  239635 ops/sec;  2343.8 MB/s
readseq      :       4.150 micros/op  240963 ops/sec;  2356.8 MB/s
```

Test Plan: COMPILE_WITH_ASAN=1 make check -j64

Reviewers: yhchiang, andrewkr, sdong

Reviewed By: sdong

Subscribers: lovro, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D57075
#! /usr/bin/env python
import os
import re
import sys
import time
import random
import logging
import tempfile
import subprocess
import shutil
import argparse

# params overwrite priority:
#   for default:
#       default_params < blackbox|whitebox_default_params < args
#   for simple:
#       simple_default_params < blackbox|whitebox_simple_default_params < args
default_params = {
    "block_size": 16384,
    "cache_size": 1048576,
    "delpercent": 5,
    "destroy_db_initially": 0,
    "disable_data_sync": 0,
    "disable_wal": 0,
    "allow_concurrent_memtable_write": 0,
    "iterpercent": 10,
    "max_background_compactions": 20,
    "max_bytes_for_level_base": 10485760,
    "max_key": 100000000,
    "max_write_buffer_number": 3,
    "memtablerep": "prefix_hash",
    "mmap_read": lambda: random.randint(0, 1),
    "open_files": 500000,
    "prefix_size": 7,
    "prefixpercent": 5,
    "progress_reports": 0,
    "readpercent": 45,
    "reopen": 20,
    "sync": 0,
    "target_file_size_base": 2097152,
    "target_file_size_multiplier": 2,
    "threads": 32,
    "verify_checksum": 1,
    "write_buffer_size": 4 * 1024 * 1024,
    "writepercent": 35,
    "subcompactions": lambda: random.randint(1, 4),
    "use_merge": lambda: random.randint(0, 1),
    "use_full_merge_v1": lambda: random.randint(0, 1),
}


def get_dbname(test_name):
    test_tmpdir = os.environ.get("TEST_TMPDIR")
    if test_tmpdir is None or test_tmpdir == "":
        dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)
    else:
        dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name
        shutil.rmtree(dbname, True)
    return dbname
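
# Example (hypothetical paths): with TEST_TMPDIR=/tmp/rocksdbtest,
# get_dbname('blackbox') reuses "/tmp/rocksdbtest/rocksdb_crashtest_blackbox"
# after removing any leftover DB there; without TEST_TMPDIR it falls back to
# a fresh tempfile.mkdtemp() directory such as
# "/tmp/rocksdb_crashtest_blackboxXXXXXX".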

blackbox_default_params = {
    # total time for this script to test db_stress
    "duration": 6000,
    # time for one db_stress instance to run
    "interval": 120,
    # since we will be killing anyway, use large value for ops_per_thread
    "ops_per_thread": 100000000,
    "set_options_one_in": 10000,
    "test_batches_snapshots": 1,
}

whitebox_default_params = {
    "duration": 10000,
    "log2_keys_per_lock": 10,
    "nooverwritepercent": 1,
    "ops_per_thread": 200000,
    "test_batches_snapshots": lambda: random.randint(0, 1),
    "write_buffer_size": 4 * 1024 * 1024,
    "subcompactions": lambda: random.randint(1, 4),
}

simple_default_params = {
    "block_size": 16384,
    "cache_size": 1048576,
    "column_families": 1,
    "delpercent": 5,
    "destroy_db_initially": 0,
    "disable_data_sync": 0,
    "disable_wal": 0,
    "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
    "iterpercent": 10,
    "max_background_compactions": 1,
    "max_bytes_for_level_base": 67108864,
    "max_key": 100000000,
    "max_write_buffer_number": 3,
    "memtablerep": "skip_list",
    "mmap_read": lambda: random.randint(0, 1),
    "prefix_size": 0,
    "prefixpercent": 0,
    "progress_reports": 0,
    "readpercent": 50,
    "reopen": 20,
    "sync": 0,
    "target_file_size_base": 16777216,
    "target_file_size_multiplier": 1,
    "test_batches_snapshots": 0,
    "threads": 32,
    "verify_checksum": 1,
    "write_buffer_size": 32 * 1024 * 1024,
    "writepercent": 35,
    "subcompactions": lambda: random.randint(1, 4),
}

blackbox_simple_default_params = {
    "duration": 6000,
    "interval": 120,
    "open_files": -1,
    "ops_per_thread": 100000000,
    "set_options_one_in": 0,
    "test_batches_snapshots": 0,
}

whitebox_simple_default_params = {
    "duration": 10000,
    "log2_keys_per_lock": 10,
    "nooverwritepercent": 1,
    "open_files": 500000,
    "ops_per_thread": 200000,
    "write_buffer_size": 32 * 1024 * 1024,
    "subcompactions": lambda: random.randint(1, 4),
}


def finalize_and_sanitize(src_params):
    dest_params = dict([(k, v() if callable(v) else v)
                        for (k, v) in src_params.items()])
    if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
        dest_params["memtablerep"] = "skip_list"
    return dest_params
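
# Example (hypothetical values): callable params are re-sampled on every
# call, so each db_stress run gets fresh random choices, e.g.
#
#   >>> finalize_and_sanitize({"mmap_read": lambda: random.randint(0, 1),
#   ...                        "sync": 0})
#   {'sync': 0, 'mmap_read': 1, 'memtablerep': 'skip_list'}
#
# ("allow_concurrent_memtable_write" is absent, so the .get(..., 1) check
# forces memtablerep to skip_list; dict order is arbitrary.)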


def gen_cmd_params(args):
    params = {}

    if args.simple:
        params.update(simple_default_params)
        if args.test_type == 'blackbox':
            params.update(blackbox_simple_default_params)
        if args.test_type == 'whitebox':
            params.update(whitebox_simple_default_params)

    if not args.simple:
        params.update(default_params)
        if args.test_type == 'blackbox':
            params.update(blackbox_default_params)
        if args.test_type == 'whitebox':
            params.update(whitebox_default_params)

    for k, v in vars(args).items():
        if v is not None:
            params[k] = v
    return params
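
# Example of the precedence documented at the top of the file: for a
# "whitebox" run without --simple, "duration" is absent from default_params,
# whitebox_default_params sets it to 10000, and an explicit --duration=3600
# on the command line (non-None after parsing) wins last.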


def gen_cmd(params):
    cmd = './db_stress ' + ' '.join(
        '--{0}={1}'.format(k, v)
        for k, v in finalize_and_sanitize(params).items()
        if k not in set(['test_type', 'simple', 'duration', 'interval'])
        and v is not None)
    return cmd
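
# Example (hypothetical params): the script-only meta keys are stripped and
# the rest become --flag=value pairs, e.g.
#
#   >>> gen_cmd({"test_type": "blackbox", "interval": 120,
#   ...          "threads": 32, "sync": 0})
#   './db_stress --threads=32 --sync=0'
#
# (flag order follows dict iteration order, so it may differ).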


# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('blackbox')
    exit_time = time.time() + cmd_params['duration']

    print("Running blackbox-crash-test with \n"
          + "interval_between_crash=" + str(cmd_params['interval']) + "\n"
          + "total-duration=" + str(cmd_params['duration']) + "\n"
          + "threads=" + str(cmd_params['threads']) + "\n"
          + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n"
          + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n"
          + "subcompactions=" + str(cmd_params['subcompactions']) + "\n")

    while time.time() < exit_time:
        run_had_errors = False
        killtime = time.time() + cmd_params['interval']

        cmd = gen_cmd(dict(cmd_params.items() + {'db': dbname}.items()))

        child = subprocess.Popen([cmd],
                                 stderr=subprocess.PIPE, shell=True)
        print("Running db_stress with pid=%d: %s\n\n"
              % (child.pid, cmd))

        stop_early = False
        while time.time() < killtime:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
                stop_early = True
                break
            time.sleep(1)

        if not stop_early:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
            else:
                child.kill()
                print("KILLED %d\n" % child.pid)
                time.sleep(1)  # time to stabilize after a kill

        while True:
            line = child.stderr.readline().strip()
            if line != '':
                run_had_errors = True
                print('***' + line + '^')
            else:
                break

        if run_had_errors:
            sys.exit(2)

        time.sleep(1)  # time to stabilize before the next run

    # we need to clean up after ourselves -- only do this on test success
    shutil.rmtree(dbname, True)


# This python script runs db_stress multiple times. Some runs use
# kill_random_test, which causes rocksdb to crash at various points in code.
def whitebox_crash_main(args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('whitebox')

    cur_time = time.time()
    exit_time = cur_time + cmd_params['duration']
    half_time = cur_time + cmd_params['duration'] / 2

    print("Running whitebox-crash-test with \n"
          + "total-duration=" + str(cmd_params['duration']) + "\n"
          + "threads=" + str(cmd_params['threads']) + "\n"
          + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n"
          + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n"
          + "subcompactions=" + str(cmd_params['subcompactions']) + "\n")

    total_check_mode = 4
    check_mode = 0
    kill_random_test = 888887
    kill_mode = 0

    while time.time() < exit_time:
        if check_mode == 0:
            additional_opts = {
                # use large ops per thread since we will kill it anyway
                "ops_per_thread": 100 * cmd_params['ops_per_thread'],
            }
            # run with kill_random_test, with three modes.
            # Mode 0 covers all kill points. Mode 1 covers fewer kill points
            # but increases the chance of triggering them. Mode 2 covers even
            # fewer kill points and further increases the triggering chance.
            if kill_mode == 0:
                additional_opts.update({
                    "kill_random_test": kill_random_test,
                })
            elif kill_mode == 1:
                additional_opts.update({
                    "kill_random_test": (kill_random_test / 10 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    + "WritableFileWriter::WriteBuffered",
                })
            elif kill_mode == 2:
                additional_opts.update({
                    "kill_random_test": (kill_random_test / 5000 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    "WritableFileWriter::WriteBuffered,"
                    "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                })
            # Run kill modes 0, 1 and 2 in turn.
            kill_mode = (kill_mode + 1) % 3
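            # kill_random_test is (roughly) the N in a one-in-N kill chance
            # at each kill point, so the rotation above trades coverage for
            # frequency:
            #   kill_mode 0: 888887              (all points, rare kills)
            #   kill_mode 1: 888887/10+1 = 88889 (fewer points, ~10x likelier)
            #   kill_mode 2: 888887/5000+1 = 178 (fewest points, likeliest)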
        elif check_mode == 1:
            # normal run with universal compaction mode
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
                "compaction_style": 1,
            }
        elif check_mode == 2:
            # normal run with FIFO compaction mode
            # ops_per_thread is divided by 5 because FIFO compaction
            # style is quite a bit slower on reads with a lot of files
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'] / 5,
                "compaction_style": 2,
            }
        else:
            # normal run
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
            }

        cmd = gen_cmd(dict(cmd_params.items() + additional_opts.items()
                           + {'db': dbname}.items()))

        print "Running:" + cmd + "\n"

        popen = subprocess.Popen([cmd], stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 shell=True)
        stdoutdata, stderrdata = popen.communicate()
        retncode = popen.returncode
        msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
            check_mode, additional_opts['kill_random_test'], retncode))
        print msg
        print stdoutdata

        expected = False
        if additional_opts['kill_random_test'] is None and (retncode == 0):
            # we expect zero retncode if no kill option
            expected = True
        elif additional_opts['kill_random_test'] is not None and retncode < 0:
            # we expect negative retncode if kill option was given
            expected = True

        if not expected:
            print "TEST FAILED. See kill option and exit code above!!!\n"
            sys.exit(1)

        stdoutdata = stdoutdata.lower()
        errorcount = (stdoutdata.count('error') -
                      stdoutdata.count('got errors 0 times'))
        print "#times error occurred in output is " + str(errorcount) + "\n"

        if (errorcount > 0):
            print "TEST FAILED. Output has 'error'!!!\n"
            sys.exit(2)
        if (stdoutdata.find('fail') >= 0):
            print "TEST FAILED. Output has 'fail'!!!\n"
            sys.exit(2)

        # For the first half of the duration, keep doing kill tests. For the
        # second half, try the other check modes.
        if time.time() > half_time:
            # we need to clean up after ourselves -- only do this on test
            # success
            shutil.rmtree(dbname, True)
            check_mode = (check_mode + 1) % total_check_mode

        time.sleep(1)  # time to stabilize after a kill


def main():
    parser = argparse.ArgumentParser(description="This script runs and kills \
db_stress multiple times")
    parser.add_argument("test_type", choices=["blackbox", "whitebox"])
    parser.add_argument("--simple", action="store_true")

    all_params = dict(default_params.items()
                      + blackbox_default_params.items()
                      + whitebox_default_params.items()
                      + simple_default_params.items()
                      + blackbox_simple_default_params.items()
                      + whitebox_simple_default_params.items())
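
    # type(v() if callable(v) else v) samples a callable default once just to
    # infer each flag's type (e.g. int for the random lambdas); no argparse
    # default is given, so flags the user does not pass stay None and never
    # override the param dicts in gen_cmd_params.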
    for k, v in all_params.items():
        parser.add_argument("--" + k, type=type(v() if callable(v) else v))
    args = parser.parse_args()

    if args.test_type == 'blackbox':
        blackbox_crash_main(args)
    if args.test_type == 'whitebox':
        whitebox_crash_main(args)


if __name__ == '__main__':
    main()