rocksdb/port/port_posix.cc
sdong 6e9fbeb27c Move rate_limiter, write buffering, most perf context instrumentation and most random kill out of Env
Summary: We want to keep Env a thin layer for better portability. Code that is less platform-dependent should be moved out of Env. In this patch, I create a wrapper of file readers and writers, and move rate limiting, write buffering, as well as most perf context instrumentation and random kill out of Env. This will make it easier to maintain multiple Envs in the future.

Test Plan: Run all existing unit tests.

Reviewers: anthony, kradhakrishnan, IslamAbdelRahman, yhchiang, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D42321
2015-07-17 16:58:18 -07:00

146 lines
3.8 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "port/port_posix.h"
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <cstdlib>
#include "util/logging.h"
namespace rocksdb {
namespace port {
// Checks the return code of a pthread call.
//
// label:  short description of the call, used only in the error message.
// result: the value returned by the pthread function.
//
// Returns `result` unchanged when it is 0 or ETIMEDOUT. For any other value
// it prints "pthread <label>: <strerror(result)>" to stderr and aborts.
static int PthreadCall(const char* label, int result) {
  const bool acceptable = (result == 0) || (result == ETIMEDOUT);
  if (!acceptable) {
    fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
    abort();
  }
  return result;
}
// Initializes the underlying pthread mutex.
//
// adaptive: when true and OS_LINUX is defined, the mutex is created with the
//           PTHREAD_MUTEX_ADAPTIVE_NP type. In every other case the mutex is
//           created with default attributes.
//
// Every pthread return code is passed through PthreadCall().
Mutex::Mutex(bool adaptive) {
#ifndef NDEBUG
  // The debug-only ownership flag is written by Lock()/Unlock() and read by
  // AssertHeld(); give it a defined starting value so AssertHeld() on a
  // never-locked mutex does not read an indeterminate bool.
  // NOTE(review): redundant but harmless if the header already gives
  // locked_ a default member initializer -- confirm against port_posix.h.
  locked_ = false;
#endif
#ifdef OS_LINUX
  if (!adaptive) {
    PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr));
  } else {
    pthread_mutexattr_t mutex_attr;
    PthreadCall("init mutex attr", pthread_mutexattr_init(&mutex_attr));
    PthreadCall("set mutex attr",
                pthread_mutexattr_settype(&mutex_attr,
                                          PTHREAD_MUTEX_ADAPTIVE_NP));
    PthreadCall("init mutex", pthread_mutex_init(&mu_, &mutex_attr));
    // The attribute object is only needed while initializing the mutex.
    PthreadCall("destroy mutex attr",
                pthread_mutexattr_destroy(&mutex_attr));
  }
#else  // ignore adaptive for non-linux platform
  PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr));
