db_stress support tracking historical values (#8960)

Summary:
When `--sync_fault_injection` is set, this PR takes a snapshot of the expected values and starts an operation trace when the DB is opened. These files are stored in `--expected_values_dir`. They will be used for recovering the expected state of the DB following a crash where a suffix of unsynced operations are allowed to be lost.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8960

Test Plan: injected crashed at various points in `FileExpectedStateManager` and verified the next run recovers the state/trace file with highest seqno and removes all older/temporary files. Note we don't use sync_fault_injection in CI crash tests yet.

Reviewed By: pdillinger

Differential Revision: D31194941

Pulled By: ajkr

fbshipit-source-id: b0f935a529a0186c5a9c7709fcaa8829de8a84cf
This commit is contained in:
Andrew Kryczka 2021-12-07 13:40:46 -08:00 committed by Facebook GitHub Bot
parent 88875df821
commit a6a6aad74e
9 changed files with 236 additions and 22 deletions

View File

@ -16,6 +16,8 @@ class BatchedOpsStressTest : public StressTest {
BatchedOpsStressTest() {}
virtual ~BatchedOpsStressTest() {}
bool IsStateTracked() const override { return false; }
// Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
// ("9"+K, "9"+V) in DB atomically i.e in a single batch.
// Also refer BatchedOpsStressTest::TestGet

View File

@ -18,6 +18,8 @@ class CfConsistencyStressTest : public StressTest {
~CfConsistencyStressTest() override {}
bool IsStateTracked() const override { return false; }
Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */,
const std::vector<int>& rand_column_families,

View File

@ -463,12 +463,13 @@ DEFINE_bool(test_secondary, false, "Test secondary instance.");
DEFINE_string(
expected_values_dir, "",
"Dir where file with array of expected uint32_t values will be stored. If "
"provided and non-empty, the DB state will be verified against these "
"values after recovery. --max_key and --column_family must be kept the "
"same across invocations of this program that use the same "
"--expected_values_dir. See --seed and --nooverwritepercent for further "
"requirements.");
"Dir where files containing info about the latest/historical values will "
"be stored. If provided and non-empty, the DB state will be verified "
"against values from these files after recovery. --max_key and "
"--column_family must be kept the same across invocations of this program "
"that use the same --expected_values_dir. Currently historical values are "
"only tracked when --sync_fault_injection is set. See --seed and "
"--nooverwritepercent for further requirements.");
DEFINE_bool(verify_checksum, false,
"Verify checksum for every block read from storage");
@ -805,7 +806,10 @@ DEFINE_int32(get_property_one_in, 1000,
DEFINE_bool(sync_fault_injection, false,
"If true, FaultInjectionTestFS will be used for write operations, "
" and unsynced data in DB will lost after crash.");
"and unsynced data in DB will lost after crash. In such a case we "
"track DB changes in a trace file (\"*.trace\") in "
"--expected_values_dir for verifying there are no holes in the "
"recovered data (future work).");
DEFINE_bool(best_efforts_recovery, false,
"If true, use best efforts recovery.");

View File

@ -250,6 +250,10 @@ class SharedState {
}
}
Status SaveAtAndAfter(DB* db) {
return expected_state_manager_->SaveAtAndAfter(db);
}
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) {
return expected_state_manager_->ClearColumnFamily(cf);

View File

@ -307,6 +307,15 @@ void StressTest::FinishInitDb(SharedState* shared) {
fprintf(stdout, "Compaction filter factory: %s\n",
compaction_filter_factory->Name());
}
// TODO(ajkr): First restore if there's already a trace.
if (FLAGS_sync_fault_injection && IsStateTracked()) {
Status s = shared->SaveAtAndAfter(db_);
if (!s.ok()) {
fprintf(stderr, "Error enabling history tracing: %s\n",
s.ToString().c_str());
exit(1);
}
}
}
bool StressTest::VerifySecondaries() {
@ -2790,6 +2799,15 @@ void StressTest::Reopen(ThreadState* thread) {
fprintf(stdout, "%s Reopening database for the %dth time\n",
clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
Open();
if (FLAGS_sync_fault_injection && IsStateTracked()) {
Status s = thread->shared->SaveAtAndAfter(db_);
if (!s.ok()) {
fprintf(stderr, "Error enabling history tracing: %s\n",
s.ToString().c_str());
exit(1);
}
}
}
void StressTest::CheckAndSetOptionsForUserTimestamp() {

View File

@ -65,6 +65,9 @@ class StressTest {
virtual bool ShouldAcquireMutexOnKey() const { return false; }
// Returns true if DB state is tracked by the stress test.
virtual bool IsStateTracked() const = 0;
virtual std::vector<int> GenerateColumnFamilies(
const int /* num_column_families */, int rand_column_family) const {
return {rand_column_family};

View File

@ -8,6 +8,7 @@
#include "db_stress_tool/expected_state.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "rocksdb/trace_reader_writer.h"
namespace ROCKSDB_NAMESPACE {
@ -145,7 +146,11 @@ ExpectedStateManager::ExpectedStateManager(size_t max_key,
ExpectedStateManager::~ExpectedStateManager() {}
const std::string FileExpectedStateManager::kLatestFilename = "LATEST.state";
const std::string FileExpectedStateManager::kLatestBasename = "LATEST";
const std::string FileExpectedStateManager::kStateFilenameSuffix = ".state";
const std::string FileExpectedStateManager::kTraceFilenameSuffix = ".trace";
const std::string FileExpectedStateManager::kTempFilenamePrefix = ".";
const std::string FileExpectedStateManager::kTempFilenameSuffix = ".tmp";
FileExpectedStateManager::FileExpectedStateManager(
size_t max_key, size_t num_column_families,
@ -156,9 +161,58 @@ FileExpectedStateManager::FileExpectedStateManager(
}
Status FileExpectedStateManager::Open() {
Status s = Clean();
// Before doing anything, sync directory state with ours. That is, determine
// `saved_seqno_`, and create any necessary missing files.
std::vector<std::string> expected_state_dir_children;
Status s = Env::Default()->GetChildren(expected_state_dir_path_,
&expected_state_dir_children);
bool found_trace = false;
if (s.ok()) {
for (size_t i = 0; i < expected_state_dir_children.size(); ++i) {
const auto& filename = expected_state_dir_children[i];
if (filename.size() >= kStateFilenameSuffix.size() &&
filename.rfind(kStateFilenameSuffix) ==
filename.size() - kStateFilenameSuffix.size() &&
filename.rfind(kLatestBasename, 0) == std::string::npos) {
SequenceNumber found_seqno = ParseUint64(
filename.substr(0, filename.size() - kStateFilenameSuffix.size()));
if (saved_seqno_ == kMaxSequenceNumber || found_seqno > saved_seqno_) {
saved_seqno_ = found_seqno;
}
}
}
// Check if crash happened after creating state file but before creating
// trace file.
if (saved_seqno_ != kMaxSequenceNumber) {
std::string saved_seqno_trace_path =
GetPathForFilename(ToString(saved_seqno_) + kTraceFilenameSuffix);
Status exists_status = Env::Default()->FileExists(saved_seqno_trace_path);
if (exists_status.ok()) {
found_trace = true;
} else if (exists_status.IsNotFound()) {
found_trace = false;
} else {
s = exists_status;
}
}
}
if (s.ok() && saved_seqno_ != kMaxSequenceNumber && !found_trace) {
// Create an empty trace file so later logic does not need to distinguish
// missing vs. empty trace file.
std::unique_ptr<WritableFile> wfile;
const EnvOptions soptions;
std::string saved_seqno_trace_path =
GetPathForFilename(ToString(saved_seqno_) + kTraceFilenameSuffix);
s = Env::Default()->NewWritableFile(saved_seqno_trace_path, &wfile,
soptions);
}
std::string expected_state_file_path = GetPathForFilename(kLatestFilename);
if (s.ok()) {
s = Clean();
}
std::string expected_state_file_path =
GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
bool found = false;
if (s.ok()) {
Status exists_status = Env::Default()->FileExists(expected_state_file_path);
@ -176,7 +230,7 @@ Status FileExpectedStateManager::Open() {
// this process is killed during setup, `Clean()` will take care of removing
// the incomplete expected values file.
std::string temp_expected_state_file_path =
GetTempPathForFilename(kLatestFilename);
GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
FileExpectedState temp_expected_state(temp_expected_state_file_path,
max_key_, num_column_families_);
if (s.ok()) {
@ -196,23 +250,115 @@ Status FileExpectedStateManager::Open() {
return s;
}
Status FileExpectedStateManager::Clean() {
// An incomplete `Open()` could have left behind an invalid temporary file.
std::string temp_path = GetTempPathForFilename(kLatestFilename);
Status s = Env::Default()->FileExists(temp_path);
#ifndef ROCKSDB_LITE
Status FileExpectedStateManager::SaveAtAndAfter(DB* db) {
SequenceNumber seqno = db->GetLatestSequenceNumber();
std::string state_filename = ToString(seqno) + kStateFilenameSuffix;
std::string state_file_temp_path = GetTempPathForFilename(state_filename);
std::string state_file_path = GetPathForFilename(state_filename);
std::string latest_file_path =
GetPathForFilename(kLatestBasename + kStateFilenameSuffix);
std::string trace_filename = ToString(seqno) + kTraceFilenameSuffix;
std::string trace_file_path = GetPathForFilename(trace_filename);
// Populate a tempfile and then rename it to atomically create "<seqno>.state"
// with contents from "LATEST.state"
Status s =
CopyFile(FileSystem::Default(), latest_file_path, state_file_temp_path,
0 /* size */, false /* use_fsync */);
if (s.ok()) {
s = Env::Default()->DeleteFile(temp_path);
} else if (s.IsNotFound()) {
s = Status::OK();
s = FileSystem::Default()->RenameFile(state_file_temp_path, state_file_path,
IOOptions(), nullptr /* dbg */);
}
SequenceNumber old_saved_seqno;
if (s.ok()) {
old_saved_seqno = saved_seqno_;
saved_seqno_ = seqno;
}
// If there is a crash now, i.e., after "<seqno>.state" was created but before
// "<seqno>.trace" is created, it will be treated as if "<seqno>.trace" were
// present but empty.
// Create "<seqno>.trace" directly. It is initially empty so no need for
// tempfile.
std::unique_ptr<TraceWriter> trace_writer;
if (s.ok()) {
EnvOptions soptions;
// Disable buffering so traces will not get stuck in application buffer.
soptions.writable_file_max_buffer_size = 0;
s = NewFileTraceWriter(Env::Default(), soptions, trace_file_path,
&trace_writer);
}
if (s.ok()) {
s = db->StartTrace(TraceOptions(), std::move(trace_writer));
}
// Delete old state/trace files. Deletion order does not matter since we only
// delete after successfully saving new files, so old files will never be used
// again, even if we crash.
if (s.ok() && old_saved_seqno != kMaxSequenceNumber &&
old_saved_seqno != saved_seqno_) {
s = Env::Default()->DeleteFile(
GetPathForFilename(ToString(old_saved_seqno) + kStateFilenameSuffix));
}
if (s.ok() && old_saved_seqno != kMaxSequenceNumber &&
old_saved_seqno != saved_seqno_) {
s = Env::Default()->DeleteFile(
GetPathForFilename(ToString(old_saved_seqno) + kTraceFilenameSuffix));
}
return s;
}
#else // ROCKSDB_LITE
Status FileExpectedStateManager::SaveAtAndAfter(DB* /* db */) {
return Status::NotSupported();
}
#endif // ROCKSDB_LITE
Status FileExpectedStateManager::Clean() {
std::vector<std::string> expected_state_dir_children;
Status s = Env::Default()->GetChildren(expected_state_dir_path_,
&expected_state_dir_children);
// An incomplete `Open()` or incomplete `SaveAtAndAfter()` could have left
// behind invalid temporary files. An incomplete `SaveAtAndAfter()` could have
// also left behind stale state/trace files.
for (size_t i = 0; s.ok() && i < expected_state_dir_children.size(); ++i) {
const auto& filename = expected_state_dir_children[i];
if (filename.rfind(kTempFilenamePrefix, 0 /* pos */) == 0 &&
filename.size() >= kTempFilenameSuffix.size() &&
filename.rfind(kTempFilenameSuffix) ==
filename.size() - kTempFilenameSuffix.size()) {
// Delete all temp files.
s = Env::Default()->DeleteFile(GetPathForFilename(filename));
} else if (filename.size() >= kStateFilenameSuffix.size() &&
filename.rfind(kStateFilenameSuffix) ==
filename.size() - kStateFilenameSuffix.size() &&
filename.rfind(kLatestBasename, 0) == std::string::npos &&
ParseUint64(filename.substr(
0, filename.size() - kStateFilenameSuffix.size())) <
saved_seqno_) {
assert(saved_seqno_ != kMaxSequenceNumber);
// Delete stale state files.
s = Env::Default()->DeleteFile(GetPathForFilename(filename));
} else if (filename.size() >= kTraceFilenameSuffix.size() &&
filename.rfind(kTraceFilenameSuffix) ==
filename.size() - kTraceFilenameSuffix.size() &&
ParseUint64(filename.substr(
0, filename.size() - kTraceFilenameSuffix.size())) <
saved_seqno_) {
assert(saved_seqno_ != kMaxSequenceNumber);
// Delete stale trace files.
s = Env::Default()->DeleteFile(GetPathForFilename(filename));
}
}
return s;
}
std::string FileExpectedStateManager::GetTempPathForFilename(
const std::string& filename) {
static const std::string kTempFilenamePrefix = ".";
static const std::string kTempFilenameSuffix = ".tmp";
assert(!expected_state_dir_path_.empty());
std::string expected_state_dir_path_slash =
expected_state_dir_path_.back() == '/' ? expected_state_dir_path_

View File

@ -12,8 +12,14 @@
#include <atomic>
#include <memory>
#include "db/dbformat.h"
#include "file/file_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
@ -126,6 +132,14 @@ class ExpectedStateManager {
// member function.
virtual Status Open() = 0;
// Saves expected values for the current state of `db` and begins tracking
// changes.
//
// Requires external locking preventing concurrent execution with any other
// member function. Furthermore, `db` must not be mutated while this function
// is executing.
virtual Status SaveAtAndAfter(DB* db) = 0;
// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }
@ -186,6 +200,12 @@ class FileExpectedStateManager : public ExpectedStateManager {
// member function.
Status Open() override;
// See `ExpectedStateManager::SaveAtAndAfter()` API doc.
//
// This implementation makes a copy of "LATEST.state" into
// "<current seqno>.state", and starts a trace in "<current seqno>.trace".
Status SaveAtAndAfter(DB* db) override;
private:
// Requires external locking preventing concurrent execution with any other
// member function.
@ -194,9 +214,14 @@ class FileExpectedStateManager : public ExpectedStateManager {
std::string GetTempPathForFilename(const std::string& filename);
std::string GetPathForFilename(const std::string& filename);
static const std::string kLatestFilename;
static const std::string kLatestBasename;
static const std::string kStateFilenameSuffix;
static const std::string kTraceFilenameSuffix;
static const std::string kTempFilenamePrefix;
static const std::string kTempFilenameSuffix;
const std::string expected_state_dir_path_;
SequenceNumber saved_seqno_ = kMaxSequenceNumber;
};
// An `AnonExpectedStateManager` implements an `ExpectedStateManager` backed by
@ -205,6 +230,14 @@ class AnonExpectedStateManager : public ExpectedStateManager {
public:
explicit AnonExpectedStateManager(size_t max_key, size_t num_column_families);
// See `ExpectedStateManager::SaveAtAndAfter()` API doc.
//
// This implementation returns `Status::NotSupported` since we do not
// currently have a need to keep history of expected state within a process.
Status SaveAtAndAfter(DB* /* db */) override {
return Status::NotSupported();
}
// Requires external locking preventing concurrent execution with any other
// member function.
Status Open() override;

View File

@ -179,6 +179,8 @@ class NonBatchedOpsStressTest : public StressTest {
bool ShouldAcquireMutexOnKey() const override { return true; }
bool IsStateTracked() const override { return true; }
Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {