Force a new manifest file if append to current one fails (#6331)
Summary: Fix for issue https://github.com/facebook/rocksdb/issues/6316 When an append/sync of the manifest file fails due to an IO error such as NoSpace, we don't always put the DB in read-only mode. This is true for flush and compactions, as well as foreground operatons such as column family add/drop, CompactFiles etc. Subsequent changes to the DB will be recorded in the same manifest file, which would have a corrupted record in the middle due to the previous failure. On next DB::Open(), it will fail to process the full manifest and data will be lost. To fix this, we reset VersionSet::descriptor_log_ on append/sync failure, which will force a new manifest file to be written on the next append. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6331 Test Plan: Add new unit tests in error_handler_test.cc Differential Revision: D19632951 Pulled By: anand1976 fbshipit-source-id: 68d527cb6e59a94cbbbf9f5a17a7f464381d51e3
This commit is contained in:
parent
f6b3de76e5
commit
0bc8750e82
@ -17,6 +17,7 @@
|
||||
* Fixed an issue where the thread pools were not resized upon setting `max_background_jobs` dynamically through the `SetDBOptions` interface.
|
||||
* Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
|
||||
* Fixed an issue where an incorrect "number of input records" value was used to compute the "records dropped" statistics for compactions.
|
||||
* Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
|
||||
|
||||
### New Features
|
||||
* It is now possible to enable periodic compactions for the base DB when using BlobDB.
|
||||
|
@ -166,12 +166,6 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Check if recovery is currently in progress. If it is, we will save this
|
||||
// error so we can check it at the end to see if recovery succeeded or not
|
||||
if (recovery_in_prog_ && recovery_error_.ok()) {
|
||||
recovery_error_ = bg_err;
|
||||
}
|
||||
|
||||
bool paranoid = db_options_.paranoid_checks;
|
||||
Status::Severity sev = Status::Severity::kFatalError;
|
||||
Status new_bg_err;
|
||||
@ -204,10 +198,15 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas
|
||||
|
||||
new_bg_err = Status(bg_err, sev);
|
||||
|
||||
// Check if recovery is currently in progress. If it is, we will save this
|
||||
// error so we can check it at the end to see if recovery succeeded or not
|
||||
if (recovery_in_prog_ && recovery_error_.ok()) {
|
||||
recovery_error_ = new_bg_err;
|
||||
}
|
||||
|
||||
bool auto_recovery = auto_recovery_;
|
||||
if (new_bg_err.severity() >= Status::Severity::kFatalError && auto_recovery) {
|
||||
auto_recovery = false;
|
||||
;
|
||||
}
|
||||
|
||||
// Allow some error specific overrides
|
||||
|
@ -22,6 +22,21 @@ namespace rocksdb {
|
||||
class DBErrorHandlingTest : public DBTestBase {
|
||||
public:
|
||||
DBErrorHandlingTest() : DBTestBase("/db_error_handling_test") {}
|
||||
|
||||
std::string GetManifestNameFromLiveFiles() {
|
||||
std::vector<std::string> live_files;
|
||||
uint64_t manifest_size;
|
||||
|
||||
dbfull()->GetLiveFiles(live_files, &manifest_size, false);
|
||||
for (auto& file : live_files) {
|
||||
uint64_t num = 0;
|
||||
FileType type;
|
||||
if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
|
||||
return file;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
};
|
||||
|
||||
class DBErrorHandlingEnv : public EnvWrapper {
|
||||
@ -161,6 +176,169 @@ TEST_F(DBErrorHandlingTest, FLushWriteError) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingTest, ManifestWriteError) {
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_env(
|
||||
new FaultInjectionTestEnv(Env::Default()));
|
||||
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.create_if_missing = true;
|
||||
options.env = fault_env.get();
|
||||
options.listeners.emplace_back(listener);
|
||||
Status s;
|
||||
std::string old_manifest;
|
||||
std::string new_manifest;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
old_manifest = GetManifestNameFromLiveFiles();
|
||||
|
||||
Put(Key(0), "val");
|
||||
Flush();
|
||||
Put(Key(1), "val");
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionSet::LogAndApply:WriteManifest", [&](void *) {
|
||||
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_env->SetFilesystemActive(true);
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
new_manifest = GetManifestNameFromLiveFiles();
|
||||
ASSERT_NE(new_manifest, old_manifest);
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_EQ("val", Get(Key(0)));
|
||||
ASSERT_EQ("val", Get(Key(1)));
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingTest, DoubleManifestWriteError) {
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_env(
|
||||
new FaultInjectionTestEnv(Env::Default()));
|
||||
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.create_if_missing = true;
|
||||
options.env = fault_env.get();
|
||||
options.listeners.emplace_back(listener);
|
||||
Status s;
|
||||
std::string old_manifest;
|
||||
std::string new_manifest;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
old_manifest = GetManifestNameFromLiveFiles();
|
||||
|
||||
Put(Key(0), "val");
|
||||
Flush();
|
||||
Put(Key(1), "val");
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionSet::LogAndApply:WriteManifest", [&](void *) {
|
||||
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
|
||||
fault_env->SetFilesystemActive(true);
|
||||
|
||||
// This Resume() will attempt to create a new manifest file and fail again
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
|
||||
fault_env->SetFilesystemActive(true);
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
// A successful Resume() will create a new manifest file
|
||||
s = dbfull()->Resume();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
new_manifest = GetManifestNameFromLiveFiles();
|
||||
ASSERT_NE(new_manifest, old_manifest);
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_EQ("val", Get(Key(0)));
|
||||
ASSERT_EQ("val", Get(Key(1)));
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingTest, CompactionManifestWriteError) {
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_env(
|
||||
new FaultInjectionTestEnv(Env::Default()));
|
||||
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
|
||||
Options options = GetDefaultOptions();
|
||||
options.create_if_missing = true;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.env = fault_env.get();
|
||||
Status s;
|
||||
std::string old_manifest;
|
||||
std::string new_manifest;
|
||||
std::atomic<bool> fail_manifest(false);
|
||||
DestroyAndReopen(options);
|
||||
old_manifest = GetManifestNameFromLiveFiles();
|
||||
|
||||
Put(Key(0), "val");
|
||||
Put(Key(2), "val");
|
||||
s = Flush();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
// Wait for flush of 2nd L0 file before starting compaction
|
||||
{{"DBImpl::FlushMemTable:FlushMemTableFinished",
|
||||
"BackgroundCallCompaction:0"},
|
||||
// Wait for compaction to detect manifest write error
|
||||
{"BackgroundCallCompaction:1",
|
||||
"CompactionManifestWriteError:0"},
|
||||
// Make compaction thread wait for error to be cleared
|
||||
{"CompactionManifestWriteError:1",
|
||||
"DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
|
||||
// Wait for DB instance to clear bg_error before calling
|
||||
// TEST_WaitForCompact
|
||||
{"SstFileManagerImpl::ClearError",
|
||||
"CompactionManifestWriteError:2"}});
|
||||
// trigger manifest write failure in compaction thread
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"BackgroundCallCompaction:0", [&](void *) {
|
||||
fail_manifest.store(true);
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"VersionSet::LogAndApply:WriteManifest", [&](void *) {
|
||||
if (fail_manifest.load()) {
|
||||
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Put(Key(1), "val");
|
||||
// This Flush will trigger a compaction, which will fail when appending to
|
||||
// the manifest
|
||||
s = Flush();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
TEST_SYNC_POINT("CompactionManifestWriteError:0");
|
||||
// Clear all errors so when the compaction is retried, it will succeed
|
||||
fault_env->SetFilesystemActive(true);
|
||||
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
TEST_SYNC_POINT("CompactionManifestWriteError:1");
|
||||
TEST_SYNC_POINT("CompactionManifestWriteError:2");
|
||||
|
||||
s = dbfull()->TEST_WaitForCompact();
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
ASSERT_EQ(s, Status::OK());
|
||||
|
||||
new_manifest = GetManifestNameFromLiveFiles();
|
||||
ASSERT_NE(new_manifest, old_manifest);
|
||||
Reopen(options);
|
||||
ASSERT_EQ("val", Get(Key(0)));
|
||||
ASSERT_EQ("val", Get(Key(1)));
|
||||
ASSERT_EQ("val", Get(Key(2)));
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingTest, CompactionWriteError) {
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_env(
|
||||
new FaultInjectionTestEnv(Env::Default()));
|
||||
|
@ -3953,12 +3953,15 @@ Status VersionSet::ProcessManifestWrites(
|
||||
for (auto v : versions) {
|
||||
delete v;
|
||||
}
|
||||
// If manifest append failed for whatever reason, the file could be
|
||||
// corrupted. So we need to force the next version update to start a
|
||||
// new manifest file.
|
||||
descriptor_log_.reset();
|
||||
if (new_descriptor_log) {
|
||||
ROCKS_LOG_INFO(db_options_->info_log,
|
||||
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
|
||||
"\n",
|
||||
manifest_file_number_, pending_manifest_file_number_);
|
||||
descriptor_log_.reset();
|
||||
env_->DeleteFile(
|
||||
DescriptorFileName(dbname_, pending_manifest_file_number_));
|
||||
}
|
||||
|
@ -308,6 +308,7 @@ void SstFileManagerImpl::ClearError() {
|
||||
// since the ErrorHandler::recovery_in_prog_ flag would be true
|
||||
cur_instance_ = error_handler;
|
||||
mu_.Unlock();
|
||||
TEST_SYNC_POINT("SstFileManagerImpl::ClearError");
|
||||
s = error_handler->RecoverFromBGError();
|
||||
mu_.Lock();
|
||||
// The DB instance might have been deleted while we were
|
||||
|
Loading…
x
Reference in New Issue
Block a user