rocksdb/tools/db_crashtest.py
Maysam Yabandeh 9623a66b24 Refresh snapshot list during long compactions (2nd attempt) (#5278)
Summary:
Part of compaction cpu goes to processing snapshot list, the larger the list the bigger the overhead. Although the lifetime of most of the snapshots is much shorter than the lifetime of compactions, the compaction conservatively operates on the list of snapshots that it initially obtained. This patch allows the snapshot list to be updated via a callback if the compaction is taking long. This should let the compaction to continue more efficiently with much smaller snapshot list.
For simplicity, to avoid the feature is disabled in two cases: i) When more than one sub-compaction are sharing the same snapshot list, ii) when Range Delete is used in which the range delete aggregator has its own copy of snapshot list.
This fixes the reverted https://github.com/facebook/rocksdb/pull/5099 issue with range deletes.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5278

Differential Revision: D15203291

Pulled By: maysamyabandeh

fbshipit-source-id: fa645611e606aa222c7ce53176dc5bb6f259c258
2019-09-18 14:35:50 -07:00

412 lines
15 KiB
Python

#! /usr/bin/env python
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
import os
import sys
import time
import random
import tempfile
import subprocess
import shutil
import argparse
# params overwrite priority:
# for default:
# default_params < {blackbox,whitebox}_default_params < args
# for simple:
# default_params < {blackbox,whitebox}_default_params <
# simple_default_params <
# {blackbox,whitebox}_simple_default_params < args
# for enable_atomic_flush:
# default_params < {blackbox,whitebox}_default_params <
# atomic_flush_params < args
expected_values_file = tempfile.NamedTemporaryFile()
default_params = {
"acquire_snapshot_one_in": 10000,
"block_size": 16384,
"cache_index_and_filter_blocks": lambda: random.randint(0, 1),
"cache_size": 1048576,
"checkpoint_one_in": 1000000,
"compression_type": "snappy",
"compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
"compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
"clear_column_family_one_in": 0,
"compact_files_one_in": 1000000,
"compact_range_one_in": 1000000,
"delpercent": 4,
"delrangepercent": 1,
"destroy_db_initially": 0,
"enable_pipelined_write": lambda: random.randint(0, 1),
"expected_values_path": expected_values_file.name,
"flush_one_in": 1000000,
"max_background_compactions": 20,
"max_bytes_for_level_base": 10485760,
"max_key": 100000000,
"max_write_buffer_number": 3,
"mmap_read": lambda: random.randint(0, 1),
"nooverwritepercent": 1,
"open_files": 500000,
"prefixpercent": 5,
"progress_reports": 0,
"readpercent": 45,
"recycle_log_file_num": lambda: random.randint(0, 1),
"reopen": 20,
"snapshot_hold_ops": 100000,
"subcompactions": lambda: random.randint(1, 4),
"target_file_size_base": 2097152,
"target_file_size_multiplier": 2,
"use_direct_reads": lambda: random.randint(0, 1),
"use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
"use_full_merge_v1": lambda: random.randint(0, 1),
"use_merge": lambda: random.randint(0, 1),
"verify_checksum": 1,
"write_buffer_size": 4 * 1024 * 1024,
"writepercent": 35,
"format_version": lambda: random.randint(2, 4),
"index_block_restart_interval": lambda: random.choice(range(1, 16)),
"use_multiget" : lambda: random.randint(0, 1),
}
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
def get_dbname(test_name):
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is None or test_tmpdir == "":
dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)
else:
dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name
shutil.rmtree(dbname, True)
os.mkdir(dbname)
return dbname
def is_direct_io_supported(dbname):
with tempfile.NamedTemporaryFile(dir=dbname) as f:
try:
os.open(f.name, os.O_DIRECT)
except:
return False
return True
blackbox_default_params = {
# total time for this script to test db_stress
"duration": 6000,
# time for one db_stress instance to run
"interval": 120,
# since we will be killing anyway, use large value for ops_per_thread
"ops_per_thread": 100000000,
"set_options_one_in": 10000,
"test_batches_snapshots": 1,
}
whitebox_default_params = {
"duration": 10000,
"log2_keys_per_lock": 10,
"ops_per_thread": 200000,
"random_kill_odd": 888887,
"test_batches_snapshots": lambda: random.randint(0, 1),
}
simple_default_params = {
"allow_concurrent_memtable_write": lambda: random.randint(0, 1),
"column_families": 1,
"max_background_compactions": 1,
"max_bytes_for_level_base": 67108864,
"memtablerep": "skip_list",
"prefixpercent": 25,
"readpercent": 25,
"target_file_size_base": 16777216,
"target_file_size_multiplier": 1,
"test_batches_snapshots": 0,
"write_buffer_size": 32 * 1024 * 1024,
}
blackbox_simple_default_params = {
"open_files": -1,
"set_options_one_in": 0,
}
whitebox_simple_default_params = {}
atomic_flush_params = {
"disable_wal": 1,
"reopen": 0,
"test_atomic_flush": 1,
# use small value for write_buffer_size so that RocksDB triggers flush
# more frequently
"write_buffer_size": 1024 * 1024,
# disable pipelined write when test_atomic_flush is true
"enable_pipelined_write": 0,
}
def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()])
if dest_params.get("compression_type") != "zstd" or \
dest_params.get("compression_max_dict_bytes") == 0:
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
dest_params["memtablerep"] = "skip_list"
if dest_params["mmap_read"] == 1 or not is_direct_io_supported(
dest_params["db"]):
dest_params["use_direct_io_for_flush_and_compaction"] = 0
dest_params["use_direct_reads"] = 0
if dest_params.get("test_batches_snapshots") == 1:
dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0
return dest_params
def gen_cmd_params(args):
params = {}
params.update(default_params)
if args.test_type == 'blackbox':
params.update(blackbox_default_params)
if args.test_type == 'whitebox':
params.update(whitebox_default_params)
if args.simple:
params.update(simple_default_params)
if args.test_type == 'blackbox':
params.update(blackbox_simple_default_params)
if args.test_type == 'whitebox':
params.update(whitebox_simple_default_params)
if args.enable_atomic_flush:
params.update(atomic_flush_params)
for k, v in vars(args).items():
if v is not None:
params[k] = v
return params
def gen_cmd(params, unknown_params):
cmd = ['./db_stress'] + [
'--{0}={1}'.format(k, v)
for k, v in finalize_and_sanitize(params).items()
if k not in set(['test_type', 'simple', 'duration', 'interval',
'random_kill_odd', 'enable_atomic_flush'])
and v is not None] + unknown_params
return cmd
# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args):
cmd_params = gen_cmd_params(args)
dbname = get_dbname('blackbox')
exit_time = time.time() + cmd_params['duration']
print("Running blackbox-crash-test with \n"
+ "interval_between_crash=" + str(cmd_params['interval']) + "\n"
+ "total-duration=" + str(cmd_params['duration']) + "\n")
while time.time() < exit_time:
run_had_errors = False
killtime = time.time() + cmd_params['interval']
cmd = gen_cmd(dict(
cmd_params.items() +
{'db': dbname}.items()), unknown_args)
child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
print("Running db_stress with pid=%d: %s\n\n"
% (child.pid, ' '.join(cmd)))
stop_early = False
while time.time() < killtime:
if child.poll() is not None:
print("WARNING: db_stress ended before kill: exitcode=%d\n"
% child.returncode)
stop_early = True
break
time.sleep(1)
if not stop_early:
if child.poll() is not None:
print("WARNING: db_stress ended before kill: exitcode=%d\n"
% child.returncode)
else:
child.kill()
print("KILLED %d\n" % child.pid)
time.sleep(1) # time to stabilize after a kill
while True:
line = child.stderr.readline().strip()
if line == '':
break
elif not line.startswith('WARNING'):
run_had_errors = True
print('stderr has error message:')
print('***' + line + '***')
if run_had_errors:
sys.exit(2)
time.sleep(1) # time to stabilize before the next run
# we need to clean up after ourselves -- only do this on test success
shutil.rmtree(dbname, True)
# This python script runs db_stress multiple times. Some runs with
# kill_random_test that causes rocksdb to crash at various points in code.
def whitebox_crash_main(args, unknown_args):
cmd_params = gen_cmd_params(args)
dbname = get_dbname('whitebox')
cur_time = time.time()
exit_time = cur_time + cmd_params['duration']
half_time = cur_time + cmd_params['duration'] / 2
print("Running whitebox-crash-test with \n"
+ "total-duration=" + str(cmd_params['duration']) + "\n")
total_check_mode = 4
check_mode = 0
kill_random_test = cmd_params['random_kill_odd']
kill_mode = 0
while time.time() < exit_time:
if check_mode == 0:
additional_opts = {
# use large ops per thread since we will kill it anyway
"ops_per_thread": 100 * cmd_params['ops_per_thread'],
}
# run with kill_random_test, with three modes.
# Mode 0 covers all kill points. Mode 1 covers less kill points but
# increases change of triggering them. Mode 2 covers even less
# frequent kill points and further increases triggering change.
if kill_mode == 0:
additional_opts.update({
"kill_random_test": kill_random_test,
})
elif kill_mode == 1:
additional_opts.update({
"kill_random_test": (kill_random_test / 10 + 1),
"kill_prefix_blacklist": "WritableFileWriter::Append,"
+ "WritableFileWriter::WriteBuffered",
})
elif kill_mode == 2:
# TODO: May need to adjust random odds if kill_random_test
# is too small.
additional_opts.update({
"kill_random_test": (kill_random_test / 5000 + 1),
"kill_prefix_blacklist": "WritableFileWriter::Append,"
"WritableFileWriter::WriteBuffered,"
"PosixMmapFile::Allocate,WritableFileWriter::Flush",
})
# Run kill mode 0, 1 and 2 by turn.
kill_mode = (kill_mode + 1) % 3
elif check_mode == 1:
# normal run with universal compaction mode
additional_opts = {
"kill_random_test": None,
"ops_per_thread": cmd_params['ops_per_thread'],
"compaction_style": 1,
}
elif check_mode == 2:
# normal run with FIFO compaction mode
# ops_per_thread is divided by 5 because FIFO compaction
# style is quite a bit slower on reads with lot of files
additional_opts = {
"kill_random_test": None,
"ops_per_thread": cmd_params['ops_per_thread'] / 5,
"compaction_style": 2,
}
else:
# normal run
additional_opts = {
"kill_random_test": None,
"ops_per_thread": cmd_params['ops_per_thread'],
}
cmd = gen_cmd(dict(cmd_params.items() + additional_opts.items()
+ {'db': dbname}.items()), unknown_args)
print "Running:" + ' '.join(cmd) + "\n" # noqa: E999 T25377293 Grandfathered in
popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdoutdata, stderrdata = popen.communicate()
retncode = popen.returncode
msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
check_mode, additional_opts['kill_random_test'], retncode))
print msg
print stdoutdata
expected = False
if additional_opts['kill_random_test'] is None and (retncode == 0):
# we expect zero retncode if no kill option
expected = True
elif additional_opts['kill_random_test'] is not None and retncode <= 0:
# When kill option is given, the test MIGHT kill itself.
# If it does, negative retncode is expected. Otherwise 0.
expected = True
if not expected:
print "TEST FAILED. See kill option and exit code above!!!\n"
sys.exit(1)
stdoutdata = stdoutdata.lower()
errorcount = (stdoutdata.count('error') -
stdoutdata.count('got errors 0 times'))
print "#times error occurred in output is " + str(errorcount) + "\n"
if (errorcount > 0):
print "TEST FAILED. Output has 'error'!!!\n"
sys.exit(2)
if (stdoutdata.find('fail') >= 0):
print "TEST FAILED. Output has 'fail'!!!\n"
sys.exit(2)
# First half of the duration, keep doing kill test. For the next half,
# try different modes.
if time.time() > half_time:
# we need to clean up after ourselves -- only do this on test
# success
shutil.rmtree(dbname, True)
os.mkdir(dbname)
cmd_params.pop('expected_values_path', None)
check_mode = (check_mode + 1) % total_check_mode
time.sleep(1) # time to stabilize after a kill
def main():
parser = argparse.ArgumentParser(description="This script runs and kills \
db_stress multiple times")
parser.add_argument("test_type", choices=["blackbox", "whitebox"])
parser.add_argument("--simple", action="store_true")
parser.add_argument("--enable_atomic_flush", action='store_true')
all_params = dict(default_params.items()
+ blackbox_default_params.items()
+ whitebox_default_params.items()
+ simple_default_params.items()
+ blackbox_simple_default_params.items()
+ whitebox_simple_default_params.items())
for k, v in all_params.items():
parser.add_argument("--" + k, type=type(v() if callable(v) else v))
# unknown_args are passed directly to db_stress
args, unknown_args = parser.parse_known_args()
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
print('%s env var is set to a non-existent directory: %s' %
(_TEST_DIR_ENV_VAR, test_tmpdir))
sys.exit(1)
if args.test_type == 'blackbox':
blackbox_crash_main(args, unknown_args)
if args.test_type == 'whitebox':
whitebox_crash_main(args, unknown_args)
if __name__ == '__main__':
main()