Add a new IOStatus subcode to indicate that writes are fenced off (#7374)
Summary: In a distributed file system, directory ownership is enforced by fencing off the previous owner once they've been preempted by a new owner. This PR adds a IOStatus subcode for ```StatusCode::IOError``` to indicate this. Once this error is returned for a file write, the DB is put in read-only mode and not allowed to resume in read-write mode. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7374 Test Plan: Add new unit tests in ```error_handler_fs_test``` Reviewed By: riversand963 Differential Revision: D23687777 Pulled By: anand1976 fbshipit-source-id: bef948642089dc0af399057864d9a8ca339e8b2f
This commit is contained in:
parent
ecc8ffe17b
commit
bb95ed284d
@ -833,6 +833,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
|
||||
// Is setting bg_error_ enough here? This will at least stop
|
||||
// compaction and fail any further writes.
|
||||
// Caller must hold mutex_.
|
||||
assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
|
||||
mutex_.AssertHeld();
|
||||
if (immutable_db_options_.paranoid_checks && !status.ok() &&
|
||||
!status.IsBusy() && !status.IsIncomplete()) {
|
||||
@ -843,6 +844,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
|
||||
void DBImpl::WriteStatusCheck(const Status& status) {
|
||||
// Is setting bg_error_ enough here? This will at least stop
|
||||
// compaction and fail any further writes.
|
||||
assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
|
||||
if (immutable_db_options_.paranoid_checks && !status.ok() &&
|
||||
!status.IsBusy() && !status.IsIncomplete()) {
|
||||
mutex_.Lock();
|
||||
@ -854,8 +856,9 @@ void DBImpl::WriteStatusCheck(const Status& status) {
|
||||
void DBImpl::IOStatusCheck(const IOStatus& io_status) {
|
||||
// Is setting bg_error_ enough here? This will at least stop
|
||||
// compaction and fail any further writes.
|
||||
if (immutable_db_options_.paranoid_checks && !io_status.ok() &&
|
||||
!io_status.IsBusy() && !io_status.IsIncomplete()) {
|
||||
if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
|
||||
!io_status.IsBusy() && !io_status.IsIncomplete()) ||
|
||||
io_status.IsIOFenced()) {
|
||||
mutex_.Lock();
|
||||
error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
|
||||
mutex_.Unlock();
|
||||
|
@ -32,6 +32,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
Status::Code::kIOError, Status::SubCode::kSpaceLimit,
|
||||
true),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kCompaction,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kCompaction,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during BG flush
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kNoSpace, true),
|
||||
@ -42,6 +50,12 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kSpaceLimit, true),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kIOFenced, true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
|
||||
Status::SubCode::kIOFenced, false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during Write
|
||||
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
@ -51,6 +65,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
false),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
false),
|
||||
Status::Severity::kFatalError},
|
||||
// Errors during MANIFEST write
|
||||
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
@ -60,6 +82,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
|
||||
Status::Code::kIOError, Status::SubCode::kNoSpace,
|
||||
false),
|
||||
Status::Severity::kHardError},
|
||||
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
true),
|
||||
Status::Severity::kFatalError},
|
||||
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
|
||||
Status::Code::kIOError, Status::SubCode::kIOFenced,
|
||||
false),
|
||||
Status::Severity::kFatalError},
|
||||
};
|
||||
|
||||
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
|
||||
|
@ -1962,6 +1962,194 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
|
||||
Close();
|
||||
}
|
||||
|
||||
class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
|
||||
public testing::WithParamInterface<bool> {};
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, FLushWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
Put(Key(0), "val");
|
||||
SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
|
||||
fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
std::string old_manifest;
|
||||
std::string new_manifest;
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
DestroyAndReopen(options);
|
||||
old_manifest = GetManifestNameFromLiveFiles();
|
||||
|
||||
Put(Key(0), "val");
|
||||
Flush();
|
||||
Put(Key(1), "val");
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionSet::LogAndApply:WriteManifest", [&](void*) {
|
||||
fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
Put(Key(0), "va;");
|
||||
Put(Key(2), "va;");
|
||||
s = Flush();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::FlushMemTable:FlushMemTableFinished",
|
||||
"BackgroundCallCompaction:0"}});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"BackgroundCallCompaction:0", [&](void*) {
|
||||
fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Put(Key(1), "val");
|
||||
s = Flush();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
s = dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
|
||||
std::shared_ptr<FaultInjectionTestFS> fault_fs(
|
||||
new FaultInjectionTestFS(FileSystem::Default()));
|
||||
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener(
|
||||
new ErrorHandlerFSListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_fs_env.get();
|
||||
options.create_if_missing = true;
|
||||
options.writable_file_max_buffer_size = 32768;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.paranoid_checks = GetParam();
|
||||
Status s;
|
||||
Random rnd(301);
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
|
||||
for (auto i = 0; i < 100; ++i) {
|
||||
batch.Put(Key(i), rnd.RandomString(1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
wopts.sync = true;
|
||||
ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
|
||||
};
|
||||
|
||||
{
|
||||
WriteBatch batch;
|
||||
int write_error = 0;
|
||||
|
||||
for (auto i = 100; i < 199; ++i) {
|
||||
batch.Put(Key(i), rnd.RandomString(1024));
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
|
||||
write_error++;
|
||||
if (write_error > 2) {
|
||||
fault_fs->SetFilesystemActive(false,
|
||||
IOStatus::IOFenced("IO fenced"));
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
WriteOptions wopts;
|
||||
wopts.sync = true;
|
||||
s = dbfull()->Write(wopts, &batch);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
}
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs->SetFilesystemActive(true);
|
||||
{
|
||||
WriteBatch batch;
|
||||
|
||||
for (auto i = 0; i < 100; ++i) {
|
||||
batch.Put(Key(i), rnd.RandomString(1024));
|
||||
}
|
||||
|
||||
WriteOptions wopts;
|
||||
wopts.sync = true;
|
||||
s = dbfull()->Write(wopts, &batch);
|
||||
ASSERT_TRUE(s.IsIOFenced());
|
||||
}
|
||||
Close();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
|
||||
::testing::Bool());
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -126,6 +126,11 @@ class IOStatus : public Status {
|
||||
return IOStatus(kIOError, kPathNotFound, msg, msg2);
|
||||
}
|
||||
|
||||
static IOStatus IOFenced() { return IOStatus(kIOError, kIOFenced); }
|
||||
static IOStatus IOFenced(const Slice& msg, const Slice& msg2 = Slice()) {
|
||||
return IOStatus(kIOError, kIOFenced, msg, msg2);
|
||||
}
|
||||
|
||||
// Return a string representation of this status suitable for printing.
|
||||
// Returns the string "OK" for success.
|
||||
// std::string ToString() const;
|
||||
|
@ -113,6 +113,7 @@ class Status {
|
||||
kManualCompactionPaused = 11,
|
||||
kOverwritten = 12,
|
||||
kTxnNotPrepared = 13,
|
||||
kIOFenced = 14,
|
||||
kMaxSubCode
|
||||
};
|
||||
|
||||
@ -482,6 +483,14 @@ class Status {
|
||||
return (code() == kInvalidArgument) && (subcode() == kTxnNotPrepared);
|
||||
}
|
||||
|
||||
// Returns true iff the status indicates a IOFenced error.
|
||||
bool IsIOFenced() const {
|
||||
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
|
||||
checked_ = true;
|
||||
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
|
||||
return (code() == kIOError) && (subcode() == kIOFenced);
|
||||
}
|
||||
|
||||
// Return a string representation of this status suitable for printing.
|
||||
// Returns the string "OK" for success.
|
||||
std::string ToString() const;
|
||||
|
@ -52,7 +52,9 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
|
||||
"Insufficient capacity for merge operands",
|
||||
// kManualCompactionPaused
|
||||
"Manual compaction paused",
|
||||
" (overwritten)", // kOverwritten, subcode of OK
|
||||
" (overwritten)", // kOverwritten, subcode of OK
|
||||
"Txn not prepared", // kTxnNotPrepared
|
||||
"IO fenced off", // kIOFenced
|
||||
};
|
||||
|
||||
Status::Status(Code _code, SubCode _subcode, const Slice& msg,
|
||||
|
Loading…
Reference in New Issue
Block a user