mrambacher 4a09d632c4 Remove Legacy and Custom FileWrapper classes from header files (#7851)
Summary:
Removed the uses of the Legacy FileWrapper classes from the source code.  The wrappers were creating an additional layer of indirection/wrapping, as the Env already has a FileSystem.

Moved the Custom FileWrapper classes into the CustomEnv, as these classes are really for the private use the the CustomEnv class.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7851

Reviewed By: anand1976

Differential Revision: D26114816

Pulled By: mrambacher

fbshipit-source-id: db32840e58d969d3a0fa6c25aaf13d6dcdc74150
2021-01-28 22:10:32 -08:00

356 lines
10 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/utilities/sim_cache.h"
#include <atomic>
#include "file/writable_file_writer.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
namespace {
class CacheActivityLogger {
public:
CacheActivityLogger()
: activity_logging_enabled_(false), max_logging_size_(0) {}
~CacheActivityLogger() {
MutexLock l(&mutex_);
StopLoggingInternal();
bg_status_.PermitUncheckedError();
}
Status StartLogging(const std::string& activity_log_file, Env* env,
uint64_t max_logging_size = 0) {
assert(activity_log_file != "");
assert(env != nullptr);
Status status;
FileOptions file_opts;
MutexLock l(&mutex_);
// Stop existing logging if any
StopLoggingInternal();
// Open log file
status = WritableFileWriter::Create(env->GetFileSystem(), activity_log_file,
file_opts, &file_writer_, nullptr);
if (!status.ok()) {
return status;
}
max_logging_size_ = max_logging_size;
activity_logging_enabled_.store(true);
return status;
}
void StopLogging() {
MutexLock l(&mutex_);
StopLoggingInternal();
}
void ReportLookup(const Slice& key) {
if (activity_logging_enabled_.load() == false) {
return;
}
std::string log_line = "LOOKUP - " + key.ToString(true) + "\n";
// line format: "LOOKUP - <KEY>"
MutexLock l(&mutex_);
Status s = file_writer_->Append(log_line);
if (!s.ok() && bg_status_.ok()) {
bg_status_ = s;
}
if (MaxLoggingSizeReached() || !bg_status_.ok()) {
// Stop logging if we have reached the max file size or
// encountered an error
StopLoggingInternal();
}
}
void ReportAdd(const Slice& key, size_t size) {
if (activity_logging_enabled_.load() == false) {
return;
}
std::string log_line = "ADD - ";
log_line += key.ToString(true);
log_line += " - ";
AppendNumberTo(&log_line, size);
log_line += "\n";
// line format: "ADD - <KEY> - <KEY-SIZE>"
MutexLock l(&mutex_);
Status s = file_writer_->Append(log_line);
if (!s.ok() && bg_status_.ok()) {
bg_status_ = s;
}
if (MaxLoggingSizeReached() || !bg_status_.ok()) {
// Stop logging if we have reached the max file size or
// encountered an error
StopLoggingInternal();
}
}
Status& bg_status() {
MutexLock l(&mutex_);
return bg_status_;
}
private:
bool MaxLoggingSizeReached() {
mutex_.AssertHeld();
return (max_logging_size_ > 0 &&
file_writer_->GetFileSize() >= max_logging_size_);
}
void StopLoggingInternal() {
mutex_.AssertHeld();
if (!activity_logging_enabled_) {
return;
}
activity_logging_enabled_.store(false);
Status s = file_writer_->Close();
if (!s.ok() && bg_status_.ok()) {
bg_status_ = s;
}
}
// Mutex to sync writes to file_writer, and all following
// class data members
port::Mutex mutex_;
// Indicates if logging is currently enabled
// atomic to allow reads without mutex
std::atomic<bool> activity_logging_enabled_;
// When reached, we will stop logging and close the file
// Value of 0 means unlimited
uint64_t max_logging_size_;
std::unique_ptr<WritableFileWriter> file_writer_;
Status bg_status_;
};
// SimCacheImpl definition
class SimCacheImpl : public SimCache {
public:
// capacity for real cache (ShardedLRUCache)
// test_capacity for key only cache
SimCacheImpl(std::shared_ptr<Cache> sim_cache, std::shared_ptr<Cache> cache)
: cache_(cache),
key_only_cache_(sim_cache),
miss_times_(0),
hit_times_(0),
stats_(nullptr) {}
~SimCacheImpl() override {}
void SetCapacity(size_t capacity) override { cache_->SetCapacity(capacity); }
void SetStrictCapacityLimit(bool strict_capacity_limit) override {
cache_->SetStrictCapacityLimit(strict_capacity_limit);
}
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), Handle** handle,
Priority priority) override {
// The handle and value passed in are for real cache, so we pass nullptr
// to key_only_cache_ for both instead. Also, the deleter function pointer
// will be called by user to perform some external operation which should
// be applied only once. Thus key_only_cache accepts an empty function.
// *Lambda function without capture can be assgined to a function pointer
Handle* h = key_only_cache_->Lookup(key);
if (h == nullptr) {
// TODO: Check for error here?
auto s = key_only_cache_->Insert(
key, nullptr, charge, [](const Slice& /*k*/, void* /*v*/) {}, nullptr,
priority);
s.PermitUncheckedError();
} else {
key_only_cache_->Release(h);
}
cache_activity_logger_.ReportAdd(key, charge);
if (!cache_) {
return Status::OK();
}
return cache_->Insert(key, value, charge, deleter, handle, priority);
}
Handle* Lookup(const Slice& key, Statistics* stats) override {
Handle* h = key_only_cache_->Lookup(key);
if (h != nullptr) {
key_only_cache_->Release(h);
inc_hit_counter();
RecordTick(stats, SIM_BLOCK_CACHE_HIT);
} else {
inc_miss_counter();
RecordTick(stats, SIM_BLOCK_CACHE_MISS);
}
cache_activity_logger_.ReportLookup(key);
if (!cache_) {
return nullptr;
}
return cache_->Lookup(key, stats);
}
bool Ref(Handle* handle) override { return cache_->Ref(handle); }
bool Release(Handle* handle, bool force_erase = false) override {
return cache_->Release(handle, force_erase);
}
void Erase(const Slice& key) override {
cache_->Erase(key);
key_only_cache_->Erase(key);
}
void* Value(Handle* handle) override { return cache_->Value(handle); }
uint64_t NewId() override { return cache_->NewId(); }
size_t GetCapacity() const override { return cache_->GetCapacity(); }
bool HasStrictCapacityLimit() const override {
return cache_->HasStrictCapacityLimit();
}
size_t GetUsage() const override { return cache_->GetUsage(); }
size_t GetUsage(Handle* handle) const override {
return cache_->GetUsage(handle);
}
size_t GetCharge(Handle* handle) const override {
return cache_->GetCharge(handle);
}
size_t GetPinnedUsage() const override { return cache_->GetPinnedUsage(); }
void DisownData() override {
cache_->DisownData();
key_only_cache_->DisownData();
}
void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe) override {
// only apply to _cache since key_only_cache doesn't hold value
cache_->ApplyToAllCacheEntries(callback, thread_safe);
}
void EraseUnRefEntries() override {
cache_->EraseUnRefEntries();
key_only_cache_->EraseUnRefEntries();
}
size_t GetSimCapacity() const override {
return key_only_cache_->GetCapacity();
}
size_t GetSimUsage() const override { return key_only_cache_->GetUsage(); }
void SetSimCapacity(size_t capacity) override {
key_only_cache_->SetCapacity(capacity);
}
uint64_t get_miss_counter() const override {
return miss_times_.load(std::memory_order_relaxed);
}
uint64_t get_hit_counter() const override {
return hit_times_.load(std::memory_order_relaxed);
}
void reset_counter() override {
miss_times_.store(0, std::memory_order_relaxed);
hit_times_.store(0, std::memory_order_relaxed);
SetTickerCount(stats_, SIM_BLOCK_CACHE_HIT, 0);
SetTickerCount(stats_, SIM_BLOCK_CACHE_MISS, 0);
}
std::string ToString() const override {
std::string res;
res.append("SimCache MISSes: " + std::to_string(get_miss_counter()) + "\n");
res.append("SimCache HITs: " + std::to_string(get_hit_counter()) + "\n");
char buff[350];
auto lookups = get_miss_counter() + get_hit_counter();
snprintf(buff, sizeof(buff), "SimCache HITRATE: %.2f%%\n",
(lookups == 0 ? 0 : get_hit_counter() * 100.0f / lookups));
res.append(buff);
return res;
}
std::string GetPrintableOptions() const override {
std::string ret;
ret.reserve(20000);
ret.append(" cache_options:\n");
ret.append(cache_->GetPrintableOptions());
ret.append(" sim_cache_options:\n");
ret.append(key_only_cache_->GetPrintableOptions());
return ret;
}
Status StartActivityLogging(const std::string& activity_log_file, Env* env,
uint64_t max_logging_size = 0) override {
return cache_activity_logger_.StartLogging(activity_log_file, env,
max_logging_size);
}
void StopActivityLogging() override { cache_activity_logger_.StopLogging(); }
Status GetActivityLoggingStatus() override {
return cache_activity_logger_.bg_status();
}
private:
std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> key_only_cache_;
std::atomic<uint64_t> miss_times_;
std::atomic<uint64_t> hit_times_;
Statistics* stats_;
CacheActivityLogger cache_activity_logger_;
void inc_miss_counter() {
miss_times_.fetch_add(1, std::memory_order_relaxed);
}
void inc_hit_counter() { hit_times_.fetch_add(1, std::memory_order_relaxed); }
};
} // end anonymous namespace
// For instrumentation purpose, use NewSimCache instead
std::shared_ptr<SimCache> NewSimCache(std::shared_ptr<Cache> cache,
size_t sim_capacity, int num_shard_bits) {
LRUCacheOptions co;
co.capacity = sim_capacity;
co.num_shard_bits = num_shard_bits;
co.metadata_charge_policy = kDontChargeCacheMetadata;
return NewSimCache(NewLRUCache(co), cache, num_shard_bits);
}
std::shared_ptr<SimCache> NewSimCache(std::shared_ptr<Cache> sim_cache,
std::shared_ptr<Cache> cache,
int num_shard_bits) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
return std::make_shared<SimCacheImpl>(sim_cache, cache);
}
} // namespace ROCKSDB_NAMESPACE