rocksdb/env/env.cc
Zhichao Cao 4246888101 Pass IOStatus to write path and set retryable IO Error as hard error in BG jobs (#6487)
Summary:
In the current code base, we use Status to get and store the returned status from the call. Specifically, for IO related functions, the current Status cannot reflect the IO Error details such as error scope, error retryable attribute, and others. With the implementation of https://github.com/facebook/rocksdb/issues/5761, we have the new Wrapper for IO, which returns IOStatus instead of Status. However, the IOStatus is purged at the lower level of write path and transferred to Status.

The first job of this PR is to pass the IOStatus to the write path (flush, WAL write, and Compaction). The second job is to identify the Retryable IO Error as HardError, and set the bg_error_ as HardError. In this case, the DB Instance becomes read only. User is informed of the Status and need to take actions to deal with it (e.g., call db->Resume()).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6487

Test Plan: Added the testing case to error_handler_fs_test. Pass make asan_check

Reviewed By: anand1976

Differential Revision: D20685017

Pulled By: zhichao-cao

fbshipit-source-id: ff85f042896243abcd6ef37877834e26f36b6eb0
2020-03-27 16:04:43 -07:00

482 lines
13 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/env.h"
#include <thread>
#include "env/composite_env_wrapper.h"
#include "logging/env_logger.h"
#include "memory/arena.h"
#include "options/db_options.h"
#include "port/port.h"
#include "port/sys_time.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/object_registry.h"
#include "util/autovector.h"
namespace ROCKSDB_NAMESPACE {
Env::Env() : thread_status_updater_(nullptr) {
file_system_ = std::make_shared<LegacyFileSystemWrapper>(this);
}
Env::Env(std::shared_ptr<FileSystem> fs)
: thread_status_updater_(nullptr),
file_system_(fs) {}
Env::~Env() {
}
Status Env::NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) {
return NewEnvLogger(fname, this, result);
}
Status Env::LoadEnv(const std::string& value, Env** result) {
Env* env = *result;
Status s;
#ifndef ROCKSDB_LITE
s = ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &env);
#else
s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
#endif
if (s.ok()) {
*result = env;
}
return s;
}
Status Env::LoadEnv(const std::string& value, Env** result,
std::shared_ptr<Env>* guard) {
assert(result);
Status s;
#ifndef ROCKSDB_LITE
Env* env = nullptr;
std::unique_ptr<Env> uniq_guard;
std::string err_msg;
assert(guard != nullptr);
env = ObjectRegistry::NewInstance()->NewObject<Env>(value, &uniq_guard,
&err_msg);
if (!env) {
s = Status::NotFound(std::string("Cannot load ") + Env::Type() + ": " +
value);
env = Env::Default();
}
if (s.ok() && uniq_guard) {
guard->reset(uniq_guard.release());
*result = guard->get();
} else {
*result = env;
}
#else
(void)result;
(void)guard;
s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
#endif
return s;
}
std::string Env::PriorityToString(Env::Priority priority) {
switch (priority) {
case Env::Priority::BOTTOM:
return "Bottom";
case Env::Priority::LOW:
return "Low";
case Env::Priority::HIGH:
return "High";
case Env::Priority::USER:
return "User";
case Env::Priority::TOTAL:
assert(false);
}
return "Invalid";
}
uint64_t Env::GetThreadID() const {
std::hash<std::thread::id> hasher;
return hasher(std::this_thread::get_id());
}
Status Env::ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {
Status s = RenameFile(old_fname, fname);
if (!s.ok()) {
return s;
}
return NewWritableFile(fname, result, options);
}
Status Env::GetChildrenFileAttributes(const std::string& dir,
std::vector<FileAttributes>* result) {
assert(result != nullptr);
std::vector<std::string> child_fnames;
Status s = GetChildren(dir, &child_fnames);
if (!s.ok()) {
return s;
}
result->resize(child_fnames.size());
size_t result_size = 0;
for (size_t i = 0; i < child_fnames.size(); ++i) {
const std::string path = dir + "/" + child_fnames[i];
if (!(s = GetFileSize(path, &(*result)[result_size].size_bytes)).ok()) {
if (FileExists(path).IsNotFound()) {
// The file may have been deleted since we listed the directory
continue;
}
return s;
}
(*result)[result_size].name = std::move(child_fnames[i]);
result_size++;
}
result->resize(result_size);
return Status::OK();
}
SequentialFile::~SequentialFile() {
}
RandomAccessFile::~RandomAccessFile() {
}
WritableFile::~WritableFile() {
}
MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {}
Logger::~Logger() {}
Status Logger::Close() {
if (!closed_) {
closed_ = true;
return CloseImpl();
} else {
return Status::OK();
}
}
Status Logger::CloseImpl() { return Status::NotSupported(); }
FileLock::~FileLock() {
}
void LogFlush(Logger *info_log) {
if (info_log) {
info_log->Flush();
}
}
static void Logv(Logger *info_log, const char* format, va_list ap) {
if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
}
}
void Log(Logger* info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Logv(info_log, format, ap);
va_end(ap);
}
void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
static const char* kInfoLogLevelNames[5] = { "DEBUG", "INFO", "WARN",
"ERROR", "FATAL" };
if (log_level < log_level_) {
return;
}
if (log_level == InfoLogLevel::INFO_LEVEL) {
// Doesn't print log level if it is INFO level.
// This is to avoid unexpected performance regression after we add
// the feature of log level. All the logs before we add the feature
// are INFO level. We don't want to add extra costs to those existing
// logging.
Logv(format, ap);
} else if (log_level == InfoLogLevel::HEADER_LEVEL) {
LogHeader(format, ap);
} else {
char new_format[500];
snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
kInfoLogLevelNames[log_level], format);
Logv(new_format, ap);
}
}
static void Logv(const InfoLogLevel log_level, Logger *info_log, const char *format, va_list ap) {
if (info_log && info_log->GetInfoLogLevel() <= log_level) {
if (log_level == InfoLogLevel::HEADER_LEVEL) {
info_log->LogHeader(format, ap);
} else {
info_log->Logv(log_level, format, ap);
}
}
}
void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
...) {
va_list ap;
va_start(ap, format);
Logv(log_level, info_log, format, ap);
va_end(ap);
}
static void Headerv(Logger *info_log, const char *format, va_list ap) {
if (info_log) {
info_log->LogHeader(format, ap);
}
}
void Header(Logger* info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Headerv(info_log, format, ap);
va_end(ap);
}
static void Debugv(Logger* info_log, const char* format, va_list ap) {
if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) {
info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap);
}
}
void Debug(Logger* info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Debugv(info_log, format, ap);
va_end(ap);
}
static void Infov(Logger* info_log, const char* format, va_list ap) {
if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
}
}
void Info(Logger* info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Infov(info_log, format, ap);
va_end(ap);
}
static void Warnv(Logger* info_log, const char* format, va_list ap) {
if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::WARN_LEVEL) {
info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap);
}
}
void Warn(Logger* info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Warnv(info_log, format, ap);
va_end(ap);
}
static void Errorv(Logger* info_log, const char* format, va_list ap) {
if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::ERROR_LEVEL) {
info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
}
}
void Error(Logger* info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Errorv(info_log, format, ap);
va_end(ap);
}
static void Fatalv(Logger* info_log, const char* format, va_list ap) {
if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::FATAL_LEVEL) {
info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap);
}
}
void Fatal(Logger* info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Fatalv(info_log, format, ap);
va_end(ap);
}
void LogFlush(const std::shared_ptr<Logger>& info_log) {
LogFlush(info_log.get());
}
void Log(const InfoLogLevel log_level, const std::shared_ptr<Logger>& info_log,
const char* format, ...) {
va_list ap;
va_start(ap, format);
Logv(log_level, info_log.get(), format, ap);
va_end(ap);
}
void Header(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Headerv(info_log.get(), format, ap);
va_end(ap);
}
void Debug(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Debugv(info_log.get(), format, ap);
va_end(ap);
}
void Info(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Infov(info_log.get(), format, ap);
va_end(ap);
}
void Warn(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Warnv(info_log.get(), format, ap);
va_end(ap);
}
void Error(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Errorv(info_log.get(), format, ap);
va_end(ap);
}
void Fatal(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Fatalv(info_log.get(), format, ap);
va_end(ap);
}
void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Logv(info_log.get(), format, ap);
va_end(ap);
}
Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
bool should_sync) {
LegacyFileSystemWrapper lfsw(env);
return WriteStringToFile(&lfsw, data, fname, should_sync);
}
Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
LegacyFileSystemWrapper lfsw(env);
return ReadFileToString(&lfsw, fname, data);
}
EnvWrapper::~EnvWrapper() {
}
namespace { // anonymous namespace
void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
env_options->use_mmap_reads = options.allow_mmap_reads;
env_options->use_mmap_writes = options.allow_mmap_writes;
env_options->use_direct_reads = options.use_direct_reads;
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
env_options->bytes_per_sync = options.bytes_per_sync;
env_options->compaction_readahead_size = options.compaction_readahead_size;
env_options->random_access_max_buffer_size =
options.random_access_max_buffer_size;
env_options->rate_limiter = options.rate_limiter.get();
env_options->writable_file_max_buffer_size =
options.writable_file_max_buffer_size;
env_options->allow_fallocate = options.allow_fallocate;
env_options->strict_bytes_per_sync = options.strict_bytes_per_sync;
options.env->SanitizeEnvOptions(env_options);
}
}
EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.bytes_per_sync = db_options.wal_bytes_per_sync;
optimized_env_options.writable_file_max_buffer_size =
db_options.writable_file_max_buffer_size;
return optimized_env_options;
}
EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
return env_options;
}
EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_reads = false;
return optimized_env_options;
}
EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_reads = false;
return optimized_env_options;
}
EnvOptions Env::OptimizeForCompactionTableWrite(
const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_writes =
db_options.use_direct_io_for_flush_and_compaction;
return optimized_env_options;
}
EnvOptions Env::OptimizeForCompactionTableRead(
const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_reads = db_options.use_direct_reads;
return optimized_env_options;
}
EnvOptions::EnvOptions(const DBOptions& options) {
AssignEnvOptions(this, options);
}
EnvOptions::EnvOptions() {
DBOptions options;
AssignEnvOptions(this, options);
}
Status NewEnvLogger(const std::string& fname, Env* env,
std::shared_ptr<Logger>* result) {
EnvOptions options;
// TODO: Tune the buffer size.
options.writable_file_max_buffer_size = 1024 * 1024;
std::unique_ptr<WritableFile> writable_file;
const auto status = env->NewWritableFile(fname, &writable_file, options);
if (!status.ok()) {
return status;
}
*result = std::make_shared<EnvLogger>(
NewLegacyWritableFileWrapper(std::move(writable_file)), fname, options,
env);
return Status::OK();
}
const std::shared_ptr<FileSystem>& Env::GetFileSystem() const {
return file_system_;
}
#ifdef OS_WIN
std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
}
#endif
} // namespace ROCKSDB_NAMESPACE