Inject the random write error to stress test (#7653)

Summary:
Inject the random write error to stress test, it requires set reopen=0 and disable_wal=true.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7653

Test Plan: pass db_stress and python3 db_crashtest.py blackbox

Reviewed By: ajkr

Differential Revision: D25354132

Pulled By: zhichao-cao

fbshipit-source-id: 44721104eecb416e27f65f854912c40e301dd669
This commit is contained in:
Zhichao Cao 2020-12-17 11:51:04 -08:00 committed by Facebook GitHub Bot
parent 99f5a800c3
commit 04b3524ad0
9 changed files with 115 additions and 10 deletions

View File

@ -16,10 +16,10 @@
#include "util/file_checksum_helper.h" #include "util/file_checksum_helper.h"
#include "util/xxhash.h" #include "util/xxhash.h"
ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env = nullptr; ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr;
#ifndef NDEBUG #ifndef NDEBUG
// If non-null, injects read error at a rate specified by the // If non-null, injects read error at a rate specified by the
// read_fault_one_in flag // read_fault_one_in or write_fault_one_in flag
std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard; std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
#endif // NDEBUG #endif // NDEBUG
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e = enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =

View File

@ -252,7 +252,7 @@ const int kRandomValueMaxFactor = 3;
const int kValueMaxLen = 100; const int kValueMaxLen = 100;
// wrapped posix or hdfs environment // wrapped posix or hdfs environment
extern ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env; extern ROCKSDB_NAMESPACE::Env* db_stress_env;
#ifndef NDEBUG #ifndef NDEBUG
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class FaultInjectionTestFS; class FaultInjectionTestFS;

View File

@ -757,4 +757,7 @@ DEFINE_string(file_checksum_impl, "none",
"Name of an implementation for file_checksum_gen_factory, or " "Name of an implementation for file_checksum_gen_factory, or "
"\"none\" for null."); "\"none\" for null.");
DEFINE_int32(write_fault_one_in, 0,
"On non-zero, enables fault injection on write");
#endif // GFLAGS #endif // GFLAGS

View File

@ -29,6 +29,7 @@ DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval); DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval); DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in); DECLARE_int32(read_fault_one_in);
DECLARE_int32(write_fault_one_in);
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class StressTest; class StressTest;

View File

