Expand auto recovery to background read errors (#9679)
Summary: Fix and enhance the background error recovery logic to handle the following situations - 1. Background read errors during flush/compaction (previously was resulting in unrecoverable state) 2. Fix auto recovery failure on read/write errors during atomic flush. It was failing due to a bug in setting the resuming_from_bg_err variable in AtomicFlushMemTablesToOutputFiles. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9679 Test Plan: Add new unit tests in error_handler_fs_test Reviewed By: riversand963 Differential Revision: D34770097 Pulled By: anand1976 fbshipit-source-id: 136da973a28d684b9c74bdf668519b0cbbbe1742
This commit is contained in:
parent
2c8100e60e
commit
a88d8795ec
@ -8,6 +8,7 @@
|
||||
* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs.
|
||||
* Added BlobDB options to `ldb`
|
||||
* `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`.
|
||||
* Automatically recover from retryable read IO errors during backgorund flush/compaction.
|
||||
|
||||
### Bug Fixes
|
||||
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
|
||||
@ -18,6 +19,7 @@
|
||||
* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
|
||||
* Fixed a bug that `Iterator::Refresh()` reads stale keys after DeleteRange() performed.
|
||||
* Fixed a race condition when disable and re-enable manual compaction.
|
||||
* Fixed automatic error recovery failure in atomic flush.
|
||||
|
||||
### Public API changes
|
||||
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
|
||||
|
@ -318,6 +318,7 @@ Status BuildTable(
|
||||
|
||||
// TODO Also check the IO status when create the Iterator.
|
||||
|
||||
TEST_SYNC_POINT("BuildTable:BeforeOutputValidation");
|
||||
if (s.ok() && !empty) {
|
||||
// Verify that the table is usable
|
||||
// We set for_compaction to false and don't OptimizeForCompactionTableRead
|
||||
|
@ -263,11 +263,6 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
if (!s.ok() && need_cancel) {
|
||||
flush_job.Cancel();
|
||||
}
|
||||
IOStatus io_s = IOStatus::OK();
|
||||
io_s = flush_job.io_status();
|
||||
if (s.ok()) {
|
||||
s = io_s;
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
|
||||
@ -303,9 +298,7 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
}
|
||||
|
||||
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
|
||||
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
|
||||
!io_s.IsColumnFamilyDropped()) {
|
||||
assert(log_io_s.ok());
|
||||
if (log_io_s.ok()) {
|
||||
// Error while writing to MANIFEST.
|
||||
// In fact, versions_->io_status() can also be the result of renaming
|
||||
// CURRENT file. With current code, it's just difficult to tell. So just
|
||||
@ -316,24 +309,19 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
// error), all the Manifest write will be map to soft error.
|
||||
// TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor is
|
||||
// needed.
|
||||
error_handler_.SetBGError(io_s,
|
||||
error_handler_.SetBGError(s,
|
||||
BackgroundErrorReason::kManifestWriteNoWAL);
|
||||
} else {
|
||||
// If WAL sync is successful (either WAL size is 0 or there is no IO
|
||||
// error), all the other SST file write errors will be set as
|
||||
// kFlushNoWAL.
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
|
||||
error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
|
||||
}
|
||||
} else {
|
||||
if (log_io_s.ok()) {
|
||||
Status new_bg_error = s;
|
||||
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
||||
}
|
||||
assert(s == log_io_s);
|
||||
Status new_bg_error = s;
|
||||
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
||||
}
|
||||
} else {
|
||||
// If we got here, then we decided not to care about the i_os status (either
|
||||
// from never needing it or ignoring the flush job status
|
||||
io_s.PermitUncheckedError();
|
||||
}
|
||||
// If flush ran smoothly and no mempurge happened
|
||||
// install new SST file path.
|
||||
@ -416,6 +404,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
for (const auto cfd : cfds) {
|
||||
assert(cfd->imm()->NumNotFlushed() != 0);
|
||||
assert(cfd->imm()->IsFlushPending());
|
||||
assert(cfd->GetFlushReason() == cfds[0]->GetFlushReason());
|
||||
}
|
||||
#endif /* !NDEBUG */
|
||||
|
||||
@ -502,12 +491,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
// exec_status stores the execution status of flush_jobs as
|
||||
// <bool /* executed */, Status /* status code */>
|
||||
autovector<std::pair<bool, Status>> exec_status;
|
||||
autovector<IOStatus> io_status;
|
||||
std::vector<bool> pick_status;
|
||||
for (int i = 0; i != num_cfs; ++i) {
|
||||
// Initially all jobs are not executed, with status OK.
|
||||
exec_status.emplace_back(false, Status::OK());
|
||||
io_status.emplace_back(IOStatus::OK());
|
||||
pick_status.push_back(false);
|
||||
}
|
||||
|
||||
@ -527,7 +514,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
|
||||
&(switched_to_mempurge.at(i)));
|
||||
exec_status[i].first = true;
|
||||
io_status[i] = jobs[i]->io_status();
|
||||
}
|
||||
if (num_cfs > 1) {
|
||||
TEST_SYNC_POINT(
|
||||
@ -541,7 +527,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
&logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */,
|
||||
switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0)));
|
||||
exec_status[0].first = true;
|
||||
io_status[0] = jobs[0]->io_status();
|
||||
|
||||
Status error_status;
|
||||
for (const auto& e : exec_status) {
|
||||
@ -560,21 +545,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
s = error_status.ok() ? s : error_status;
|
||||
}
|
||||
|
||||
IOStatus io_s = IOStatus::OK();
|
||||
if (io_s.ok()) {
|
||||
IOStatus io_error = IOStatus::OK();
|
||||
for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
|
||||
if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
|
||||
!io_status[i].IsColumnFamilyDropped()) {
|
||||
io_error = io_status[i];
|
||||
}
|
||||
}
|
||||
io_s = io_error;
|
||||
if (s.ok() && !io_s.ok()) {
|
||||
s = io_s;
|
||||
}
|
||||
}
|
||||
|
||||
if (s.IsColumnFamilyDropped()) {
|
||||
s = Status::OK();
|
||||
}
|
||||
@ -647,7 +617,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
return std::make_pair(Status::OK(), !ready);
|
||||
};
|
||||
|
||||
bool resuming_from_bg_err = error_handler_.IsDBStopped();
|
||||
bool resuming_from_bg_err =
|
||||
error_handler_.IsDBStopped() ||
|
||||
(cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
|
||||
cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
|
||||
while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
|
||||
std::pair<Status, bool> res = wait_to_install_func();
|
||||
|
||||
@ -662,7 +635,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
}
|
||||
atomic_flush_install_cv_.Wait();
|
||||
|
||||
resuming_from_bg_err = error_handler_.IsDBStopped();
|
||||
resuming_from_bg_err =
|
||||
error_handler_.IsDBStopped() ||
|
||||
(cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
|
||||
cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
|
||||
}
|
||||
|
||||
if (!resuming_from_bg_err) {
|
||||
@ -786,8 +762,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
// Need to undo atomic flush if something went wrong, i.e. s is not OK and
|
||||
// it is not because of CF drop.
|
||||
if (!s.ok() && !s.IsColumnFamilyDropped()) {
|
||||
if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
|
||||
assert(log_io_s.ok());
|
||||
if (log_io_s.ok()) {
|
||||
// Error while writing to MANIFEST.
|
||||
// In fact, versions_->io_status() can also be the result of renaming
|
||||
// CURRENT file. With current code, it's just difficult to tell. So just
|
||||
@ -798,19 +773,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
||||
// error), all the Manifest write will be map to soft error.
|
||||
// TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
|
||||
// is needed.
|
||||
error_handler_.SetBGError(io_s,
|
||||
error_handler_.SetBGError(s,
|
||||
BackgroundErrorReason::kManifestWriteNoWAL);
|
||||
} else {
|
||||
// If WAL sync is successful (either WAL size is 0 or there is no IO
|
||||
// error), all the other SST file write errors will be set as
|
||||
// kFlushNoWAL.
|
||||
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
|
||||
error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
|
||||
}
|
||||
} else {
|
||||
if (log_io_s.ok()) {
|
||||
Status new_bg_error = s;
|
||||
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
||||
}
|
||||
assert(s == log_io_s);
|
||||
Status new_bg_error = s;
|
||||
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,8 +272,8 @@ STATIC_AVOID_DESTRUCTION(const Status, kOkStatus){Status::OK()};
|
||||
// This can also get called as part of a recovery operation. In that case, we
|
||||
// also track the error separately in recovery_error_ so we can tell in the
|
||||
// end whether recovery succeeded or not
|
||||
const Status& ErrorHandler::SetBGError(const Status& bg_err,
|
||||
BackgroundErrorReason reason) {
|
||||
const Status& ErrorHandler::HandleKnownErrors(const Status& bg_err,
|
||||
BackgroundErrorReason reason) {
|
||||
db_mutex_->AssertHeld();
|
||||
if (bg_err.ok()) {
|
||||
return kOkStatus;
|
||||
@ -382,9 +382,12 @@ const Status& ErrorHandler::SetBGError(const Status& bg_err,
|
||||
// c) all other errors are mapped to hard error.
|
||||
// 3) for other cases, SetBGError(const Status& bg_err, BackgroundErrorReason
|
||||
// reason) will be called to handle other error cases.
|
||||
const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
||||
const Status& ErrorHandler::SetBGError(const Status& bg_status,
|
||||
BackgroundErrorReason reason) {
|
||||
db_mutex_->AssertHeld();
|
||||
Status tmp_status = bg_status;
|
||||
IOStatus bg_io_err = status_to_io_status(std::move(tmp_status));
|
||||
|
||||
if (bg_io_err.ok()) {
|
||||
return kOkStatus;
|
||||
}
|
||||
@ -483,7 +486,11 @@ const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err,
|
||||
if (bg_error_stats_ != nullptr) {
|
||||
RecordTick(bg_error_stats_.get(), ERROR_HANDLER_BG_IO_ERROR_COUNT);
|
||||
}
|
||||
return SetBGError(new_bg_io_err, reason);
|
||||
// HandleKnownErrors() will use recovery_error_, so ignore
|
||||
// recovery_io_error_.
|
||||
// TODO: Do some refactoring and use only one recovery_error_
|
||||
recovery_io_error_.PermitUncheckedError();
|
||||
return HandleKnownErrors(new_bg_io_err, reason);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,9 +53,6 @@ class ErrorHandler {
|
||||
|
||||
const Status& SetBGError(const Status& bg_err, BackgroundErrorReason reason);
|
||||
|
||||
const Status& SetBGError(const IOStatus& bg_io_err,
|
||||
BackgroundErrorReason reason);
|
||||
|
||||
Status GetBGError() const { return bg_error_; }
|
||||
|
||||
Status GetRecoveryError() const { return recovery_error_; }
|
||||
@ -112,6 +109,8 @@ class ErrorHandler {
|
||||
// The pointer of DB statistics.
|
||||
std::shared_ptr<Statistics> bg_error_stats_;
|
||||
|
||||
const Status& HandleKnownErrors(const Status& bg_err,
|
||||
BackgroundErrorReason reason);
|
||||
Status OverrideNoSpaceError(const Status& bg_error, bool* auto_recovery);
|
||||
void RecoverFromNoSpace();
|
||||
const Status& StartRecoverFromRetryableBGIOError(const IOStatus& io_error);
|
||||
|
@ -2468,6 +2468,210 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAbortRecovery) {
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, FlushReadError) {
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener =
|
||||
std::make_shared<ErrorHandlerFSListener>();
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.statistics = CreateDBStatistics();
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
ASSERT_OK(Put(Key(0), "val"));
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeOutputValidation", [&](void*) {
|
||||
IOStatus st = IOStatus::IOError();
|
||||
st.SetRetryable(true);
|
||||
st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
|
||||
fault_fs_->SetFilesystemActive(false, st);
|
||||
});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeDeleteFile",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush();
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_ERROR_COUNT));
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_IO_ERROR_COUNT));
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
|
||||
ASSERT_LE(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_AUTORESUME_COUNT));
|
||||
ASSERT_LE(0, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
|
||||
s = dbfull()->TEST_GetBGError();
|
||||
ASSERT_OK(s);
|
||||
|
||||
Reopen(GetDefaultOptions());
|
||||
ASSERT_EQ("val", Get(Key(0)));
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, AtomicFlushReadError) {
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener =
|
||||
std::make_shared<ErrorHandlerFSListener>();
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.statistics = CreateDBStatistics();
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(false);
|
||||
options.atomic_flush = true;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
|
||||
ASSERT_OK(Put(0, Key(0), "val"));
|
||||
ASSERT_OK(Put(1, Key(0), "val"));
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeOutputValidation", [&](void*) {
|
||||
IOStatus st = IOStatus::IOError();
|
||||
st.SetRetryable(true);
|
||||
st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
|
||||
fault_fs_->SetFilesystemActive(false, st);
|
||||
});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeDeleteFile",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush({0, 1});
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_ERROR_COUNT));
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_IO_ERROR_COUNT));
|
||||
ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
|
||||
ASSERT_LE(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_AUTORESUME_COUNT));
|
||||
ASSERT_LE(0, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
|
||||
s = dbfull()->TEST_GetBGError();
|
||||
ASSERT_OK(s);
|
||||
|
||||
TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
|
||||
GetDefaultOptions());
|
||||
ASSERT_EQ("val", Get(Key(0)));
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, AtomicFlushNoSpaceError) {
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener =
|
||||
std::make_shared<ErrorHandlerFSListener>();
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.listeners.emplace_back(listener);
|
||||
options.statistics = CreateDBStatistics();
|
||||
Status s;
|
||||
|
||||
listener->EnableAutoRecovery(true);
|
||||
options.atomic_flush = true;
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
|
||||
ASSERT_OK(Put(0, Key(0), "val"));
|
||||
ASSERT_OK(Put(1, Key(0), "val"));
|
||||
SyncPoint::GetInstance()->SetCallBack("BuildTable:create_file", [&](void*) {
|
||||
IOStatus st = IOStatus::NoSpace();
|
||||
fault_fs_->SetFilesystemActive(false, st);
|
||||
});
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"BuildTable:BeforeDeleteFile",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
s = Flush({0, 1});
|
||||
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
fault_fs_->SetFilesystemActive(true);
|
||||
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
|
||||
ASSERT_LE(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_ERROR_COUNT));
|
||||
ASSERT_LE(1, options.statistics->getAndResetTickerCount(
|
||||
ERROR_HANDLER_BG_IO_ERROR_COUNT));
|
||||
s = dbfull()->TEST_GetBGError();
|
||||
ASSERT_OK(s);
|
||||
|
||||
TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
|
||||
GetDefaultOptions());
|
||||
ASSERT_EQ("val", Get(Key(0)));
|
||||
}
|
||||
|
||||
TEST_F(DBErrorHandlingFSTest, CompactionReadRetryableErrorAutoRecover) {
|
||||
// In this test, in the first round of compaction, the FS is set to error.
|
||||
// So the first compaction fails due to retryable IO error and it is mapped
|
||||
// to soft error. Then, compaction is rescheduled, in the second round of
|
||||
// compaction, the FS is set to active and compaction is successful, so
|
||||
// the test will hit the CompactionJob::FinishCompactionOutputFile1 sync
|
||||
// point.
|
||||
std::shared_ptr<ErrorHandlerFSListener> listener =
|
||||
std::make_shared<ErrorHandlerFSListener>();
|
||||
Options options = GetDefaultOptions();
|
||||
options.env = fault_env_.get();
|
||||
options.create_if_missing = true;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.listeners.emplace_back(listener);
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.no_block_cache = true;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
Status s;
|
||||
std::atomic<bool> fail_first(false);
|
||||
std::atomic<bool> fail_second(true);
|
||||
Random rnd(301);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
ASSERT_OK(Put(Key(i), rnd.RandomString(1024)));
|
||||
}
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
|
||||
listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
|
||||
listener->EnableAutoRecovery(false);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::FlushMemTable:FlushMemTableFinished",
|
||||
"BackgroundCallCompaction:0"},
|
||||
{"CompactionJob::FinishCompactionOutputFile1",
|
||||
"CompactionWriteRetryableErrorAutoRecover0"}});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::BackgroundCompaction:Start",
|
||||
[&](void*) { fault_fs_->SetFilesystemActive(true); });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"CompactionJob::Run():PausingManualCompaction:2", [&](void*) {
|
||||
if (fail_first.load() && fail_second.load()) {
|
||||
fault_fs_->SetFilesystemActive(false, error_msg);
|
||||
fail_second.store(false);
|
||||
}
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ASSERT_OK(Put(Key(1), "val"));
|
||||
s = Flush();
|
||||
ASSERT_OK(s);
|
||||
|
||||
s = dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_OK(s);
|
||||
TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
Reopen(GetDefaultOptions());
|
||||
}
|
||||
|
||||
class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
|
||||
public testing::WithParamInterface<bool> {};
|
||||
|
||||
|
@ -136,7 +136,6 @@ FlushJob::FlushJob(
|
||||
}
|
||||
|
||||
FlushJob::~FlushJob() {
|
||||
io_status_.PermitUncheckedError();
|
||||
ThreadStatusUtil::ResetThreadStatus();
|
||||
}
|
||||
|
||||
@ -290,17 +289,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
|
||||
} else if (write_manifest_) {
|
||||
TEST_SYNC_POINT("FlushJob::InstallResults");
|
||||
// Replace immutable memtable with the generated Table
|
||||
IOStatus tmp_io_s;
|
||||
s = cfd_->imm()->TryInstallMemtableFlushResults(
|
||||
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
|
||||
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
|
||||
log_buffer_, &committed_flush_jobs_info_, &tmp_io_s,
|
||||
log_buffer_, &committed_flush_jobs_info_,
|
||||
!(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
|
||||
but 'false' if mempurge successful: no new min log number
|
||||
or new level 0 file path to write to manifest. */);
|
||||
if (!tmp_io_s.ok()) {
|
||||
io_status_ = tmp_io_s;
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok() && file_meta != nullptr) {
|
||||
@ -926,9 +921,9 @@ Status FlushJob::WriteLevel0Table() {
|
||||
job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint,
|
||||
full_history_ts_low, blob_callback_, &num_input_entries,
|
||||
&memtable_payload_bytes, &memtable_garbage_bytes);
|
||||
if (!io_s.ok()) {
|
||||
io_status_ = io_s;
|
||||
}
|
||||
// TODO: Cleanup io_status in BuildTable and table builders
|
||||
assert(!s.ok() || io_s.ok());
|
||||
io_s.PermitUncheckedError();
|
||||
if (num_input_entries != total_num_entries && s.ok()) {
|
||||
std::string msg = "Expected " + ToString(total_num_entries) +
|
||||
" entries in memtables, but read " +
|
||||
|
@ -93,9 +93,6 @@ class FlushJob {
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
// Return the IO status
|
||||
IOStatus io_status() const { return io_status_; }
|
||||
|
||||
private:
|
||||
void ReportStartedFlush();
|
||||
void ReportFlushInputSize(const autovector<MemTable*>& mems);
|
||||
@ -184,7 +181,6 @@ class FlushJob {
|
||||
Version* base_;
|
||||
bool pick_memtable_called;
|
||||
Env::Priority thread_pri_;
|
||||
IOStatus io_status_;
|
||||
|
||||
const std::shared_ptr<IOTracer> io_tracer_;
|
||||
SystemClock* clock_;
|
||||
|
@ -407,7 +407,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||
LogBuffer* log_buffer,
|
||||
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
|
||||
IOStatus* io_s, bool write_edits) {
|
||||
bool write_edits) {
|
||||
AutoThreadOperationStageUpdater stage_updater(
|
||||
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
|
||||
mu->AssertHeld();
|
||||
@ -529,7 +529,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
db_directory, /*new_descriptor_log=*/false,
|
||||
/*column_family_options=*/nullptr,
|
||||
manifest_write_cb);
|
||||
*io_s = vset->io_status();
|
||||
} else {
|
||||
// If write_edit is false (e.g: successful mempurge),
|
||||
// then remove old memtables, wake up manifest write queue threads,
|
||||
@ -545,7 +544,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
// TODO(bjlemaire): explain full reason WakeUpWaitingManifestWriters
|
||||
// needed or investigate more.
|
||||
vset->WakeUpWaitingManifestWriters();
|
||||
*io_s = IOStatus::OK();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ class MemTableList {
|
||||
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
|
||||
LogBuffer* log_buffer,
|
||||
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
|
||||
IOStatus* io_s, bool write_edits = true);
|
||||
bool write_edits = true);
|
||||
|
||||
// New memtables are inserted at the front of the list.
|
||||
// Takes ownership of the referenced held on *m by the caller of Add().
|
||||
|
@ -124,7 +124,7 @@ class MemTableListTest : public testing::Test {
|
||||
std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
|
||||
Status s = list->TryInstallMemtableFlushResults(
|
||||
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
|
||||
file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
|
||||
file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
|
||||
EXPECT_OK(io_s);
|
||||
return s;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user