Update bg_error when log flush fails in SwitchMemtable() (#5072)
Summary: There is a potential failure case in DBImpl::SwitchMemtable() that is not handled properly. The call to cur_log_writer->WriteBuffer() can fail due to an IO error. In that case, we need to call SetBGError() in order set the background error since the WriteBuffer() failure may result in data loss. Also, the asserts for !new_mem and !new_log are incorrect, as those would have been allocated by the time this failure is detected. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5072 Differential Revision: D14461384 Pulled By: anand1976 fbshipit-source-id: fb59bce9d61378f37d2dfcd28c0b704b0f43c3cf
This commit is contained in:
parent
2263f86901
commit
b4fa51dfaf
@ -1462,12 +1462,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
context->superversion_context.NewSuperVersion();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// PLEASE NOTE: We assume that there are no failable operations
|
||||
// after lock is acquired below since we are already notifying
|
||||
// client about mem table becoming immutable.
|
||||
NotifyOnMemTableSealed(cfd, memtable_info);
|
||||
#endif //ROCKSDB_LITE
|
||||
}
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"[%s] New memtable created with log file: #%" PRIu64
|
||||
@ -1476,10 +1470,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
mutex_.Lock();
|
||||
if (s.ok() && creating_new_log) {
|
||||
log_write_mutex_.Lock();
|
||||
logfile_number_ = new_log_number;
|
||||
assert(new_log != nullptr);
|
||||
log_empty_ = true;
|
||||
log_dir_synced_ = false;
|
||||
if (!logs_.empty()) {
|
||||
// Alway flush the buffer of the last log before switching to a new one
|
||||
log::Writer* cur_log_writer = logs_.back().writer;
|
||||
@ -1492,16 +1483,36 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
new_log_number);
|
||||
}
|
||||
}
|
||||
logs_.emplace_back(logfile_number_, new_log);
|
||||
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
|
||||
if (s.ok()) {
|
||||
logfile_number_ = new_log_number;
|
||||
log_empty_ = true;
|
||||
log_dir_synced_ = false;
|
||||
logs_.emplace_back(logfile_number_, new_log);
|
||||
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
|
||||
}
|
||||
log_write_mutex_.Unlock();
|
||||
}
|
||||
|
||||
if (!s.ok()) {
|
||||
// how do we fail if we're not creating new log?
|
||||
assert(creating_new_log);
|
||||
assert(!new_mem);
|
||||
assert(!new_log);
|
||||
if (new_mem) {
|
||||
delete new_mem;
|
||||
}
|
||||
if (new_log) {
|
||||
delete new_log;
|
||||
}
|
||||
SuperVersion* new_superversion =
|
||||
context->superversion_context.new_superversion.release();
|
||||
if (new_superversion != nullptr) {
|
||||
delete new_superversion;
|
||||
}
|
||||
// We may have lost data from the WritableFileBuffer in-memory buffer for
|
||||
// the current log, so treat it as a fatal error and set bg_error
|
||||
error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
|
||||
// Read back bg_error in order to get the right severity
|
||||
s = error_handler_.GetBGError();
|
||||
|
||||
if (two_write_queues_) {
|
||||
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
||||
}
|
||||
@ -1528,6 +1539,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
cfd->SetMemtable(new_mem);
|
||||
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
|
||||
mutable_cf_options);
|
||||
#ifndef ROCKSDB_LITE
|
||||
mutex_.Unlock();
|
||||
// Notify client that memtable is sealed, now that we have successfully
|
||||
// installed a new memtable
|
||||
NotifyOnMemTableSealed(cfd, memtable_info);
|
||||
mutex_.Lock();
|
||||
#endif // ROCKSDB_LITE
|
||||
if (two_write_queues_) {
|
||||
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
||||
}
|
||||
|
@ -140,6 +140,32 @@ TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
|
||||
Random rnd(301);
|
||||
std::unique_ptr<FaultInjectionTestEnv> mock_env(
|
||||
new FaultInjectionTestEnv(Env::Default()));
|
||||
Options options = GetOptions();
|
||||
options.env = mock_env.get();
|
||||
options.writable_file_max_buffer_size = 4 * 1024 * 1024;
|
||||
options.write_buffer_size = 3 * 512 * 1024;
|
||||
options.wal_bytes_per_sync = 256 * 1024;
|
||||
options.manual_wal_flush = true;
|
||||
Reopen(options);
|
||||
mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
|
||||
Status s;
|
||||
for (int i = 0; i < 4 * 512; ++i) {
|
||||
s = Put(Key(i), RandomString(&rnd, 1024));
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
|
||||
|
||||
mock_env->SetFilesystemActive(true);
|
||||
// Close before mock_env destruct.
|
||||
Close();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
|
||||
testing::Values(DBTestBase::kDefault,
|
||||
DBTestBase::kConcurrentWALWrites,
|
||||
|
@ -811,6 +811,7 @@ struct DBOptions {
|
||||
//
|
||||
// Default: 0, turned off
|
||||
//
|
||||
// Note: DOES NOT apply to WAL files. See wal_bytes_per_sync instead
|
||||
// Dynamically changeable through SetDBOptions() API.
|
||||
uint64_t bytes_per_sync = 0;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user