@ -15,6 +15,7 @@
#include "db_stress_tool/db_stress_table_properties_collector.h" #include "db_stress_tool/db_stress_table_properties_collector.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/sst_file_manager.h" #include "rocksdb/sst_file_manager.h"
#include "rocksdb/types.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
@ -525,6 +526,16 @@ void StressTest::OperateDb(ThreadState* thread) {
fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(), fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
FLAGS_read_fault_one_in); FLAGS_read_fault_one_in);
} }
if (FLAGS_write_fault_one_in) {
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
std::vector<FileType> types;
types.push_back(FileType::kTableFile);
types.push_back(FileType::kDescriptorFile);
types.push_back(FileType::kCurrentFile);
fault_fs_guard->SetRandomWriteError(
thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg, types);
}
#endif // NDEBUG #endif // NDEBUG
thread->stats.Start(); thread->stats.Start();
for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) { for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
@ -618,7 +629,8 @@ void StressTest::OperateDb(ThreadState* thread) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// Verify GetLiveFiles with a 1 in N chance. // Verify GetLiveFiles with a 1 in N chance.
if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in)) { if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
!FLAGS_write_fault_one_in) {
Status status = VerifyGetLiveFiles(); Status status = VerifyGetLiveFiles();
if (!status.ok()) { if (!status.ok()) {
VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status); VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
@ -1460,7 +1472,7 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
FLAGS_db + "/.checkpoint" + ToString(thread->tid); FLAGS_db + "/.checkpoint" + ToString(thread->tid);
Options tmp_opts(options_); Options tmp_opts(options_);
tmp_opts.listeners.clear(); tmp_opts.listeners.clear();
tmp_opts.env = db_stress_env->target(); tmp_opts.env = db_stress_env;
DestroyDB(checkpoint_dir, tmp_opts); DestroyDB(checkpoint_dir, tmp_opts);
@ -1952,6 +1964,7 @@ void StressTest::PrintEnv() const {
fprintf(stdout, "Use dynamic level : %d\n", fprintf(stdout, "Use dynamic level : %d\n",
static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes)); static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in); fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection); fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
fprintf(stdout, "Best efforts recovery : %d\n", fprintf(stdout, "Best efforts recovery : %d\n",
static_cast<int>(FLAGS_best_efforts_recovery)); static_cast<int>(FLAGS_best_efforts_recovery));

View File

@ -97,20 +97,42 @@ int db_stress_tool(int argc, char** argv) {
} }
#ifndef NDEBUG #ifndef NDEBUG
if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection) { if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection ||
FLAGS_write_fault_one_in) {
FaultInjectionTestFS* fs = FaultInjectionTestFS* fs =
new FaultInjectionTestFS(raw_env->GetFileSystem()); new FaultInjectionTestFS(raw_env->GetFileSystem());
fault_fs_guard.reset(fs); fault_fs_guard.reset(fs);
if (FLAGS_write_fault_one_in) {
fault_fs_guard->SetFilesystemDirectWritable(false);
} else {
fault_fs_guard->SetFilesystemDirectWritable(true); fault_fs_guard->SetFilesystemDirectWritable(true);
}
fault_env_guard = fault_env_guard =
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard); std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
raw_env = fault_env_guard.get(); raw_env = fault_env_guard.get();
} }
if (FLAGS_write_fault_one_in) {
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeFinishBuildTable",
[&](void*) { fault_fs_guard->EnableWriteErrorInjection(); });
SyncPoint::GetInstance()->EnableProcessing();
}
#endif #endif
env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env); env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
db_stress_env = env_wrapper_guard.get(); db_stress_env = env_wrapper_guard.get();
#ifndef NDEBUG
if (FLAGS_write_fault_one_in) {
// In the write injection case, we need to use the FS interface and returns
// the IOStatus with different error and flags. Therefore,
// DbStressEnvWrapper cannot be used which will swallow the FS
// implementations. We should directly use the raw_env which is the
// CompositeEnvWrapper of env and fault_fs.
db_stress_env = raw_env;
}
#endif
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
// The number of background threads should be at least as much the // The number of background threads should be at least as much the

View File

@ -299,6 +299,7 @@ def finalize_and_sanitize(src_params):
if dest_params.get("disable_wal", 0) == 1: if dest_params.get("disable_wal", 0) == 1:
dest_params["atomic_flush"] = 1 dest_params["atomic_flush"] = 1
dest_params["sync"] = 0 dest_params["sync"] = 0
dest_params["write_fault_one_in"] = 0
if dest_params.get("open_files", 1) != -1: if dest_params.get("open_files", 1) != -1:
# Compaction TTL and periodic compactions are only compatible # Compaction TTL and periodic compactions are only compatible
# with open_files = -1 # with open_files = -1

View File