#endif  // OS_LINUX
}
// Destroys the underlying pthread mutex; the return code is checked by
// PthreadCall().
Mutex::~Mutex() {
  const int rc = pthread_mutex_destroy(&mu_);
  PthreadCall("destroy mutex", rc);
}
// Acquires the pthread mutex, then (in debug builds only) records that it is
// held so AssertHeld() can check it.
void Mutex::Lock() {
  const int rc = pthread_mutex_lock(&mu_);
  PthreadCall("lock", rc);
#ifndef NDEBUG
  locked_ = true;
#endif
}
// Releases the pthread mutex. In debug builds the "held" flag is cleared
// first, while the mutex is still owned by the caller.
void Mutex::Unlock() {
#ifndef NDEBUG
  locked_ = false;
#endif
  const int rc = pthread_mutex_unlock(&mu_);
  PthreadCall("unlock", rc);
}
// Debug-build check that the "held" flag maintained by Lock()/Unlock() is
// set. Compiles to an empty function when NDEBUG is defined.
void Mutex::AssertHeld() {
#ifndef NDEBUG
  assert(locked_);
#endif
}
// Stores the associated mutex pointer and initializes the pthread condition
// variable with default attributes.
CondVar::CondVar(Mutex* mu) : mu_(mu) {
  const int rc = pthread_cond_init(&cv_, nullptr);
  PthreadCall("init cv", rc);
}
// Destroys the pthread condition variable; the return code is checked by
// PthreadCall().
CondVar::~CondVar() {
  const int rc = pthread_cond_destroy(&cv_);
  PthreadCall("destroy cv", rc);
}
// Blocks on the condition variable using the associated mutex.
//
// In debug builds the mutex's "held" flag is cleared for the duration of the
// wait and set again once pthread_cond_wait() has returned successfully.
void CondVar::Wait() {
  pthread_mutex_t* const raw_mutex = &mu_->mu_;
#ifndef NDEBUG
  mu_->locked_ = false;
#endif
  const int rc = pthread_cond_wait(&cv_, raw_mutex);
  PthreadCall("wait", rc);
#ifndef NDEBUG
  mu_->locked_ = true;
#endif
}
// Blocks on the condition variable until it is signaled or the deadline
// passes.
//
// abs_time_us: deadline in microseconds; it is split into whole seconds and
//              a nanosecond remainder and handed to pthread_cond_timedwait()
//              as its absolute timeout.
//
// Returns true if the wait ended with ETIMEDOUT, false otherwise. Any other
// nonzero pthread error is passed to PthreadCall(), which aborts.
bool CondVar::TimedWait(uint64_t abs_time_us) {
  struct timespec ts;
  ts.tv_sec = static_cast<time_t>(abs_time_us / 1000000);
  // The remainder can be as large as 999,999,000 ns. Cast to the field's own
  // type rather than suseconds_t, which is a microsecond type only guaranteed
  // to hold values up to 1,000,000.
  ts.tv_nsec =
      static_cast<decltype(ts.tv_nsec)>((abs_time_us % 1000000) * 1000);

#ifndef NDEBUG
  // The debug "held" flag is cleared across the wait and restored after it.
  mu_->locked_ = false;
#endif
  const int err = pthread_cond_timedwait(&cv_, &mu_->mu_, &ts);
#ifndef NDEBUG
  mu_->locked_ = true;
#endif

  if (err == ETIMEDOUT) {
    return true;
  }
  if (err != 0) {
    PthreadCall("timedwait", err);
  }
  return false;
}
// Calls pthread_cond_signal() on the condition variable and checks the
// return code.
void CondVar::Signal() {
  const int rc = pthread_cond_signal(&cv_);
  PthreadCall("signal", rc);
}
// Calls pthread_cond_broadcast() on the condition variable and checks the
// return code.
void CondVar::SignalAll() {
  const int rc = pthread_cond_broadcast(&cv_);
  PthreadCall("broadcast", rc);
}
// Initializes the underlying pthread read-write lock with default
// attributes.
RWMutex::RWMutex() {
  const int rc = pthread_rwlock_init(&mu_, nullptr);
  PthreadCall("init mutex", rc);
}
// Destroys the underlying pthread read-write lock.
RWMutex::~RWMutex() {
  const int rc = pthread_rwlock_destroy(&mu_);
  PthreadCall("destroy mutex", rc);
}
// Acquires the read-write lock for reading (pthread_rwlock_rdlock).
void RWMutex::ReadLock() {
  const int rc = pthread_rwlock_rdlock(&mu_);
  PthreadCall("read lock", rc);
}
// Acquires the read-write lock for writing (pthread_rwlock_wrlock).
void RWMutex::WriteLock() {
  const int rc = pthread_rwlock_wrlock(&mu_);
  PthreadCall("write lock", rc);
}
// Releases the read-write lock via pthread_rwlock_unlock(); differs from
// WriteUnlock() only in the label reported on failure.
void RWMutex::ReadUnlock() {
  const int rc = pthread_rwlock_unlock(&mu_);
  PthreadCall("read unlock", rc);
}
// Releases the read-write lock via pthread_rwlock_unlock(); differs from
// ReadUnlock() only in the label reported on failure.
void RWMutex::WriteUnlock() {
  const int rc = pthread_rwlock_unlock(&mu_);
  PthreadCall("write unlock", rc);
}
// Forwards to pthread_once() with the given once-control object and
// initializer, checking the return code with PthreadCall().
void InitOnce(OnceType* once, void (*initializer)()) {
  const int rc = pthread_once(once, initializer);
  PthreadCall("once", rc);
}
// Prints "Crashing at <srcfile>:<srcline>" to stdout, flushes stdout, and
// then sends SIGTERM to the current process.
//
// srcfile: source file name to report.
// srcline: source line number to report.
void Crash(const std::string& srcfile, int srcline) {
  const char* const file_name = srcfile.c_str();
  fprintf(stdout, "Crashing at %s:%d\n", file_name, srcline);
  // Flush first so the message is written out before the signal is raised.
  fflush(stdout);
  const pid_t self = getpid();
  kill(self, SIGTERM);
}
} // namespace port
} // namespace rocksdb