rocksdb/env/composite_env_wrapper.h

1118 lines
40 KiB
C
Raw Normal View History

Introduce a new storage specific Env API (#5761) Summary: The current Env API encompasses both storage/file operations, as well as OS related operations. Most of the APIs return a Status, which does not have enough metadata about an error, such as whether its retry-able or not, scope (i.e fault domain) of the error etc., that may be required in order to properly handle a storage error. The file APIs also do not provide enough control over the IO SLA, such as timeout, prioritization, hinting about placement and redundancy etc. This PR separates out the file/storage APIs from Env into a new FileSystem class. The APIs are updated to return an IOStatus with metadata about the error, as well as to take an IOOptions structure as input in order to allow more control over the IO. The user can set both ```options.env``` and ```options.file_system``` to specify that RocksDB should use the former for OS related operations and the latter for storage operations. Internally, a ```CompositeEnvWrapper``` has been introduced that inherits from ```Env``` and redirects individual methods to either an ```Env``` implementation or the ```FileSystem``` as appropriate. When options are sanitized during ```DB::Open```, ```options.env``` is replaced with a newly allocated ```CompositeEnvWrapper``` instance if both env and file_system have been specified. This way, the rest of the RocksDB code can continue to function as before. This PR also ports PosixEnv to the new API by splitting it into two - PosixEnv and PosixFileSystem. PosixEnv is defined as a sub-class of CompositeEnvWrapper, and threading/time functions are overridden with Posix specific implementations in order to avoid an extra level of indirection. The ```CompositeEnvWrapper``` translates ```IOStatus``` return code to ```Status```, and sets the severity to ```kSoftError``` if the io_status is retryable. The error handling code in RocksDB can then recover the DB automatically. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5761 Differential Revision: D18868376 Pulled By: anand1976 fbshipit-source-id: 39efe18a162ea746fabac6360ff529baba48486f
2019-12-13 23:47:08 +01:00
// Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
namespace rocksdb {
// The CompositeEnvWrapper class provides an interface that is compatible
// with the old monolithic Env API, and an implementation that wraps around
// the new Env that provides threading and other OS related functionality, and
// the new FileSystem API that provides storage functionality. By
// providing the old Env interface, it allows the rest of RocksDB code to
// be agnostic of whether the underlying Env implementation is a monolithic
// Env or an Env + FileSystem. In the former case, the user will specify
// Options::env only, whereas in the latter case, the user will specify
// Options::env and Options::file_system.
inline IOStatus status_to_io_status(Status&& status) {
if (status.ok()) {
// Fast path
return IOStatus::OK();
} else {
const char* state = status.getState();
if (state) {
return IOStatus(status.code(), status.subcode(),
Slice(state, strlen(status.getState()) + 1),
Slice());
} else {
return IOStatus(status.code(), status.subcode());
}
}
}
class CompositeSequentialFileWrapper : public SequentialFile {
public:
explicit CompositeSequentialFileWrapper(
std::unique_ptr<FSSequentialFile>& target)
: target_(std::move(target)) {}
Status Read(size_t n, Slice* result, char* scratch) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Read(n, io_opts, result, scratch, &dbg);
}
Status Skip(uint64_t n) override { return target_->Skip(n); }
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
Status InvalidateCache(size_t offset, size_t length) override {
return target_->InvalidateCache(offset, length);
}
Status PositionedRead(uint64_t offset, size_t n, Slice* result,
char* scratch) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
}
private:
std::unique_ptr<FSSequentialFile> target_;
};
class CompositeRandomAccessFileWrapper : public RandomAccessFile {
public:
explicit CompositeRandomAccessFileWrapper(
std::unique_ptr<FSRandomAccessFile>& target)
: target_(std::move(target)) {}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Read(offset, n, io_opts, result, scratch, &dbg);
}
Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
IOOptions io_opts;
IODebugContext dbg;
std::vector<FSReadRequest> fs_reqs;
Status status;
fs_reqs.resize(num_reqs);
for (size_t i = 0; i < num_reqs; ++i) {
fs_reqs[i].offset = reqs[i].offset;
fs_reqs[i].len = reqs[i].len;
fs_reqs[i].scratch = reqs[i].scratch;
fs_reqs[i].status = IOStatus::OK();
}
status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
for (size_t i = 0; i < num_reqs; ++i) {
reqs[i].result = fs_reqs[i].result;
reqs[i].status = fs_reqs[i].status;
}
return status;
}
Status Prefetch(uint64_t offset, size_t n) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Prefetch(offset, n, io_opts, &dbg);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
};
void Hint(AccessPattern pattern) override {
target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
Status InvalidateCache(size_t offset, size_t length) override {
return target_->InvalidateCache(offset, length);
}
private:
std::unique_ptr<FSRandomAccessFile> target_;
};
class CompositeWritableFileWrapper : public WritableFile {
public:
explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
: target_(std::move(t)) {}
Status Append(const Slice& data) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Append(data, io_opts, &dbg);
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->PositionedAppend(data, offset, io_opts, &dbg);
}
Status Truncate(uint64_t size) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Truncate(size, io_opts, &dbg);
}
Status Close() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Close(io_opts, &dbg);
}
Status Flush() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Flush(io_opts, &dbg);
}
Status Sync() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Sync(io_opts, &dbg);
}
Status Fsync() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Fsync(io_opts, &dbg);
}
bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
target_->SetWriteLifeTimeHint(hint);
}
Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
return target_->GetWriteLifeTimeHint();
}
uint64_t GetFileSize() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->GetFileSize(io_opts, &dbg);
}
void SetPreallocationBlockSize(size_t size) override {
target_->SetPreallocationBlockSize(size);
}
void GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block) override {
target_->GetPreallocationStatus(block_size, last_allocated_block);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
Status InvalidateCache(size_t offset, size_t length) override {
return target_->InvalidateCache(offset, length);
}
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->RangeSync(offset, nbytes, io_opts, &dbg);
}
void PrepareWrite(size_t offset, size_t len) override {
IOOptions io_opts;
IODebugContext dbg;
target_->PrepareWrite(offset, len, io_opts, &dbg);
}
Status Allocate(uint64_t offset, uint64_t len) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Allocate(offset, len, io_opts, &dbg);
}
std::unique_ptr<FSWritableFile>* target() { return &target_; }
private:
std::unique_ptr<FSWritableFile> target_;
};
class CompositeRandomRWFileWrapper : public RandomRWFile {
public:
explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
: target_(std::move(target)) {}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
Status Write(uint64_t offset, const Slice& data) override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Write(offset, data, io_opts, &dbg);
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Read(offset, n, io_opts, result, scratch, &dbg);
}
Status Flush() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Flush(io_opts, &dbg);
}
Status Sync() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Sync(io_opts, &dbg);
}
Status Fsync() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Fsync(io_opts, &dbg);
}
Status Close() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Close(io_opts, &dbg);
}
private:
std::unique_ptr<FSRandomRWFile> target_;
};
class CompositeDirectoryWrapper : public Directory {
public:
explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
: target_(std::move(target)) {}
Status Fsync() override {
IOOptions io_opts;
IODebugContext dbg;
return target_->Fsync(io_opts, &dbg);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
private:
std::unique_ptr<FSDirectory> target_;
};
class CompositeEnvWrapper : public Env {
public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related
// calls to env, and all file operations to fs
explicit CompositeEnvWrapper(Env* env, FileSystem* fs)
: env_target_(env), fs_env_target_(fs) {}
~CompositeEnvWrapper() {}
// Return the target to which this Env forwards all calls
Env* env_target() const { return env_target_; }
FileSystem* fs_env_target() const { return fs_env_target_; }
// The following text is boilerplate that forwards all methods to target()
Status NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const EnvOptions& options) override {
IODebugContext dbg;
std::unique_ptr<FSSequentialFile> file;
Status status;
status =
fs_env_target_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) {
r->reset(new CompositeSequentialFileWrapper(file));
}
return status;
}
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& options) override {
IODebugContext dbg;
std::unique_ptr<FSRandomAccessFile> file;
Status status;
status = fs_env_target_->NewRandomAccessFile(f, FileOptions(options), &file,
&dbg);
if (status.ok()) {
r->reset(new CompositeRandomAccessFileWrapper(file));
}
return status;
}
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
const EnvOptions& options) override {
IODebugContext dbg;
std::unique_ptr<FSWritableFile> file;
Status status;
status =
fs_env_target_->NewWritableFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) {
r->reset(new CompositeWritableFileWrapper(file));
}
return status;
}
Status ReopenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
IODebugContext dbg;
Status status;
std::unique_ptr<FSWritableFile> file;
status = fs_env_target_->ReopenWritableFile(fname, FileOptions(options),
&file, &dbg);
if (status.ok()) {
result->reset(new CompositeWritableFileWrapper(file));
}
return status;
}
Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
std::unique_ptr<WritableFile>* r,
const EnvOptions& options) override {
IODebugContext dbg;
Status status;
std::unique_ptr<FSWritableFile> file;
status = fs_env_target_->ReuseWritableFile(
fname, old_fname, FileOptions(options), &file, &dbg);
if (status.ok()) {
r->reset(new CompositeWritableFileWrapper(file));
}
return status;
}
Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override {
IODebugContext dbg;
std::unique_ptr<FSRandomRWFile> file;
Status status;
status = fs_env_target_->NewRandomRWFile(fname, FileOptions(options), &file,
&dbg);
if (status.ok()) {
result->reset(new CompositeRandomRWFileWrapper(file));
}
return status;
}
Status NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override {
return fs_env_target_->NewMemoryMappedFileBuffer(fname, result);
}
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override {
IOOptions io_opts;
IODebugContext dbg;
std::unique_ptr<FSDirectory> dir;
Status status;
status = fs_env_target_->NewDirectory(name, io_opts, &dir, &dbg);
if (status.ok()) {
result->reset(new CompositeDirectoryWrapper(dir));
}
return status;
}
Status FileExists(const std::string& f) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->FileExists(f, io_opts, &dbg);
}
Status GetChildren(const std::string& dir,
std::vector<std::string>* r) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetChildren(dir, io_opts, r, &dbg);
}
Status GetChildrenFileAttributes(
const std::string& dir, std::vector<FileAttributes>* result) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetChildrenFileAttributes(dir, io_opts, result,
&dbg);
}
Status DeleteFile(const std::string& f) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->DeleteFile(f, io_opts, &dbg);
}
Status Truncate(const std::string& fname, size_t size) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->Truncate(fname, size, io_opts, &dbg);
}
Status CreateDir(const std::string& d) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->CreateDir(d, io_opts, &dbg);
}
Status CreateDirIfMissing(const std::string& d) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->CreateDirIfMissing(d, io_opts, &dbg);
}
Status DeleteDir(const std::string& d) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->DeleteDir(d, io_opts, &dbg);
}
Status GetFileSize(const std::string& f, uint64_t* s) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetFileSize(f, io_opts, s, &dbg);
}
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetFileModificationTime(fname, io_opts, file_mtime,
&dbg);
}
Status RenameFile(const std::string& s, const std::string& t) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->RenameFile(s, t, io_opts, &dbg);
}
Status LinkFile(const std::string& s, const std::string& t) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->LinkFile(s, t, io_opts, &dbg);
}
Status NumFileLinks(const std::string& fname, uint64_t* count) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->NumFileLinks(fname, io_opts, count, &dbg);
}
Status AreFilesSame(const std::string& first, const std::string& second,
bool* res) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->AreFilesSame(first, second, io_opts, res, &dbg);
}
Status LockFile(const std::string& f, FileLock** l) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->LockFile(f, io_opts, l, &dbg);
}
Status UnlockFile(FileLock* l) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->UnlockFile(l, io_opts, &dbg);
}
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetAbsolutePath(db_path, io_opts, output_path, &dbg);
}
#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
Status LoadLibrary(const std::string& lib_name,
const std::string& search_path,
std::shared_ptr<DynamicLibrary>* result) override {
return env_target_->LoadLibrary(lib_name, search_path, result);
}
#endif
void Schedule(void (*f)(void* arg), void* a, Priority pri,
void* tag = nullptr, void (*u)(void* arg) = nullptr) override {
return env_target_->Schedule(f, a, pri, tag, u);
}
int UnSchedule(void* tag, Priority pri) override {
return env_target_->UnSchedule(tag, pri);
}
void StartThread(void (*f)(void*), void* a) override {
return env_target_->StartThread(f, a);
}
void WaitForJoin() override { return env_target_->WaitForJoin(); }
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return env_target_->GetThreadPoolQueueLen(pri);
}
Status GetTestDirectory(std::string* path) override {
return env_target_->GetTestDirectory(path);
}
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override {
return env_target_->NewLogger(fname, result);
}
uint64_t NowMicros() override { return env_target_->NowMicros(); }
uint64_t NowNanos() override { return env_target_->NowNanos(); }
uint64_t NowCPUNanos() override { return env_target_->NowCPUNanos(); }
void SleepForMicroseconds(int micros) override {
env_target_->SleepForMicroseconds(micros);
}
Status GetHostName(char* name, uint64_t len) override {
return env_target_->GetHostName(name, len);
}
Status GetCurrentTime(int64_t* unix_time) override {
return env_target_->GetCurrentTime(unix_time);
}
void SetBackgroundThreads(int num, Priority pri) override {
return env_target_->SetBackgroundThreads(num, pri);
}
int GetBackgroundThreads(Priority pri) override {
return env_target_->GetBackgroundThreads(pri);
}
Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
return env_target_->SetAllowNonOwnerAccess(allow_non_owner_access);
}
void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
return env_target_->IncBackgroundThreadsIfNeeded(num, pri);
}
void LowerThreadPoolIOPriority(Priority pool = LOW) override {
env_target_->LowerThreadPoolIOPriority(pool);
}
void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
env_target_->LowerThreadPoolCPUPriority(pool);
}
std::string TimeToString(uint64_t time) override {
return env_target_->TimeToString(time);
}
Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
return env_target_->GetThreadList(thread_list);
}
ThreadStatusUpdater* GetThreadStatusUpdater() const override {
return env_target_->GetThreadStatusUpdater();
}
uint64_t GetThreadID() const override { return env_target_->GetThreadID(); }
std::string GenerateUniqueId() override {
return env_target_->GenerateUniqueId();
}
EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForLogRead(FileOptions(env_options));
}
EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForManifestRead(
FileOptions(env_options));
}
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override {
return fs_env_target_->OptimizeForLogWrite(FileOptions(env_options),
db_options);
}
EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForManifestWrite(
FileOptions(env_options));
}
EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options,
const ImmutableDBOptions& immutable_ops) const override {
return fs_env_target_->OptimizeForCompactionTableWrite(
FileOptions(env_options),
immutable_ops);
}
EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override {
return fs_env_target_->OptimizeForCompactionTableRead(
FileOptions(env_options),
db_options);
}
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetFreeSpace(path, io_opts, diskfree, &dbg);
}
private:
Env* env_target_;
FileSystem* fs_env_target_;
};
class LegacySequentialFileWrapper : public FSSequentialFile {
public:
explicit LegacySequentialFileWrapper(
std::unique_ptr<SequentialFile>&& _target)
: target_(std::move(_target)) {}
Introduce a new storage specific Env API (#5761) Summary: The current Env API encompasses both storage/file operations, as well as OS related operations. Most of the APIs return a Status, which does not have enough metadata about an error, such as whether its retry-able or not, scope (i.e fault domain) of the error etc., that may be required in order to properly handle a storage error. The file APIs also do not provide enough control over the IO SLA, such as timeout, prioritization, hinting about placement and redundancy etc. This PR separates out the file/storage APIs from Env into a new FileSystem class. The APIs are updated to return an IOStatus with metadata about the error, as well as to take an IOOptions structure as input in order to allow more control over the IO. The user can set both ```options.env``` and ```options.file_system``` to specify that RocksDB should use the former for OS related operations and the latter for storage operations. Internally, a ```CompositeEnvWrapper``` has been introduced that inherits from ```Env``` and redirects individual methods to either an ```Env``` implementation or the ```FileSystem``` as appropriate. When options are sanitized during ```DB::Open```, ```options.env``` is replaced with a newly allocated ```CompositeEnvWrapper``` instance if both env and file_system have been specified. This way, the rest of the RocksDB code can continue to function as before. This PR also ports PosixEnv to the new API by splitting it into two - PosixEnv and PosixFileSystem. PosixEnv is defined as a sub-class of CompositeEnvWrapper, and threading/time functions are overridden with Posix specific implementations in order to avoid an extra level of indirection. The ```CompositeEnvWrapper``` translates ```IOStatus``` return code to ```Status```, and sets the severity to ```kSoftError``` if the io_status is retryable. The error handling code in RocksDB can then recover the DB automatically. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5761 Differential Revision: D18868376 Pulled By: anand1976 fbshipit-source-id: 39efe18a162ea746fabac6360ff529baba48486f
2019-12-13 23:47:08 +01:00
IOStatus Read(size_t n, const IOOptions& /*options*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Read(n, result, scratch));
}
IOStatus Skip(uint64_t n) override {
return status_to_io_status(target_->Skip(n));
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
IOStatus PositionedRead(uint64_t offset, size_t n,
const IOOptions& /*options*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
return status_to_io_status(
target_->PositionedRead(offset, n, result, scratch));
}
SequentialFile* target() { return target_.get(); }
private:
std::unique_ptr<SequentialFile> target_;
};
class LegacyRandomAccessFileWrapper : public FSRandomAccessFile {
public:
explicit LegacyRandomAccessFileWrapper(
std::unique_ptr<RandomAccessFile>&& target)
: target_(std::move(target)) {}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
Slice* result, char* scratch,
IODebugContext* /*dbg*/) const override {
return status_to_io_status(target_->Read(offset, n, result, scratch));
}
IOStatus MultiRead(FSReadRequest* fs_reqs, size_t num_reqs,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
std::vector<ReadRequest> reqs;
Status status;
reqs.reserve(num_reqs);
for (size_t i = 0; i < num_reqs; ++i) {
ReadRequest req;
req.offset = fs_reqs[i].offset;
req.len = fs_reqs[i].len;
req.scratch = fs_reqs[i].scratch;
req.status = Status::OK();
reqs.emplace_back(req);
}
status = target_->MultiRead(reqs.data(), num_reqs);
for (size_t i = 0; i < num_reqs; ++i) {
fs_reqs[i].result = reqs[i].result;
fs_reqs[i].status = status_to_io_status(std::move(reqs[i].status));
}
return status_to_io_status(std::move(status));
;
}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Prefetch(offset, n));
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
};
void Hint(AccessPattern pattern) override {
target_->Hint((RandomAccessFile::AccessPattern)pattern);
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
private:
std::unique_ptr<RandomAccessFile> target_;
};
class LegacyWritableFileWrapper : public FSWritableFile {
public:
explicit LegacyWritableFileWrapper(std::unique_ptr<WritableFile>&& _target)
: target_(std::move(_target)) {}
Introduce a new storage specific Env API (#5761) Summary: The current Env API encompasses both storage/file operations, as well as OS related operations. Most of the APIs return a Status, which does not have enough metadata about an error, such as whether its retry-able or not, scope (i.e fault domain) of the error etc., that may be required in order to properly handle a storage error. The file APIs also do not provide enough control over the IO SLA, such as timeout, prioritization, hinting about placement and redundancy etc. This PR separates out the file/storage APIs from Env into a new FileSystem class. The APIs are updated to return an IOStatus with metadata about the error, as well as to take an IOOptions structure as input in order to allow more control over the IO. The user can set both ```options.env``` and ```options.file_system``` to specify that RocksDB should use the former for OS related operations and the latter for storage operations. Internally, a ```CompositeEnvWrapper``` has been introduced that inherits from ```Env``` and redirects individual methods to either an ```Env``` implementation or the ```FileSystem``` as appropriate. When options are sanitized during ```DB::Open```, ```options.env``` is replaced with a newly allocated ```CompositeEnvWrapper``` instance if both env and file_system have been specified. This way, the rest of the RocksDB code can continue to function as before. This PR also ports PosixEnv to the new API by splitting it into two - PosixEnv and PosixFileSystem. PosixEnv is defined as a sub-class of CompositeEnvWrapper, and threading/time functions are overridden with Posix specific implementations in order to avoid an extra level of indirection. The ```CompositeEnvWrapper``` translates ```IOStatus``` return code to ```Status```, and sets the severity to ```kSoftError``` if the io_status is retryable. The error handling code in RocksDB can then recover the DB automatically. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5761 Differential Revision: D18868376 Pulled By: anand1976 fbshipit-source-id: 39efe18a162ea746fabac6360ff529baba48486f
2019-12-13 23:47:08 +01:00
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Append(data));
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->PositionedAppend(data, offset));
}
IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Truncate(size));
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Close());
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Flush());
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Sync());
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
target_->SetWriteLifeTimeHint(hint);
}
Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
return target_->GetWriteLifeTimeHint();
}
uint64_t GetFileSize(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return target_->GetFileSize();
}
void SetPreallocationBlockSize(size_t size) override {
target_->SetPreallocationBlockSize(size);
}
void GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block) override {
target_->GetPreallocationStatus(block_size, last_allocated_block);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->RangeSync(offset, nbytes));
}
void PrepareWrite(size_t offset, size_t len, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
target_->PrepareWrite(offset, len);
}
IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Allocate(offset, len));
}
WritableFile* target() { return target_.get(); }
private:
std::unique_ptr<WritableFile> target_;
};
class LegacyRandomRWFileWrapper : public FSRandomRWFile {
public:
explicit LegacyRandomRWFileWrapper(std::unique_ptr<RandomRWFile>&& target)
: target_(std::move(target)) {}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus Write(uint64_t offset, const Slice& data,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Write(offset, data));
}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
Slice* result, char* scratch,
IODebugContext* /*dbg*/) const override {
return status_to_io_status(target_->Read(offset, n, result, scratch));
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Flush());
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Sync());
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Close());
}
private:
std::unique_ptr<RandomRWFile> target_;
};
class LegacyDirectoryWrapper : public FSDirectory {
public:
explicit LegacyDirectoryWrapper(std::unique_ptr<Directory>&& target)
: target_(std::move(target)) {}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
private:
std::unique_ptr<Directory> target_;
};
class LegacyFileSystemWrapper : public FileSystem {
public:
// Initialize an EnvWrapper that delegates all calls to *t
explicit LegacyFileSystemWrapper(Env* t) : target_(t) {}
~LegacyFileSystemWrapper() override {}
const char* Name() const override { return "Legacy File System"; }
// Return the target to which this Env forwards all calls
Env* target() const { return target_; }
// The following text is boilerplate that forwards all methods to target()
IOStatus NewSequentialFile(const std::string& f,
const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<SequentialFile> file;
Status s = target_->NewSequentialFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacySequentialFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewRandomAccessFile(const std::string& f,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<RandomAccessFile> file;
Status s = target_->NewRandomAccessFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyRandomAccessFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->NewWritableFile(f, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus ReopenWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->ReopenWritableFile(fname, &file, file_opts);
if (s.ok()) {
result->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* /*dbg*/) override {
std::unique_ptr<WritableFile> file;
Status s = target_->ReuseWritableFile(fname, old_fname, &file, file_opts);
if (s.ok()) {
r->reset(new LegacyWritableFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewRandomRWFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomRWFile>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<RandomRWFile> file;
Status s = target_->NewRandomRWFile(fname, &file, file_opts);
if (s.ok()) {
result->reset(new LegacyRandomRWFileWrapper(std::move(file)));
}
return status_to_io_status(std::move(s));
}
IOStatus NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override {
return status_to_io_status(
target_->NewMemoryMappedFileBuffer(fname, result));
}
IOStatus NewDirectory(const std::string& name, const IOOptions& /*io_opts*/,
std::unique_ptr<FSDirectory>* result,
IODebugContext* /*dbg*/) override {
std::unique_ptr<Directory> dir;
Status s = target_->NewDirectory(name, &dir);
if (s.ok()) {
result->reset(new LegacyDirectoryWrapper(std::move(dir)));
}
return status_to_io_status(std::move(s));
}
IOStatus FileExists(const std::string& f, const IOOptions& /*io_opts*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->FileExists(f));
}
IOStatus GetChildren(const std::string& dir, const IOOptions& /*io_opts*/,
std::vector<std::string>* r,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetChildren(dir, r));
}
IOStatus GetChildrenFileAttributes(const std::string& dir,
const IOOptions& /*options*/,
std::vector<FileAttributes>* result,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetChildrenFileAttributes(dir, result));
}
IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->DeleteFile(f));
}
IOStatus Truncate(const std::string& fname, size_t size,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Truncate(fname, size));
}
IOStatus CreateDir(const std::string& d, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->CreateDir(d));
}
IOStatus CreateDirIfMissing(const std::string& d,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->CreateDirIfMissing(d));
}
IOStatus DeleteDir(const std::string& d, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->DeleteDir(d));
}
IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/,
uint64_t* s, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFileSize(f, s));
}
IOStatus GetFileModificationTime(const std::string& fname,
const IOOptions& /*options*/,
uint64_t* file_mtime,
IODebugContext* /*dbg*/) override {
return status_to_io_status(
target_->GetFileModificationTime(fname, file_mtime));
}
IOStatus GetAbsolutePath(const std::string& db_path,
const IOOptions& /*options*/,
std::string* output_path,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetAbsolutePath(db_path, output_path));
}
IOStatus RenameFile(const std::string& s, const std::string& t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->RenameFile(s, t));
}
IOStatus LinkFile(const std::string& s, const std::string& t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->LinkFile(s, t));
}
IOStatus NumFileLinks(const std::string& fname, const IOOptions& /*options*/,
uint64_t* count, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->NumFileLinks(fname, count));
}
IOStatus AreFilesSame(const std::string& first, const std::string& second,
const IOOptions& /*options*/, bool* res,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->AreFilesSame(first, second, res));
}
IOStatus LockFile(const std::string& f, const IOOptions& /*options*/,
FileLock** l, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->LockFile(f, l));
}
IOStatus UnlockFile(FileLock* l, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->UnlockFile(l));
}
IOStatus GetTestDirectory(const IOOptions& /*options*/, std::string* path,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetTestDirectory(path));
}
IOStatus NewLogger(const std::string& fname, const IOOptions& /*options*/,
std::shared_ptr<Logger>* result,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->NewLogger(fname, result));
}
FileOptions OptimizeForLogRead(
const FileOptions& file_options) const override {
return target_->OptimizeForLogRead(file_options);
}
FileOptions OptimizeForManifestRead(
const FileOptions& file_options) const override {
return target_->OptimizeForManifestRead(file_options);
}
FileOptions OptimizeForLogWrite(const FileOptions& file_options,
const DBOptions& db_options) const override {
return target_->OptimizeForLogWrite(file_options, db_options);
}
FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override {
return target_->OptimizeForManifestWrite(file_options);
}
FileOptions OptimizeForCompactionTableWrite(
const FileOptions& file_options,
const ImmutableDBOptions& immutable_ops) const override {
return target_->OptimizeForCompactionTableWrite(file_options,
immutable_ops);
}
FileOptions OptimizeForCompactionTableRead(
const FileOptions& file_options,
const ImmutableDBOptions& db_options) const override {
return target_->OptimizeForCompactionTableRead(file_options, db_options);
}
IOStatus GetFreeSpace(const std::string& path, const IOOptions& /*options*/,
uint64_t* diskfree, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFreeSpace(path, diskfree));
}
private:
Env* target_;
};
inline std::unique_ptr<FSSequentialFile> NewLegacySequentialFileWrapper(
std::unique_ptr<SequentialFile>& file) {
return std::unique_ptr<FSSequentialFile>(
new LegacySequentialFileWrapper(std::move(file)));
}
inline std::unique_ptr<FSRandomAccessFile> NewLegacyRandomAccessFileWrapper(
std::unique_ptr<RandomAccessFile>& file) {
return std::unique_ptr<FSRandomAccessFile>(
new LegacyRandomAccessFileWrapper(std::move(file)));
}
inline std::unique_ptr<FSWritableFile> NewLegacyWritableFileWrapper(
std::unique_ptr<WritableFile>&& file) {
return std::unique_ptr<FSWritableFile>(
new LegacyWritableFileWrapper(std::move(file)));
}
} // namespace rocksdb