12f1137355
Summary: Introduces and uses a SystemClock class to RocksDB. This class contains the time-related functions of an Env and these functions can be redirected from the Env to the SystemClock. Many of the places that used an Env (Timer, PerfStepTimer, RepeatableThread, RateLimiter, WriteController) for time-related functions have been changed to use SystemClock instead. There are likely more places that can be changed, but this is a start to show what can/should be done. Over time it would be nice to migrate most (if not all) of the uses of the time functions from the Env to the SystemClock. There are several Env classes that implement these functions. Most of these have not been converted yet to SystemClock implementations; that will come in a subsequent PR. It would be good to unify many of the Mock Timer implementations, so that they behave similarly and be tested similarly (some override Sleep, some use a MockSleep, etc). Additionally, this change will allow new methods to be introduced to the SystemClock (like https://github.com/facebook/rocksdb/issues/7101 WaitFor) in a consistent manner across a smaller number of classes. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7858 Reviewed By: pdillinger Differential Revision: D26006406 Pulled By: mrambacher fbshipit-source-id: ed10a8abbdab7ff2e23d69d85bd25b3e7e899e90
426 lines
12 KiB
C++
426 lines
12 KiB
C++
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include "utilities/persistent_cache/block_cache_tier.h"
|
|
|
|
#include <regex>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "logging/logging.h"
|
|
#include "port/port.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/stop_watch.h"
|
|
#include "utilities/persistent_cache/block_cache_tier_file.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
//
|
|
// BlockCacheImpl
|
|
//
|
|
Status BlockCacheTier::Open() {
|
|
Status status;
|
|
|
|
WriteLock _(&lock_);
|
|
|
|
assert(!size_);
|
|
|
|
// Check the validity of the options
|
|
status = opt_.ValidateSettings();
|
|
assert(status.ok());
|
|
if (!status.ok()) {
|
|
Error(opt_.log, "Invalid block cache options");
|
|
return status;
|
|
}
|
|
|
|
// Create base directory or cleanup existing directory
|
|
status = opt_.env->CreateDirIfMissing(opt_.path);
|
|
if (!status.ok()) {
|
|
Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
|
|
status.ToString().c_str());
|
|
return status;
|
|
}
|
|
|
|
// Create base/<cache dir> directory
|
|
status = opt_.env->CreateDir(GetCachePath());
|
|
if (!status.ok()) {
|
|
// directory already exists, clean it up
|
|
status = CleanupCacheFolder(GetCachePath());
|
|
assert(status.ok());
|
|
if (!status.ok()) {
|
|
Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
|
|
status.ToString().c_str());
|
|
return status;
|
|
}
|
|
}
|
|
|
|
// create a new file
|
|
assert(!cache_file_);
|
|
status = NewCacheFile();
|
|
if (!status.ok()) {
|
|
Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
|
|
status.ToString().c_str());
|
|
return status;
|
|
}
|
|
|
|
assert(cache_file_);
|
|
|
|
if (opt_.pipeline_writes) {
|
|
assert(!insert_th_.joinable());
|
|
insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
bool IsCacheFile(const std::string& file) {
|
|
// check if the file has .rc suffix
|
|
// Unfortunately regex support across compilers is not even, so we use simple
|
|
// string parsing
|
|
size_t pos = file.find(".");
|
|
if (pos == std::string::npos) {
|
|
return false;
|
|
}
|
|
|
|
std::string suffix = file.substr(pos);
|
|
return suffix == ".rc";
|
|
}
|
|
|
|
Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
|
|
std::vector<std::string> files;
|
|
Status status = opt_.env->GetChildren(folder, &files);
|
|
if (!status.ok()) {
|
|
Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
|
|
status.ToString().c_str());
|
|
return status;
|
|
}
|
|
|
|
// cleanup files with the patter :digi:.rc
|
|
for (auto file : files) {
|
|
if (IsCacheFile(file)) {
|
|
// cache file
|
|
Info(opt_.log, "Removing file %s.", file.c_str());
|
|
status = opt_.env->DeleteFile(folder + "/" + file);
|
|
if (!status.ok()) {
|
|
Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
|
|
status.ToString().c_str());
|
|
return status;
|
|
}
|
|
} else {
|
|
ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status BlockCacheTier::Close() {
|
|
// stop the insert thread
|
|
if (opt_.pipeline_writes && insert_th_.joinable()) {
|
|
InsertOp op(/*quit=*/true);
|
|
insert_ops_.Push(std::move(op));
|
|
insert_th_.join();
|
|
}
|
|
|
|
// stop the writer before
|
|
writer_.Stop();
|
|
|
|
// clear all metadata
|
|
WriteLock _(&lock_);
|
|
metadata_.Clear();
|
|
return Status::OK();
|
|
}
|
|
|
|
template<class T>
|
|
void Add(std::map<std::string, double>* stats, const std::string& key,
|
|
const T& t) {
|
|
stats->insert({key, static_cast<double>(t)});
|
|
}
|
|
|
|
PersistentCache::StatsType BlockCacheTier::Stats() {
|
|
std::map<std::string, double> stats;
|
|
Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
|
|
stats_.bytes_pipelined_.Average());
|
|
Add(&stats, "persistentcache.blockcachetier.bytes_written",
|
|
stats_.bytes_written_.Average());
|
|
Add(&stats, "persistentcache.blockcachetier.bytes_read",
|
|
stats_.bytes_read_.Average());
|
|
Add(&stats, "persistentcache.blockcachetier.insert_dropped",
|
|
stats_.insert_dropped_);
|
|
Add(&stats, "persistentcache.blockcachetier.cache_hits",
|
|
stats_.cache_hits_);
|
|
Add(&stats, "persistentcache.blockcachetier.cache_misses",
|
|
stats_.cache_misses_);
|
|
Add(&stats, "persistentcache.blockcachetier.cache_errors",
|
|
stats_.cache_errors_);
|
|
Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
|
|
stats_.CacheHitPct());
|
|
Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
|
|
stats_.CacheMissPct());
|
|
Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
|
|
stats_.read_hit_latency_.Average());
|
|
Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
|
|
stats_.read_miss_latency_.Average());
|
|
Add(&stats, "persistentcache.blockcachetier.write_latency",
|
|
stats_.write_latency_.Average());
|
|
|
|
auto out = PersistentCacheTier::Stats();
|
|
out.push_back(stats);
|
|
return out;
|
|
}
|
|
|
|
Status BlockCacheTier::Insert(const Slice& key, const char* data,
|
|
const size_t size) {
|
|
// update stats
|
|
stats_.bytes_pipelined_.Add(size);
|
|
|
|
if (opt_.pipeline_writes) {
|
|
// off load the write to the write thread
|
|
insert_ops_.Push(
|
|
InsertOp(key.ToString(), std::move(std::string(data, size))));
|
|
return Status::OK();
|
|
}
|
|
|
|
assert(!opt_.pipeline_writes);
|
|
return InsertImpl(key, Slice(data, size));
|
|
}
|
|
|
|
void BlockCacheTier::InsertMain() {
|
|
while (true) {
|
|
InsertOp op(insert_ops_.Pop());
|
|
|
|
if (op.signal_) {
|
|
// that is a secret signal to exit
|
|
break;
|
|
}
|
|
|
|
size_t retry = 0;
|
|
Status s;
|
|
while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
|
|
if (retry > kMaxRetry) {
|
|
break;
|
|
}
|
|
|
|
// this can happen when the buffers are full, we wait till some buffers
|
|
// are free. Why don't we wait inside the code. This is because we want
|
|
// to support both pipelined and non-pipelined mode
|
|
buffer_allocator_.WaitUntilUsable();
|
|
retry++;
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
stats_.insert_dropped_++;
|
|
}
|
|
}
|
|
}
|
|
|
|
Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
|
|
// pre-condition
|
|
assert(key.size());
|
|
assert(data.size());
|
|
assert(cache_file_);
|
|
|
|
StopWatchNano timer(opt_.clock, /*auto_start=*/true);
|
|
|
|
WriteLock _(&lock_);
|
|
|
|
LBA lba;
|
|
if (metadata_.Lookup(key, &lba)) {
|
|
// the key already exists, this is duplicate insert
|
|
return Status::OK();
|
|
}
|
|
|
|
while (!cache_file_->Append(key, data, &lba)) {
|
|
if (!cache_file_->Eof()) {
|
|
ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
|
|
cache_file_->cacheid());
|
|
stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
|
|
return Status::TryAgain();
|
|
}
|
|
|
|
assert(cache_file_->Eof());
|
|
Status status = NewCacheFile();
|
|
if (!status.ok()) {
|
|
return status;
|
|
}
|
|
}
|
|
|
|
// Insert into lookup index
|
|
BlockInfo* info = metadata_.Insert(key, lba);
|
|
assert(info);
|
|
if (!info) {
|
|
return Status::IOError("Unexpected error inserting to index");
|
|
}
|
|
|
|
// insert to cache file reverse mapping
|
|
cache_file_->Add(info);
|
|
|
|
// update stats
|
|
stats_.bytes_written_.Add(data.size());
|
|
stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
|
|
return Status::OK();
|
|
}
|
|
|
|
Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
|
|
size_t* size) {
|
|
StopWatchNano timer(opt_.clock, /*auto_start=*/true);
|
|
|
|
LBA lba;
|
|
bool status;
|
|
status = metadata_.Lookup(key, &lba);
|
|
if (!status) {
|
|
stats_.cache_misses_++;
|
|
stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
|
|
return Status::NotFound("blockcache: key not found");
|
|
}
|
|
|
|
BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
|
|
if (!file) {
|
|
// this can happen because the block index and cache file index are
|
|
// different, and the cache file might be removed between the two lookups
|
|
stats_.cache_misses_++;
|
|
stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
|
|
return Status::NotFound("blockcache: cache file not found");
|
|
}
|
|
|
|
assert(file->refs_);
|
|
|
|
std::unique_ptr<char[]> scratch(new char[lba.size_]);
|
|
Slice blk_key;
|
|
Slice blk_val;
|
|
|
|
status = file->Read(lba, &blk_key, &blk_val, scratch.get());
|
|
--file->refs_;
|
|
if (!status) {
|
|
stats_.cache_misses_++;
|
|
stats_.cache_errors_++;
|
|
stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
|
|
return Status::NotFound("blockcache: error reading data");
|
|
}
|
|
|
|
assert(blk_key == key);
|
|
|
|
val->reset(new char[blk_val.size()]);
|
|
memcpy(val->get(), blk_val.data(), blk_val.size());
|
|
*size = blk_val.size();
|
|
|
|
stats_.bytes_read_.Add(*size);
|
|
stats_.cache_hits_++;
|
|
stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
bool BlockCacheTier::Erase(const Slice& key) {
|
|
WriteLock _(&lock_);
|
|
BlockInfo* info = metadata_.Remove(key);
|
|
assert(info);
|
|
delete info;
|
|
return true;
|
|
}
|
|
|
|
Status BlockCacheTier::NewCacheFile() {
|
|
lock_.AssertHeld();
|
|
|
|
TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
|
|
(void*)(GetCachePath().c_str()));
|
|
|
|
std::unique_ptr<WriteableCacheFile> f(
|
|
new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
|
|
GetCachePath(), writer_cache_id_,
|
|
opt_.cache_file_size, opt_.log));
|
|
|
|
bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
|
|
if (!status) {
|
|
return Status::IOError("Error creating file");
|
|
}
|
|
|
|
Info(opt_.log, "Created cache file %d", writer_cache_id_);
|
|
|
|
writer_cache_id_++;
|
|
cache_file_ = f.release();
|
|
|
|
// insert to cache files tree
|
|
status = metadata_.Insert(cache_file_);
|
|
assert(status);
|
|
if (!status) {
|
|
Error(opt_.log, "Error inserting to metadata");
|
|
return Status::IOError("Error inserting to metadata");
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
|
|
bool BlockCacheTier::Reserve(const size_t size) {
|
|
WriteLock _(&lock_);
|
|
assert(size_ <= opt_.cache_size);
|
|
|
|
if (size + size_ <= opt_.cache_size) {
|
|
// there is enough space to write
|
|
size_ += size;
|
|
return true;
|
|
}
|
|
|
|
assert(size + size_ >= opt_.cache_size);
|
|
// there is not enough space to fit the requested data
|
|
// we can clear some space by evicting cold data
|
|
|
|
const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
|
|
while (size + size_ > opt_.cache_size * retain_fac) {
|
|
std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
|
|
if (!f) {
|
|
// nothing is evictable
|
|
return false;
|
|
}
|
|
assert(!f->refs_);
|
|
uint64_t file_size;
|
|
if (!f->Delete(&file_size).ok()) {
|
|
// unable to delete file
|
|
return false;
|
|
}
|
|
|
|
assert(file_size <= size_);
|
|
size_ -= file_size;
|
|
}
|
|
|
|
size_ += size;
|
|
assert(size_ <= opt_.cache_size * 0.9);
|
|
return true;
|
|
}
|
|
|
|
Status NewPersistentCache(Env* const env, const std::string& path,
|
|
const uint64_t size,
|
|
const std::shared_ptr<Logger>& log,
|
|
const bool optimized_for_nvm,
|
|
std::shared_ptr<PersistentCache>* cache) {
|
|
if (!cache) {
|
|
return Status::IOError("invalid argument cache");
|
|
}
|
|
|
|
auto opt = PersistentCacheConfig(env, path, size, log);
|
|
if (optimized_for_nvm) {
|
|
// the default settings are optimized for SSD
|
|
// NVM devices are better accessed with 4K direct IO and written with
|
|
// parallelism
|
|
opt.enable_direct_writes = true;
|
|
opt.writer_qdepth = 4;
|
|
opt.writer_dispatch_size = 4 * 1024;
|
|
}
|
|
|
|
auto pcache = std::make_shared<BlockCacheTier>(opt);
|
|
Status s = pcache->Open();
|
|
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
|
|
*cache = pcache;
|
|
return s;
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
#endif // ifndef ROCKSDB_LITE
|