rocksdb/env/mock_env.cc
Zhongyi Xie cac87fcf57 move dump stats to a separate thread (#4382)
Summary:
Currently statistics are supposed to be dumped to info log at intervals of `options.stats_dump_period_sec`. However the implementation choice was to bind it with compaction thread, meaning if the database has been serving very light traffic, the stats may not get dumped at all.
We decided to separate stats dumping into a new timed thread using `TimerQueue`, which is already used in blob_db. This will allow us schedule new timed tasks with more deterministic behavior.

Tested with db_bench using `--stats_dump_period_sec=20` in command line:
> LOG:2018/09/17-14:07:45.575025 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
LOG:2018/09/17-14:08:05.643286 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
LOG:2018/09/17-14:08:25.691325 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
LOG:2018/09/17-14:08:45.740989 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------

LOG content:
> 2018/09/17-14:07:45.575025 7fe99fbfe700 [WARN] [db/db_impl.cc:605] ------- DUMPING STATS -------
2018/09/17-14:07:45.575080 7fe99fbfe700 [WARN] [db/db_impl.cc:606]
** DB Stats **
Uptime(secs): 20.0 total, 20.0 interval
Cumulative writes: 4447K writes, 4447K keys, 4447K commit groups, 1.0 writes per commit group, ingest: 5.57 GB, 285.01 MB/s
Cumulative WAL: 4447K writes, 0 syncs, 4447638.00 writes per sync, written: 5.57 GB, 285.01 MB/s
Cumulative stall: 00:00:0.012 H:M:S, 0.1 percent
Interval writes: 4447K writes, 4447K keys, 4447K commit groups, 1.0 writes per commit group, ingest: 5700.71 MB, 285.01 MB/s
Interval WAL: 4447K writes, 0 syncs, 4447638.00 writes per sync, written: 5.57 MB, 285.01 MB/s
Interval stall: 00:00:0.012 H:M:S, 0.1 percent
** Compaction Stats [default] **
Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4382

Differential Revision: D9933051

Pulled By: miasantreble

fbshipit-source-id: 6d12bb1e4977674eea4bf2d2ac6d486b814bb2fa
2018-10-08 22:54:43 -07:00

776 lines
21 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "env/mock_env.h"
#include <algorithm>
#include <chrono>
#include "port/sys_time.h"
#include "util/cast_util.h"
#include "util/murmurhash.h"
#include "util/random.h"
#include "util/rate_limiter.h"
namespace rocksdb {
class MemFile {
public:
explicit MemFile(Env* env, const std::string& fn, bool _is_lock_file = false)
: env_(env),
fn_(fn),
refs_(0),
is_lock_file_(_is_lock_file),
locked_(false),
size_(0),
modified_time_(Now()),
rnd_(static_cast<uint32_t>(
MurmurHash(fn.data(), static_cast<int>(fn.size()), 0))),
fsynced_bytes_(0) {}
void Ref() {
MutexLock lock(&mutex_);
++refs_;
}
bool is_lock_file() const { return is_lock_file_; }
bool Lock() {
assert(is_lock_file_);
MutexLock lock(&mutex_);
if (locked_) {
return false;
} else {
locked_ = true;
return true;
}
}
void Unlock() {
assert(is_lock_file_);
MutexLock lock(&mutex_);
locked_ = false;
}
void Unref() {
bool do_delete = false;
{
MutexLock lock(&mutex_);
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
do_delete = true;
}
}
if (do_delete) {
delete this;
}
}
uint64_t Size() const { return size_; }
void Truncate(size_t size) {
MutexLock lock(&mutex_);
if (size < size_) {
data_.resize(size);
size_ = size;
}
}
void CorruptBuffer() {
if (fsynced_bytes_ >= size_) {
return;
}
uint64_t buffered_bytes = size_ - fsynced_bytes_;
uint64_t start =
fsynced_bytes_ + rnd_.Uniform(static_cast<int>(buffered_bytes));
uint64_t end = std::min(start + 512, size_.load());
MutexLock lock(&mutex_);
for (uint64_t pos = start; pos < end; ++pos) {
data_[static_cast<size_t>(pos)] = static_cast<char>(rnd_.Uniform(256));
}
}
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {
MutexLock lock(&mutex_);
const uint64_t available = Size() - std::min(Size(), offset);
size_t offset_ = static_cast<size_t>(offset);
if (n > available) {
n = static_cast<size_t>(available);
}
if (n == 0) {
*result = Slice();
return Status::OK();
}
if (scratch) {
memcpy(scratch, &(data_[offset_]), n);
*result = Slice(scratch, n);
} else {
*result = Slice(&(data_[offset_]), n);
}
return Status::OK();
}
Status Write(uint64_t offset, const Slice& data) {
MutexLock lock(&mutex_);
size_t offset_ = static_cast<size_t>(offset);
if (offset + data.size() > data_.size()) {
data_.resize(offset_ + data.size());
}
data_.replace(offset_, data.size(), data.data(), data.size());
size_ = data_.size();
modified_time_ = Now();
return Status::OK();
}
Status Append(const Slice& data) {
MutexLock lock(&mutex_);
data_.append(data.data(), data.size());
size_ = data_.size();
modified_time_ = Now();
return Status::OK();
}
Status Fsync() {
fsynced_bytes_ = size_.load();
return Status::OK();
}
uint64_t ModifiedTime() const { return modified_time_; }
private:
uint64_t Now() {
int64_t unix_time = 0;
auto s = env_->GetCurrentTime(&unix_time);
assert(s.ok());
return static_cast<uint64_t>(unix_time);
}
// Private since only Unref() should be used to delete it.
~MemFile() { assert(refs_ == 0); }
// No copying allowed.
MemFile(const MemFile&);
void operator=(const MemFile&);
Env* env_;
const std::string fn_;
mutable port::Mutex mutex_;
int refs_;
bool is_lock_file_;
bool locked_;
// Data written into this file, all bytes before fsynced_bytes are
// persistent.
std::string data_;
std::atomic<uint64_t> size_;
std::atomic<uint64_t> modified_time_;
Random rnd_;
std::atomic<uint64_t> fsynced_bytes_;
};
namespace {
class MockSequentialFile : public SequentialFile {
public:
explicit MockSequentialFile(MemFile* file) : file_(file), pos_(0) {
file_->Ref();
}
~MockSequentialFile() { file_->Unref(); }
virtual Status Read(size_t n, Slice* result, char* scratch) override {
Status s = file_->Read(pos_, n, result, scratch);
if (s.ok()) {
pos_ += result->size();
}
return s;
}
virtual Status Skip(uint64_t n) override {
if (pos_ > file_->Size()) {
return Status::IOError("pos_ > file_->Size()");
}
const uint64_t available = file_->Size() - pos_;
if (n > available) {
n = available;
}
pos_ += static_cast<size_t>(n);
return Status::OK();
}
private:
MemFile* file_;
size_t pos_;
};
class MockRandomAccessFile : public RandomAccessFile {
public:
explicit MockRandomAccessFile(MemFile* file) : file_(file) { file_->Ref(); }
~MockRandomAccessFile() { file_->Unref(); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
return file_->Read(offset, n, result, scratch);
}
private:
MemFile* file_;
};
class MockRandomRWFile : public RandomRWFile {
public:
explicit MockRandomRWFile(MemFile* file) : file_(file) { file_->Ref(); }
~MockRandomRWFile() { file_->Unref(); }
virtual Status Write(uint64_t offset, const Slice& data) override {
return file_->Write(offset, data);
}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
return file_->Read(offset, n, result, scratch);
}
virtual Status Close() override { return file_->Fsync(); }
virtual Status Flush() override { return Status::OK(); }
virtual Status Sync() override { return file_->Fsync(); }
private:
MemFile* file_;
};
class MockWritableFile : public WritableFile {
public:
MockWritableFile(MemFile* file, RateLimiter* rate_limiter)
: file_(file), rate_limiter_(rate_limiter) {
file_->Ref();
}
~MockWritableFile() { file_->Unref(); }
virtual Status Append(const Slice& data) override {
size_t bytes_written = 0;
while (bytes_written < data.size()) {
auto bytes = RequestToken(data.size() - bytes_written);
Status s = file_->Append(Slice(data.data() + bytes_written, bytes));
if (!s.ok()) {
return s;
}
bytes_written += bytes;
}
return Status::OK();
}
virtual Status Truncate(uint64_t size) override {
file_->Truncate(static_cast<size_t>(size));
return Status::OK();
}
virtual Status Close() override { return file_->Fsync(); }
virtual Status Flush() override { return Status::OK(); }
virtual Status Sync() override { return file_->Fsync(); }
virtual uint64_t GetFileSize() override { return file_->Size(); }
private:
inline size_t RequestToken(size_t bytes) {
if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) {
bytes = std::min(
bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
rate_limiter_->Request(bytes, io_priority_);
}
return bytes;
}
MemFile* file_;
RateLimiter* rate_limiter_;
};
class MockEnvDirectory : public Directory {
public:
virtual Status Fsync() override { return Status::OK(); }
};
class MockEnvFileLock : public FileLock {
public:
explicit MockEnvFileLock(const std::string& fname) : fname_(fname) {}
std::string FileName() const { return fname_; }
private:
const std::string fname_;
};
class TestMemLogger : public Logger {
private:
std::unique_ptr<WritableFile> file_;
std::atomic_size_t log_size_;
static const uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_;
Env* env_;
std::atomic<bool> flush_pending_;
public:
TestMemLogger(std::unique_ptr<WritableFile> f, Env* env,
const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
: Logger(log_level),
file_(std::move(f)),
log_size_(0),
last_flush_micros_(0),
env_(env),
flush_pending_(false) {}
virtual ~TestMemLogger() {}
virtual void Flush() override {
if (flush_pending_) {
flush_pending_ = false;
}
last_flush_micros_ = env_->NowMicros();
}
using Logger::Logv;
virtual void Logv(const char* format, va_list ap) override {
// We try twice: the first time with a fixed-size stack allocated buffer,
// and the second time with a much larger dynamically allocated buffer.
char buffer[500];
for (int iter = 0; iter < 2; iter++) {
char* base;
int bufsize;
if (iter == 0) {
bufsize = sizeof(buffer);
base = buffer;
} else {
bufsize = 30000;
base = new char[bufsize];
}
char* p = base;
char* limit = base + bufsize;
struct timeval now_tv;
gettimeofday(&now_tv, nullptr);
const time_t seconds = now_tv.tv_sec;
struct tm t;
memset(&t, 0, sizeof(t));
struct tm* ret __attribute__((__unused__));
ret = localtime_r(&seconds, &t);
assert(ret);
p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d ",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec));
// Print the message
if (p < limit) {
va_list backup_ap;
va_copy(backup_ap, ap);
p += vsnprintf(p, limit - p, format, backup_ap);
va_end(backup_ap);
}
// Truncate to available space if necessary
if (p >= limit) {
if (iter == 0) {
continue; // Try again with larger buffer
} else {
p = limit - 1;
}
}
// Add newline if necessary
if (p == base || p[-1] != '\n') {
*p++ = '\n';
}
assert(p <= limit);
const size_t write_size = p - base;
file_->Append(Slice(base, write_size));
flush_pending_ = true;
log_size_ += write_size;
uint64_t now_micros =
static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec;
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
flush_pending_ = false;
last_flush_micros_ = now_micros;
}
if (base != buffer) {
delete[] base;
}
break;
}
}
size_t GetLogFileSize() const override { return log_size_; }
};
} // Anonymous namespace
MockEnv::MockEnv(Env* base_env) : EnvWrapper(base_env), fake_sleep_micros_(0) {}
MockEnv::~MockEnv() {
for (FileSystem::iterator i = file_map_.begin(); i != file_map_.end(); ++i) {
i->second->Unref();
}
}
// Partial implementation of the Env interface.
Status MockEnv::NewSequentialFile(const std::string& fname,
unique_ptr<SequentialFile>* result,
const EnvOptions& /*soptions*/) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
if (file_map_.find(fn) == file_map_.end()) {
*result = nullptr;
return Status::IOError(fn, "File not found");
}
auto* f = file_map_[fn];
if (f->is_lock_file()) {
return Status::InvalidArgument(fn, "Cannot open a lock file.");
}
result->reset(new MockSequentialFile(f));
return Status::OK();
}
Status MockEnv::NewRandomAccessFile(const std::string& fname,
unique_ptr<RandomAccessFile>* result,
const EnvOptions& /*soptions*/) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
if (file_map_.find(fn) == file_map_.end()) {
*result = nullptr;
return Status::IOError(fn, "File not found");
}
auto* f = file_map_[fn];
if (f->is_lock_file()) {
return Status::InvalidArgument(fn, "Cannot open a lock file.");
}
result->reset(new MockRandomAccessFile(f));
return Status::OK();
}
Status MockEnv::NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
const EnvOptions& /*soptions*/) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
if (file_map_.find(fn) == file_map_.end()) {
*result = nullptr;
return Status::IOError(fn, "File not found");
}
auto* f = file_map_[fn];
if (f->is_lock_file()) {
return Status::InvalidArgument(fn, "Cannot open a lock file.");
}
result->reset(new MockRandomRWFile(f));
return Status::OK();
}
Status MockEnv::ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) {
auto s = RenameFile(old_fname, fname);
if (!s.ok()) {
return s;
}
result->reset();
return NewWritableFile(fname, result, options);
}
Status MockEnv::NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& env_options) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
if (file_map_.find(fn) != file_map_.end()) {
DeleteFileInternal(fn);
}
MemFile* file = new MemFile(this, fn, false);
file->Ref();
file_map_[fn] = file;
result->reset(new MockWritableFile(file, env_options.rate_limiter));
return Status::OK();
}
Status MockEnv::NewDirectory(const std::string& /*name*/,
unique_ptr<Directory>* result) {
result->reset(new MockEnvDirectory());
return Status::OK();
}
Status MockEnv::FileExists(const std::string& fname) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
if (file_map_.find(fn) != file_map_.end()) {
// File exists
return Status::OK();
}
// Now also check if fn exists as a dir
for (const auto& iter : file_map_) {
const std::string& filename = iter.first;
if (filename.size() >= fn.size() + 1 && filename[fn.size()] == '/' &&
Slice(filename).starts_with(Slice(fn))) {
return Status::OK();
}
}
return Status::NotFound();
}
Status MockEnv::GetChildren(const std::string& dir,
std::vector<std::string>* result) {
auto d = NormalizePath(dir);
bool found_dir = false;
{
MutexLock lock(&mutex_);
result->clear();
for (const auto& iter : file_map_) {
const std::string& filename = iter.first;
if (filename == d) {
found_dir = true;
} else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' &&
Slice(filename).starts_with(Slice(d))) {
found_dir = true;
size_t next_slash = filename.find('/', d.size() + 1);
if (next_slash != std::string::npos) {
result->push_back(
filename.substr(d.size() + 1, next_slash - d.size() - 1));
} else {
result->push_back(filename.substr(d.size() + 1));
}
}
}
}
result->erase(std::unique(result->begin(), result->end()), result->end());
return found_dir ? Status::OK() : Status::NotFound();
}
void MockEnv::DeleteFileInternal(const std::string& fname) {
assert(fname == NormalizePath(fname));
const auto& pair = file_map_.find(fname);
if (pair != file_map_.end()) {
pair->second->Unref();
file_map_.erase(fname);
}
}
Status MockEnv::DeleteFile(const std::string& fname) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
if (file_map_.find(fn) == file_map_.end()) {
return Status::IOError(fn, "File not found");
}
DeleteFileInternal(fn);
return Status::OK();
}
Status MockEnv::Truncate(const std::string& fname, size_t size) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
auto iter = file_map_.find(fn);
if (iter == file_map_.end()) {
return Status::IOError(fn, "File not found");
}
iter->second->Truncate(size);
return Status::OK();
}
Status MockEnv::CreateDir(const std::string& dirname) {
auto dn = NormalizePath(dirname);
if (file_map_.find(dn) == file_map_.end()) {
MemFile* file = new MemFile(this, dn, false);
file->Ref();
file_map_[dn] = file;
} else {
return Status::IOError();
}
return Status::OK();
}
Status MockEnv::CreateDirIfMissing(const std::string& dirname) {
CreateDir(dirname);
return Status::OK();
}
Status MockEnv::DeleteDir(const std::string& dirname) {
return DeleteFile(dirname);
}
Status MockEnv::GetFileSize(const std::string& fname, uint64_t* file_size) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
auto iter = file_map_.find(fn);
if (iter == file_map_.end()) {
return Status::IOError(fn, "File not found");
}
*file_size = iter->second->Size();
return Status::OK();
}
Status MockEnv::GetFileModificationTime(const std::string& fname,
uint64_t* time) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
auto iter = file_map_.find(fn);
if (iter == file_map_.end()) {
return Status::IOError(fn, "File not found");
}
*time = iter->second->ModifiedTime();
return Status::OK();
}
Status MockEnv::RenameFile(const std::string& src, const std::string& dest) {
auto s = NormalizePath(src);
auto t = NormalizePath(dest);
MutexLock lock(&mutex_);
if (file_map_.find(s) == file_map_.end()) {
return Status::IOError(s, "File not found");
}
DeleteFileInternal(t);
file_map_[t] = file_map_[s];
file_map_.erase(s);
return Status::OK();
}
Status MockEnv::LinkFile(const std::string& src, const std::string& dest) {
auto s = NormalizePath(src);
auto t = NormalizePath(dest);
MutexLock lock(&mutex_);
if (file_map_.find(s) == file_map_.end()) {
return Status::IOError(s, "File not found");
}
DeleteFileInternal(t);
file_map_[t] = file_map_[s];
file_map_[t]->Ref(); // Otherwise it might get deleted when noone uses s
return Status::OK();
}
Status MockEnv::NewLogger(const std::string& fname,
shared_ptr<Logger>* result) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
auto iter = file_map_.find(fn);
MemFile* file = nullptr;
if (iter == file_map_.end()) {
file = new MemFile(this, fn, false);
file->Ref();
file_map_[fn] = file;
} else {
file = iter->second;
}
std::unique_ptr<WritableFile> f(new MockWritableFile(file, nullptr));
result->reset(new TestMemLogger(std::move(f), this));
return Status::OK();
}
Status MockEnv::LockFile(const std::string& fname, FileLock** flock) {
auto fn = NormalizePath(fname);
{
MutexLock lock(&mutex_);
if (file_map_.find(fn) != file_map_.end()) {
if (!file_map_[fn]->is_lock_file()) {
return Status::InvalidArgument(fname, "Not a lock file.");
}
if (!file_map_[fn]->Lock()) {
return Status::IOError(fn, "Lock is already held.");
}
} else {
auto* file = new MemFile(this, fn, true);
file->Ref();
file->Lock();
file_map_[fn] = file;
}
}
*flock = new MockEnvFileLock(fn);
return Status::OK();
}
Status MockEnv::UnlockFile(FileLock* flock) {
std::string fn =
static_cast_with_check<MockEnvFileLock, FileLock>(flock)->FileName();
{
MutexLock lock(&mutex_);
if (file_map_.find(fn) != file_map_.end()) {
if (!file_map_[fn]->is_lock_file()) {
return Status::InvalidArgument(fn, "Not a lock file.");
}
file_map_[fn]->Unlock();
}
}
delete flock;
return Status::OK();
}
Status MockEnv::GetTestDirectory(std::string* path) {
*path = "/test";
return Status::OK();
}
Status MockEnv::GetCurrentTime(int64_t* unix_time) {
auto s = EnvWrapper::GetCurrentTime(unix_time);
if (s.ok()) {
*unix_time += fake_sleep_micros_.load() / (1000 * 1000);
}
return s;
}
uint64_t MockEnv::NowMicros() {
return EnvWrapper::NowMicros() + fake_sleep_micros_.load();
}
uint64_t MockEnv::NowNanos() {
return EnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;
}
Status MockEnv::CorruptBuffer(const std::string& fname) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
auto iter = file_map_.find(fn);
if (iter == file_map_.end()) {
return Status::IOError(fn, "File not found");
}
iter->second->CorruptBuffer();
return Status::OK();
}
std::string MockEnv::NormalizePath(const std::string path) {
std::string dst;
for (auto c : path) {
if (!dst.empty() && c == '/' && dst.back() == '/') {
continue;
}
dst.push_back(c);
}
return dst;
}
void MockEnv::FakeSleepForMicroseconds(int64_t micros) {
fake_sleep_micros_.fetch_add(micros);
}
#ifndef ROCKSDB_LITE
// This is to maintain the behavior before swithcing from InMemoryEnv to MockEnv
Env* NewMemEnv(Env* base_env) { return new MockEnv(base_env); }
#else // ROCKSDB_LITE
Env* NewMemEnv(Env* /*base_env*/) { return nullptr; }
#endif // !ROCKSDB_LITE
} // namespace rocksdb