rocksdb/env/env_posix.cc
anand76 afa2420c2b Introduce a new storage specific Env API (#5761)
Summary:
The current Env API encompasses both storage/file operations, as well as OS related operations. Most of the APIs return a Status, which does not have enough metadata about an error, such as whether its retry-able or not, scope (i.e fault domain) of the error etc., that may be required in order to properly handle a storage error. The file APIs also do not provide enough control over the IO SLA, such as timeout, prioritization, hinting about placement and redundancy etc.

This PR separates out the file/storage APIs from Env into a new FileSystem class. The APIs are updated to return an IOStatus with metadata about the error, as well as to take an IOOptions structure as input in order to allow more control over the IO.

The user can set both ```options.env``` and ```options.file_system``` to specify that RocksDB should use the former for OS related operations and the latter for storage operations. Internally, a ```CompositeEnvWrapper``` has been introduced that inherits from ```Env``` and redirects individual methods to either an ```Env``` implementation or the ```FileSystem``` as appropriate. When options are sanitized during ```DB::Open```, ```options.env``` is replaced with a newly allocated ```CompositeEnvWrapper``` instance if both env and file_system have been specified. This way, the rest of the RocksDB code can continue to function as before.

This PR also ports PosixEnv to the new API by splitting it into two - PosixEnv and PosixFileSystem. PosixEnv is defined as a sub-class of CompositeEnvWrapper, and threading/time functions are overridden with Posix specific implementations in order to avoid an extra level of indirection.

The ```CompositeEnvWrapper``` translates ```IOStatus``` return code to ```Status```, and sets the severity to ```kSoftError``` if the io_status is retryable. The error handling code in RocksDB can then recover the DB automatically.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5761

Differential Revision: D18868376

Pulled By: anand1976

fbshipit-source-id: 39efe18a162ea746fabac6360ff529baba48486f
2019-12-13 14:48:41 -08:00

538 lines
16 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors
#include <dirent.h>
#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
#include <dlfcn.h>
#endif
#include <errno.h>
#include <fcntl.h>
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
#if defined(ROCKSDB_IOURING_PRESENT)
#include <liburing.h>
#endif
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
#include <sys/statfs.h>
#include <sys/syscall.h>
#include <sys/sysmacros.h>
#endif
#include <sys/statvfs.h>
#include <sys/time.h>
#include <sys/types.h>
#if defined(ROCKSDB_IOURING_PRESENT)
#include <sys/uio.h>
#endif
#include <time.h>
#include <algorithm>
// Get nano time includes
#if defined(OS_LINUX) || defined(OS_FREEBSD)
#elif defined(__MACH__)
#include <Availability.h>
#include <mach/clock.h>
#include <mach/mach.h>
#else
#include <chrono>
#endif
#include <deque>
#include <set>
#include <vector>
#include "env/composite_env_wrapper.h"
#include "env/io_posix.h"
#include "logging/logging.h"
#include "logging/posix_logger.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "port/port.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/compression_context_cache.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/thread_local.h"
#include "util/threadpool_imp.h"
#if !defined(TMPFS_MAGIC)
#define TMPFS_MAGIC 0x01021994
#endif
#if !defined(XFS_SUPER_MAGIC)
#define XFS_SUPER_MAGIC 0x58465342
#endif
#if !defined(EXT4_SUPER_MAGIC)
#define EXT4_SUPER_MAGIC 0xEF53
#endif
namespace rocksdb {
#if defined(OS_WIN)
static const std::string kSharedLibExt = ".dll";
static const char kPathSeparator = ';';
#else
static const char kPathSeparator = ':';
#if defined(OS_MACOSX)
static const std::string kSharedLibExt = ".dylib";
#else
static const std::string kSharedLibExt = ".so";
#endif
#endif
namespace {
ThreadStatusUpdater* CreateThreadStatusUpdater() {
return new ThreadStatusUpdater();
}
// list of pathnames that are locked
static std::set<std::string> lockedFiles;
static port::Mutex mutex_lockedFiles;
class PosixFileLock : public FileLock {
public:
int fd_;
std::string filename;
};
#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
class PosixDynamicLibrary : public DynamicLibrary {
public:
PosixDynamicLibrary(const std::string& name, void* handle)
: name_(name), handle_(handle) {}
~PosixDynamicLibrary() override { dlclose(handle_); }
Status LoadSymbol(const std::string& sym_name, void** func) override {
assert(nullptr != func);
dlerror(); // Clear any old error
*func = dlsym(handle_, sym_name.c_str());
if (*func != nullptr) {
return Status::OK();
} else {
char* err = dlerror();
return Status::NotFound("Error finding symbol: " + sym_name, err);
}
}
const char* Name() const override { return name_.c_str(); }
private:
std::string name_;
void* handle_;
};
#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
class PosixEnv : public CompositeEnvWrapper {
public:
PosixEnv();
~PosixEnv() override {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
}
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].JoinAllThreads();
}
// Delete the thread_status_updater_ only when the current Env is not
// Env::Default(). This is to avoid the free-after-use error when
// Env::Default() is destructed while some other child threads are
// still trying to update thread status.
if (this != Env::Default()) {
delete thread_status_updater_;
}
}
void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
}
}
#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
// Loads the named library into the result.
// If the input name is empty, the current executable is loaded
// On *nix systems, a "lib" prefix is added to the name if one is not supplied
// Comparably, the appropriate shared library extension is added to the name
// if not supplied. If search_path is not specified, the shared library will
// be loaded using the default path (LD_LIBRARY_PATH) If search_path is
// specified, the shared library will be searched for in the directories
// provided by the search path
Status LoadLibrary(const std::string& name, const std::string& path,
std::shared_ptr<DynamicLibrary>* result) override {
Status status;
assert(result != nullptr);
if (name.empty()) {
void* hndl = dlopen(NULL, RTLD_NOW);
if (hndl != nullptr) {
result->reset(new PosixDynamicLibrary(name, hndl));
return Status::OK();
}
} else {
std::string library_name = name;
if (library_name.find(kSharedLibExt) == std::string::npos) {
library_name = library_name + kSharedLibExt;
}
#if !defined(OS_WIN)
if (library_name.find('/') == std::string::npos &&
library_name.compare(0, 3, "lib") != 0) {
library_name = "lib" + library_name;
}
#endif
if (path.empty()) {
void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
if (hndl != nullptr) {
result->reset(new PosixDynamicLibrary(library_name, hndl));
return Status::OK();
}
} else {
std::string local_path;
std::stringstream ss(path);
while (getline(ss, local_path, kPathSeparator)) {
if (!path.empty()) {
std::string full_name = local_path + "/" + library_name;
void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
if (hndl != nullptr) {
result->reset(new PosixDynamicLibrary(full_name, hndl));
return Status::OK();
}
}
}
}
}
return Status::IOError(
IOErrorMsg("Failed to open shared library: xs", name), dlerror());
}
#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
void* tag = nullptr,
void (*unschedFunction)(void* arg) = nullptr) override;
int UnSchedule(void* arg, Priority pri) override;
void StartThread(void (*function)(void* arg), void* arg) override;
void WaitForJoin() override;
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
Status GetTestDirectory(std::string* result) override {
const char* env = getenv("TEST_TMPDIR");
if (env && env[0] != '\0') {
*result = env;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
*result = buf;
}
// Directory may already exist
CreateDir(*result);
return Status::OK();
}
Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
assert(thread_status_updater_);
return thread_status_updater_->GetThreadList(thread_list);
}
static uint64_t gettid(pthread_t tid) {
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
return thread_id;
}
static uint64_t gettid() {
pthread_t tid = pthread_self();
return gettid(tid);
}
uint64_t GetThreadID() const override { return gettid(pthread_self()); }
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override {
FILE* f;
{
IOSTATS_TIMER_GUARD(open_nanos);
f = fopen(fname.c_str(),
"w"
#ifdef __GLIBC_PREREQ
#if __GLIBC_PREREQ(2, 7)
"e" // glibc extension to enable O_CLOEXEC
#endif
#endif
);
}
if (f == nullptr) {
result->reset();
return IOError("when fopen a file for new logger", fname, errno);
} else {
int fd = fileno(f);
#ifdef ROCKSDB_FALLOCATE_PRESENT
fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
#endif
SetFD_CLOEXEC(fd, nullptr);
result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
return Status::OK();
}
}
uint64_t NowMicros() override {
struct timeval tv;
gettimeofday(&tv, nullptr);
return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
uint64_t NowNanos() override {
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
#elif defined(OS_SOLARIS)
return gethrtime();
#elif defined(__MACH__)
clock_serv_t cclock;
mach_timespec_t ts;
host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
clock_get_time(cclock, &ts);
mach_port_deallocate(mach_task_self(), cclock);
return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
#else
return std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
#endif
}
uint64_t NowCPUNanos() override {
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
(defined(__MACH__) && defined(__MAC_10_12))
struct timespec ts;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
#endif
return 0;
}
void SleepForMicroseconds(int micros) override { usleep(micros); }
Status GetHostName(char* name, uint64_t len) override {
int ret = gethostname(name, static_cast<size_t>(len));
if (ret < 0) {
if (errno == EFAULT || errno == EINVAL) {
return Status::InvalidArgument(strerror(errno));
} else {
return IOError("GetHostName", name, errno);
}
}
return Status::OK();
}
Status GetCurrentTime(int64_t* unix_time) override {
time_t ret = time(nullptr);
if (ret == (time_t) -1) {
return IOError("GetCurrentTime", "", errno);
}
*unix_time = (int64_t) ret;
return Status::OK();
}
ThreadStatusUpdater* GetThreadStatusUpdater() const override {
return Env::GetThreadStatusUpdater();
}
std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
// Allow increasing the number of worker threads.
void SetBackgroundThreads(int num, Priority pri) override {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].SetBackgroundThreads(num);
}
int GetBackgroundThreads(Priority pri) override {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].GetBackgroundThreads();
}
Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
allow_non_owner_access_ = allow_non_owner_access;
return Status::OK();
}
// Allow increasing the number of worker threads.
void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
}
void LowerThreadPoolIOPriority(Priority pool = LOW) override {
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerIOPriority();
#else
(void)pool;
#endif
}
void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerCPUPriority();
#else
(void)pool;
#endif
}
std::string TimeToString(uint64_t secondsSince1970) override {
const time_t seconds = (time_t)secondsSince1970;
struct tm t;
int maxsize = 64;
std::string dummy;
dummy.reserve(maxsize);
dummy.resize(maxsize);
char* p = &dummy[0];
localtime_r(&seconds, &t);
snprintf(p, maxsize,
"%04d/%02d/%02d-%02d:%02d:%02d ",
t.tm_year + 1900,
t.tm_mon + 1,
t.tm_mday,
t.tm_hour,
t.tm_min,
t.tm_sec);
return dummy;
}
private:
std::vector<ThreadPoolImpl> thread_pools_;
pthread_mutex_t mu_;
std::vector<pthread_t> threads_to_join_;
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool allow_non_owner_access_;
};
PosixEnv::PosixEnv()
: CompositeEnvWrapper(this, FileSystem::Default().get()),
thread_pools_(Priority::TOTAL),
allow_non_owner_access_(true) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority(
static_cast<Env::Priority>(pool_id));
// This allows later initializing the thread-local-env of each thread.
thread_pools_[pool_id].SetHostEnv(this);
}
thread_status_updater_ = CreateThreadStatusUpdater();
}
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag, void (*unschedFunction)(void* arg)) {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
}
int PosixEnv::UnSchedule(void* arg, Priority pri) {
return thread_pools_[pri].UnSchedule(arg);
}
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].GetQueueLen();
}
struct StartThreadState {
void (*user_function)(void*);
void* arg;
};
static void* StartThreadWrapper(void* arg) {
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
state->user_function(state->arg);
delete state;
return nullptr;
}
void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
pthread_t t;
StartThreadState* state = new StartThreadState;
state->user_function = function;
state->arg = arg;
ThreadPoolImpl::PthreadCall(
"start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
threads_to_join_.push_back(t);
ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void PosixEnv::WaitForJoin() {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
}
threads_to_join_.clear();
}
} // namespace
std::string Env::GenerateUniqueId() {
std::string uuid_file = "/proc/sys/kernel/random/uuid";
Status s = FileExists(uuid_file);
if (s.ok()) {
std::string uuid;
s = ReadFileToString(this, uuid_file, &uuid);
if (s.ok()) {
return uuid;
}
}
// Could not read uuid_file - generate uuid using "nanos-random"
Random64 r(time(nullptr));
uint64_t random_uuid_portion =
r.Uniform(std::numeric_limits<uint64_t>::max());
uint64_t nanos_uuid_portion = NowNanos();
char uuid2[200];
snprintf(uuid2,
200,
"%lx-%lx",
(unsigned long)nanos_uuid_portion,
(unsigned long)random_uuid_portion);
return uuid2;
}
//
// Default Posix Env
//
Env* Env::Default() {
// The following function call initializes the singletons of ThreadLocalPtr
// right before the static default_env. This guarantees default_env will
// always being destructed before the ThreadLocalPtr singletons get
// destructed as C++ guarantees that the destructions of static variables
// is in the reverse order of their constructions.
//
// Since static members are destructed in the reverse order
// of their construction, having this call here guarantees that
// the destructor of static PosixEnv will go first, then the
// the singletons of ThreadLocalPtr.
ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS();
static PosixEnv default_env;
static CompositeEnvWrapper composite_env(&default_env,
FileSystem::Default().get());
return &composite_env;
}
} // namespace rocksdb