@ -99,7 +99,8 @@ IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions&,
state_.buffer_.append(data.data(), data.size()); state_.buffer_.append(data.data(), data.size());
state_.pos_ += data.size(); state_.pos_ += data.size();
fs_->WritableFileAppended(state_); fs_->WritableFileAppended(state_);
return IOStatus::OK(); IOStatus io_s = fs_->InjectWriteError(state_.filename_);
return io_s;
} }
IOStatus TestFSWritableFile::Close(const IOOptions& options, IOStatus TestFSWritableFile::Close(const IOOptions& options,
@ -536,6 +537,34 @@ IOStatus FaultInjectionTestFS::InjectError(ErrorOperation op,
return IOStatus::OK(); return IOStatus::OK();
} }
IOStatus FaultInjectionTestFS::InjectWriteError(const std::string& file_name) {
MutexLock l(&mutex_);
if (!enable_write_error_injection_ || !write_error_one_in_) {
return IOStatus::OK();
}
bool allowed_type = false;
uint64_t number;
FileType cur_type = kTempFile;
std::size_t found = file_name.find_last_of("/");
std::string file = file_name.substr(found);
bool ret = ParseFileName(file, &number, &cur_type);
if (ret) {
for (const auto& type : write_error_allowed_types_) {
if (cur_type == type) {
allowed_type = true;
}
}
}
if (allowed_type) {
if (write_error_rand_.OneIn(write_error_one_in_)) {
return GetError();
}
}
return IOStatus::OK();
}
void FaultInjectionTestFS::PrintFaultBacktrace() { void FaultInjectionTestFS::PrintFaultBacktrace() {
#if defined(OS_LINUX) #if defined(OS_LINUX)
ErrorContext* ctx = ErrorContext* ctx =

View File

@ -172,8 +172,9 @@ class FaultInjectionTestFS : public FileSystemWrapper {
: FileSystemWrapper(base), : FileSystemWrapper(base),
filesystem_active_(true), filesystem_active_(true),
filesystem_writable_(false), filesystem_writable_(false),
thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)) { thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)),
} enable_write_error_injection_(false),
write_error_rand_(0) {}
virtual ~FaultInjectionTestFS() { error_.PermitUncheckedError(); } virtual ~FaultInjectionTestFS() { error_.PermitUncheckedError(); }
const char* Name() const override { return "FaultInjectionTestFS"; } const char* Name() const override { return "FaultInjectionTestFS"; }
@ -316,6 +317,27 @@ class FaultInjectionTestFS : public FileSystemWrapper {
delete ctx; delete ctx;
} }
// This is to set the parameters for the write error injection.
// seed is the seed for the random number generator, and one_in determines
// the probability of injecting error (i.e an error is injected with
// 1/one_in probability). For write error, we can specify the error we
// want to inject. Types decides the file types we want to inject the
// error (e.g., Wal files, SST files), which is empty by default.
void SetRandomWriteError(uint32_t seed, int one_in, IOStatus error,
const std::vector<FileType>& types) {
MutexLock l(&mutex_);
Random tmp_rand(seed);
error.PermitUncheckedError();
error_ = error;
write_error_rand_ = tmp_rand;
write_error_one_in_ = one_in;
write_error_allowed_types_ = types;
}
// Inject an write error with randomlized parameter and the predefined
// error type. Only the allowed file types will inject the write error
IOStatus InjectWriteError(const std::string& file_name);
// Inject an error. For a READ operation, a status of IOError(), a // Inject an error. For a READ operation, a status of IOError(), a
// corruption in the contents of scratch, or truncation of slice // corruption in the contents of scratch, or truncation of slice
// are the types of error with equal probability. For OPEN, // are the types of error with equal probability. For OPEN,
@ -343,6 +365,16 @@ class FaultInjectionTestFS : public FileSystemWrapper {
} }
} }
void EnableWriteErrorInjection() {
MutexLock l(&mutex_);
enable_write_error_injection_ = true;
}
void DisableWriteErrorInjection() {
MutexLock l(&mutex_);
enable_write_error_injection_ = false;
}
void DisableErrorInjection() { void DisableErrorInjection() {
ErrorContext* ctx = ErrorContext* ctx =
static_cast<ErrorContext*>(thread_local_error_->Get()); static_cast<ErrorContext*>(thread_local_error_->Get());
@ -396,6 +428,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
}; };
std::unique_ptr<ThreadLocalPtr> thread_local_error_; std::unique_ptr<ThreadLocalPtr> thread_local_error_;
bool enable_write_error_injection_;
Random write_error_rand_;
int write_error_one_in_;
std::vector<FileType> write_error_allowed_types_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE