Fix stress test with best-efforts-recovery (#9986)
Summary: This PR
- Since we are testing with disable_wal = true and best_efforts_recovery, we should set the column family count to 1, due to the requirement of the `ExpectedState` tracking and replaying logic.
- During backup and checkpoint restore, disable best-efforts recovery. This does not matter now because db_crashtest.py always disables the WAL when testing best-efforts recovery. In the future, if we enable the WAL, then not setting `restore_options.best_efforts_recovery` will cause the backup db not to recover the WALs and thus to differ from the db (which enables the WAL).
- During verification of backup and checkpoint restore, print the key where an inconsistency exists between the expected state and the db.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9986

Test Plan: TEST_TMPDIR=/dev/shm/rocksdb make crash_test_with_best_efforts_recovery

Reviewed By: siying

Differential Revision: D36353105

Pulled By: riversand963

fbshipit-source-id: a484da161273e6216a1f7e245bac15a349693917
parent bfc6a8ee4a
commit f6d9730ea1
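Before the diff, a minimal standalone sketch of the two verification-side changes, assuming a RocksDB build is on the include path. The helper names (MakeKeyMismatchStatus, RestoreOptionsFrom) are illustrative only and not part of db_stress; the actual changes appear in the hunks below.

// Sketch only: mirrors the pattern used in the diff, not the db_stress code itself.
#include <sstream>
#include <string>

#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"

// Build a Corruption status that names the offending key in hex, so the
// stress test log pinpoints where the expected state and the restored db differ.
rocksdb::Status MakeKeyMismatchStatus(const rocksdb::Slice& key,
                                      const std::string& where_missing) {
  std::ostringstream oss;
  oss << "0x" << key.ToString(/*hex=*/true) << " " << where_missing;
  return rocksdb::Status::Corruption(oss.str());
}

// Derive options for opening a backup/checkpoint copy: best-efforts recovery
// is turned off so that, if the WAL is later enabled for this test, the copy
// replays its WALs the same way the primary db does.
rocksdb::Options RestoreOptionsFrom(const rocksdb::Options& live_options) {
  rocksdb::Options restore_options(live_options);
  restore_options.best_efforts_recovery = false;
  return restore_options;
}

A caller would use it along the lines of s = MakeKeyMismatchStatus(key, "exists in restore but not in original db");.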
db_stress_tool/db_stress_test_base.cc
@@ -1424,8 +1424,9 @@ void StressTest::TestCompactFiles(ThreadState* /* thread */,
 Status StressTest::TestBackupRestore(
     ThreadState* thread, const std::vector<int>& rand_column_families,
     const std::vector<int64_t>& rand_keys) {
-  std::string backup_dir = FLAGS_db + "/.backup" + std::to_string(thread->tid);
-  std::string restore_dir =
+  const std::string backup_dir =
+      FLAGS_db + "/.backup" + std::to_string(thread->tid);
+  const std::string restore_dir =
       FLAGS_db + "/.restore" + std::to_string(thread->tid);
   BackupEngineOptions backup_opts(backup_dir);
   // For debugging, get info_log from live options
@@ -1558,6 +1559,7 @@ Status StressTest::TestBackupRestore(
   // Not yet implemented: opening restored BlobDB or TransactionDB
   if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
     Options restore_options(options_);
+    restore_options.best_efforts_recovery = false;
     restore_options.listeners.clear();
     // Avoid dangling/shared file descriptors, for reliable destroy
     restore_options.sst_file_manager = nullptr;
@@ -1614,11 +1616,17 @@ Status StressTest::TestBackupRestore(
     bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
     if (get_status.ok()) {
       if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
-        s = Status::Corruption("key exists in restore but not in original db");
+        std::ostringstream oss;
+        oss << "0x" << key.ToString(true)
+            << " exists in restore but not in original db";
+        s = Status::Corruption(oss.str());
       }
     } else if (get_status.IsNotFound()) {
       if (exists && from_latest && ShouldAcquireMutexOnKey()) {
-        s = Status::Corruption("key exists in original db but not in restore");
+        std::ostringstream oss;
+        oss << "0x" << key.ToString(true)
+            << " exists in original db but not in restore";
+        s = Status::Corruption(oss.str());
       }
     } else {
       s = get_status;
@@ -1760,6 +1768,7 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
   DB* checkpoint_db = nullptr;
   if (s.ok()) {
     Options options(options_);
+    options.best_efforts_recovery = false;
     options.listeners.clear();
     // Avoid race condition in trash handling after delete checkpoint_db
     options.sst_file_manager.reset();
@@ -1791,13 +1800,18 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
         thread->shared->Exists(rand_column_families[i], rand_keys[0]);
     if (get_status.ok()) {
       if (!exists && ShouldAcquireMutexOnKey()) {
-        s = Status::Corruption(
-            "key exists in checkpoint but not in original db");
+        std::ostringstream oss;
+        oss << "0x" << key.ToString(true) << " exists in checkpoint "
+            << checkpoint_dir << " but not in original db";
+        s = Status::Corruption(oss.str());
       }
     } else if (get_status.IsNotFound()) {
       if (exists && ShouldAcquireMutexOnKey()) {
-        s = Status::Corruption(
-            "key exists in original db but not in checkpoint");
+        std::ostringstream oss;
+        oss << "0x" << key.ToString(true)
+            << " exists in original db but not in checkpoint "
+            << checkpoint_dir;
+        s = Status::Corruption(oss.str());
       }
     } else {
       s = get_status;
tools/db_crashtest.py
@@ -313,10 +313,10 @@ txn_params = {
 }

 best_efforts_recovery_params = {
-    "best_efforts_recovery": True,
-    "skip_verifydb": True,
-    "verify_db_one_in": 0,
-    "continuous_verification_interval": 0,
+    "best_efforts_recovery": 1,
+    "atomic_flush": 0,
+    "disable_wal": 1,
+    "column_families": 1,
 }

 blob_params = {
@@ -502,6 +502,13 @@ def finalize_and_sanitize(src_params):
         dest_params["memtable_prefix_bloom_size_ratio"] = 0
     if dest_params.get("two_write_queues") == 1:
         dest_params["enable_pipelined_write"] = 0
+    if dest_params.get("best_efforts_recovery") == 1:
+        dest_params["disable_wal"] = 1
+        dest_params["atomic_flush"] = 0
+        dest_params["enable_compaction_filter"] = 0
+        dest_params["sync"] = 0
+        dest_params["write_fault_one_in"] = 0
     return dest_params


 def gen_cmd_params(args):
@@ -560,42 +567,6 @@ def gen_cmd(params, unknown_params):
     return cmd


-# Inject inconsistency to db directory.
-def inject_inconsistencies_to_db_dir(dir_path):
-    files = os.listdir(dir_path)
-    file_num_rgx = re.compile(r'(?P<number>[0-9]{6})')
-    largest_fnum = 0
-    for f in files:
-        m = file_num_rgx.search(f)
-        if m and not f.startswith('LOG'):
-            largest_fnum = max(largest_fnum, int(m.group('number')))
-
-    candidates = [
-        f for f in files if re.search(r'[0-9]+\.sst', f)
-    ]
-    deleted = 0
-    corrupted = 0
-    for f in candidates:
-        rnd = random.randint(0, 99)
-        f_path = os.path.join(dir_path, f)
-        if rnd < 10:
-            os.unlink(f_path)
-            deleted = deleted + 1
-        elif 10 <= rnd and rnd < 30:
-            with open(f_path, "a") as fd:
-                fd.write('12345678')
-            corrupted = corrupted + 1
-    print('Removed %d table files' % deleted)
-    print('Corrupted %d table files' % corrupted)
-
-    # Add corrupted MANIFEST and SST
-    for num in range(largest_fnum + 1, largest_fnum + 10):
-        rnd = random.randint(0, 1)
-        fname = ("MANIFEST-%06d" % num) if rnd == 0 else ("%06d.sst" % num)
-        print('Write %s' % fname)
-        with open(os.path.join(dir_path, fname), "w") as fd:
-            fd.write("garbage")
-
 def execute_cmd(cmd, timeout):
     child = subprocess.Popen(cmd, stderr=subprocess.PIPE,
                              stdout=subprocess.PIPE)
@@ -649,9 +620,6 @@ def blackbox_crash_main(args, unknown_args):

         time.sleep(1)  # time to stabilize before the next run

-        if args.test_best_efforts_recovery:
-            inject_inconsistencies_to_db_dir(dbname)
-
         time.sleep(1)  # time to stabilize before the next run

     # we need to clean up after ourselves -- only do this on test success