Persistent Read Cache (part 6) Block Cache Tier Implementation

Summary:
The patch is a continuation of part 5. It glues the abstraction for
file layout and metadata, and flush out the implementation of the API. It
adds unit tests for the implementation.

Test Plan: Run unit tests

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D57549
This commit is contained in:
krad 2016-05-03 11:05:42 -07:00
parent 64046e581c
commit c116b47804
13 changed files with 1085 additions and 204 deletions

1
src.mk
View File

@ -136,6 +136,7 @@ LIB_SOURCES = \
utilities/persistent_cache/volatile_tier_impl.cc \
utilities/persistent_cache/block_cache_tier_file.cc \
utilities/persistent_cache/block_cache_tier_metadata.cc \
utilities/persistent_cache/block_cache_tier.cc \
utilities/redis/redis_lists.cc \
utilities/simulator_cache/sim_cache.cc \
utilities/spatialdb/spatial_db.cc \

View File

@ -145,8 +145,7 @@ Status ReadUnaligned(int fd, Slice* data, const uint64_t offset,
Status DirectIORead(int fd, Slice* result, size_t off, size_t n,
char* scratch) {
if (IsSectorAligned(off) && IsSectorAligned(n) &&
IsPageAligned(result->data())) {
if (IsSectorAligned(off) && IsSectorAligned(n) && IsPageAligned(scratch)) {
return ReadAligned(fd, result, off, n, scratch);
}
return ReadUnaligned(fd, result, off, n, scratch);

View File

@ -0,0 +1,358 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "utilities/persistent_cache/block_cache_tier.h"
#include <regex>
#include <utility>
#include <vector>
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier_file.h"
namespace rocksdb {
//
// BlockCacheImpl
//
Status BlockCacheTier::Open() {
Status status;
WriteLock _(&lock_);
assert(!size_);
// Check the validity of the options
status = opt_.ValidateSettings();
assert(status.ok());
if (!status.ok()) {
Error(opt_.log, "Invalid block cache options");
return status;
}
// Create base directory or cleanup existing directory
status = opt_.env->CreateDirIfMissing(opt_.path);
if (!status.ok()) {
Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
status.ToString().c_str());
return status;
}
// Create base/<cache dir> directory
status = opt_.env->CreateDir(GetCachePath());
if (!status.ok()) {
// directory already exisits, clean it up
status = CleanupCacheFolder(GetCachePath());
assert(status.ok());
if (!status.ok()) {
Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
status.ToString().c_str());
return status;
}
}
assert(!cache_file_);
NewCacheFile();
assert(cache_file_);
if (opt_.pipeline_writes_) {
assert(!insert_th_.joinable());
insert_th_ = std::thread(&BlockCacheTier::InsertMain, this);
}
return Status::OK();
}
Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
std::vector<std::string> files;
Status status = opt_.env->GetChildren(folder, &files);
if (!status.ok()) {
Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
status.ToString().c_str());
return status;
}
// cleanup files with the patter :digi:.rc
for (auto file : files) {
try {
const std::regex cache_file_regex("(0-9)+\\.rc$");
if (std::regex_match(file, cache_file_regex)) {
// cache file
Info(opt_.log, "Removing file %s.", file.c_str());
status = opt_.env->DeleteFile(folder + "/" + file);
if (!status.ok()) {
Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
status.ToString().c_str());
return Status::IOError("Error deleting file " + file);
}
} else {
Info(opt_.log, "Skipping file %s.", file.c_str());
}
} catch (const std::regex_error& e) {
// Since std library is evolving, you can potentially get an exception for
// certain older compiler version. It is safer to exit cleanly.
return Status::IOError(e.what());
}
}
return Status::OK();
}
Status BlockCacheTier::Close() {
// stop the insert thread
if (opt_.pipeline_writes_ && insert_th_.joinable()) {
InsertOp op(/*quit=*/true);
insert_ops_.Push(std::move(op));
insert_th_.join();
}
// stop the writer before
writer_.Stop();
// clear all metadata
WriteLock _(&lock_);
metadata_.Clear();
return Status::OK();
}
std::string BlockCacheTier::PrintStats() {
std::ostringstream os;
os << "persistentcache.blockcachetier.bytes_piplined: "
<< stats_.bytes_pipelined_.ToString() << std::endl
<< "persistentcache.blockcachetier.bytes_written: "
<< stats_.bytes_written_.ToString() << std::endl
<< "persistentcache.blockcachetier.bytes_read: "
<< stats_.bytes_read_.ToString() << std::endl
<< "persistentcache.blockcachetier.insert_dropped"
<< stats_.insert_dropped_ << std::endl
<< "persistentcache.blockcachetier.cache_hits: " << stats_.cache_hits_
<< std::endl
<< "persistentcache.blockcachetier.cache_misses: " << stats_.cache_misses_
<< std::endl
<< "persistentcache.blockcachetier.cache_errors: " << stats_.cache_errors_
<< std::endl
<< "persistentcache.blockcachetier.cache_hits_pct: "
<< stats_.CacheHitPct() << std::endl
<< "persistentcache.blockcachetier.cache_misses_pct: "
<< stats_.CacheMissPct() << std::endl
<< "persistentcache.blockcachetier.read_hit_latency: "
<< stats_.read_hit_latency_.ToString() << std::endl
<< "persistentcache.blockcachetier.read_miss_latency: "
<< stats_.read_miss_latency_.ToString() << std::endl
<< "persistenetcache.blockcachetier.write_latency: "
<< stats_.write_latency_.ToString() << std::endl
<< PersistentCacheTier::PrintStats();
return os.str();
}
Status BlockCacheTier::Insert(const Slice& key, const char* data,
const size_t size) {
// update stats
stats_.bytes_pipelined_.Add(size);
if (opt_.pipeline_writes_) {
// off load the write to the write thread
insert_ops_.Push(
InsertOp(key.ToString(), std::move(std::string(data, size))));
return Status::OK();
}
assert(!opt_.pipeline_writes_);
return InsertImpl(key, Slice(data, size));
}
void BlockCacheTier::InsertMain() {
while (true) {
InsertOp op(insert_ops_.Pop());
if (op.signal_) {
// that is a secret signal to exit
break;
}
size_t retry = 0;
Status s;
while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
if (retry > kMaxRetry) {
break;
}
// this can happen when the buffers are full, we wait till some buffers
// are free. Why don't we wait inside the code. This is because we want
// to support both pipelined and non-pipelined mode
buffer_allocator_.WaitUntilUsable();
retry++;
}
if (!s.ok()) {
stats_.insert_dropped_++;
}
}
}
Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
// pre-condition
assert(key.size());
assert(data.size());
assert(cache_file_);
StopWatchNano timer(opt_.env);
WriteLock _(&lock_);
LBA lba;
if (metadata_.Lookup(key, &lba)) {
// the key already exisits, this is duplicate insert
return Status::OK();
}
while (!cache_file_->Append(key, data, &lba)) {
if (!cache_file_->Eof()) {
Debug(opt_.log, "Error inserting to cache file %d",
cache_file_->cacheid());
stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
return Status::TryAgain();
}
assert(cache_file_->Eof());
NewCacheFile();
}
// Insert into lookup index
BlockInfo* info = metadata_.Insert(key, lba);
assert(info);
if (!info) {
return Status::IOError("Unexpected error inserting to index");
}
// insert to cache file reverse mapping
cache_file_->Add(info);
// update stats
stats_.bytes_written_.Add(data.size());
stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
return Status::OK();
}
Status BlockCacheTier::Lookup(const Slice& key, unique_ptr<char[]>* val,
size_t* size) {
StopWatchNano timer(opt_.env);
LBA lba;
bool status;
status = metadata_.Lookup(key, &lba);
if (!status) {
stats_.cache_misses_++;
stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
return Status::NotFound("blockcache: key not found");
}
BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
if (!file) {
// this can happen because the block index and cache file index are
// different, and the cache file might be removed between the two lookups
stats_.cache_misses_++;
stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
return Status::NotFound("blockcache: cache file not found");
}
assert(file->refs_);
unique_ptr<char[]> scratch(new char[lba.size_]);
Slice blk_key;
Slice blk_val;
status = file->Read(lba, &blk_key, &blk_val, scratch.get());
--file->refs_;
assert(status);
if (!status) {
stats_.cache_misses_++;
stats_.cache_errors_++;
stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
return Status::NotFound("blockcache: error reading data");
}
assert(blk_key == key);
val->reset(new char[blk_val.size()]);
memcpy(val->get(), blk_val.data(), blk_val.size());
*size = blk_val.size();
stats_.bytes_read_.Add(*size);
stats_.cache_hits_++;
stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
return Status::OK();
}
bool BlockCacheTier::Erase(const Slice& key) {
WriteLock _(&lock_);
BlockInfo* info = metadata_.Remove(key);
assert(info);
delete info;
return true;
}
void BlockCacheTier::NewCacheFile() {
lock_.AssertHeld();
Info(opt_.log, "Creating cache file %d", writer_cache_id_);
writer_cache_id_++;
cache_file_ = new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
GetCachePath(), writer_cache_id_,
opt_.cache_file_size, opt_.log);
bool status;
status =
cache_file_->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
assert(status);
// insert to cache files tree
status = metadata_.Insert(cache_file_);
(void)status;
assert(status);
}
bool BlockCacheTier::Reserve(const size_t size) {
WriteLock _(&lock_);
assert(size_ <= opt_.cache_size);
if (size + size_ <= opt_.cache_size) {
// there is enough space to write
size_ += size;
return true;
}
assert(size + size_ >= opt_.cache_size);
// there is not enough space to fit the requested data
// we can clear some space by evicting cold data
const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
while (size + size_ > opt_.cache_size * retain_fac) {
unique_ptr<BlockCacheFile> f(metadata_.Evict());
if (!f) {
// nothing is evictable
return false;
}
assert(!f->refs_);
size_t file_size;
if (!f->Delete(&file_size).ok()) {
// unable to delete file
return false;
}
assert(file_size <= size_);
size_ -= file_size;
}
size_ += size;
assert(size_ <= opt_.cache_size * 0.9);
return true;
}
} // namespace rocksdb
#endif // ifndef ROCKSDB_LITE

View File

@ -0,0 +1,145 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <unistd.h>
#include <list>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/persistent_cache.h"
#include "utilities/persistent_cache/block_cache_tier_file.h"
#include "utilities/persistent_cache/block_cache_tier_metadata.h"
#include "utilities/persistent_cache/persistent_cache_util.h"
#include "db/skiplist.h"
#include "port/port_posix.h"
#include "util/arena.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
namespace rocksdb {
//
// Block cache tier implementation
//
class BlockCacheTier : public PersistentCacheTier {
public:
explicit BlockCacheTier(const PersistentCacheConfig& opt)
: opt_(opt),
insert_ops_(opt_.max_write_pipeline_backlog_size),
buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
writer_(this, opt_.writer_qdepth, opt_.writer_dispatch_size) {
Info(opt_.log, "Initializing allocator. size=%d B count=%d",
opt_.write_buffer_size, opt_.write_buffer_count());
}
virtual ~BlockCacheTier() {
// By contract, the user should have called stop before destroying the
// object
assert(!insert_th_.joinable());
}
Status Insert(const Slice& key, const char* data, const size_t size) override;
Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
size_t* size) override;
Status Open() override;
Status Close() override;
bool Erase(const Slice& key) override;
bool Reserve(const size_t size) override;
bool IsCompressed() override { return opt_.is_compressed; }
std::string PrintStats() override;
void TEST_Flush() override {
while (insert_ops_.Size()) {
/* sleep override */ sleep(1);
}
}
private:
// Percentage of cache to be evicted when the cache is full
static const size_t kEvictPct = 10;
// Max attempts to insert key, value to cache in pipelined mode
static const size_t kMaxRetry = 3;
// Pipelined operation
struct InsertOp {
explicit InsertOp(const bool signal) : signal_(signal) {}
explicit InsertOp(std::string&& key, const std::string& data)
: key_(std::move(key)), data_(data) {}
~InsertOp() {}
InsertOp() = delete;
InsertOp(InsertOp&& rhs) = default;
InsertOp& operator=(InsertOp&& rhs) = default;
// used for estimating size by bounded queue
size_t Size() { return data_.size() + key_.size(); }
std::string key_;
std::string data_;
const bool signal_ = false; // signal to request processing thread to exit
};
// entry point for insert thread
void InsertMain();
// insert implementation
Status InsertImpl(const Slice& key, const Slice& data);
// Create a new cache file
void NewCacheFile();
// Get cache directory path
std::string GetCachePath() const { return opt_.path + "/cache"; }
// Cleanup folder
Status CleanupCacheFolder(const std::string& folder);
// Statistics
struct Stats {
HistogramImpl bytes_pipelined_;
HistogramImpl bytes_written_;
HistogramImpl bytes_read_;
HistogramImpl read_hit_latency_;
HistogramImpl read_miss_latency_;
HistogramImpl write_latency_;
uint64_t cache_hits_ = 0;
uint64_t cache_misses_ = 0;
uint64_t cache_errors_ = 0;
uint64_t insert_dropped_ = 0;
double CacheHitPct() const {
const auto lookups = cache_hits_ + cache_misses_;
return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
}
double CacheMissPct() const {
const auto lookups = cache_hits_ + cache_misses_;
return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
}
};
port::RWMutex lock_; // Synchronization
const PersistentCacheConfig opt_; // BlockCache options
BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
std::thread insert_th_; // Insert thread
uint32_t writer_cache_id_ = 0; // Current cache file identifier
WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
CacheWriteBufferAllocator buffer_allocator_; // Buffer provider
ThreadedWriter writer_; // Writer threads
BlockCacheTierMetadata metadata_; // Cache meta data manager
std::atomic<uint64_t> size_{0}; // Size of the cache
Stats stats_; // Statistics
};
} // namespace rocksdb

View File

@ -189,12 +189,12 @@ bool CacheRecord::Deserialize(const Slice& data) {
// RandomAccessFile
//
bool RandomAccessCacheFile::Open() {
bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
WriteLock _(&rwlock_);
return OpenImpl();
return OpenImpl(enable_direct_reads);
}
bool RandomAccessCacheFile::OpenImpl() {
bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
rwlock_.AssertHeld();
Debug(log_, "Opening cache file %s", Path().c_str());
@ -265,9 +265,12 @@ WriteableCacheFile::~WriteableCacheFile() {
ClearBuffers();
}
bool WriteableCacheFile::Create() {
bool WriteableCacheFile::Create(const bool enable_direct_writes,
const bool enable_direct_reads) {
WriteLock _(&rwlock_);
enable_direct_reads_ = enable_direct_reads;
Debug(log_, "Creating new cache %s (max size is %d B)", Path().c_str(),
max_size_);
@ -388,7 +391,7 @@ void WriteableCacheFile::DispatchBuffer() {
// pad it with zero for direct IO
buf->FillTrailingZeros();
assert(buf->Used() % FILE_ALIGNMENT_SIZE == 0);
assert(buf->Used() % kFileAlignmentSize == 0);
writer_->Write(file_.get(), buf, file_off,
std::bind(&WriteableCacheFile::BufferWriteDone, this));
@ -417,7 +420,7 @@ void WriteableCacheFile::CloseAndOpenForReading() {
// Our env abstraction do not allow reading from a file opened for appending
// We need close the file and re-open it for reading
Close();
RandomAccessCacheFile::OpenImpl();
RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
}
bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
@ -523,7 +526,9 @@ void ThreadedWriter::Stop() {
// wait for all threads to exit
for (auto& th : threads_) {
th.join();
assert(!th.joinable());
}
threads_.clear();
}
void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,

View File

@ -114,7 +114,9 @@ class BlockCacheFile : public LRUElement<BlockCacheFile> {
}
// get file path
std::string Path() const { return dir_ + "/" + std::to_string(cache_id_); }
std::string Path() const {
return dir_ + "/" + std::to_string(cache_id_) + ".rc";
}
// get cache ID
uint32_t cacheid() const { return cache_id_; }
// Add block information to file data
@ -150,7 +152,7 @@ class RandomAccessCacheFile : public BlockCacheFile {
virtual ~RandomAccessCacheFile() {}
// open file for reading
bool Open();
bool Open(const bool enable_direct_reads);
// read data from the disk
bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
@ -158,7 +160,7 @@ class RandomAccessCacheFile : public BlockCacheFile {
std::unique_ptr<RandomAccessFile> file_;
protected:
bool OpenImpl();
bool OpenImpl(const bool enable_direct_reads);
bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
std::shared_ptr<Logger> log_; // log file
@ -183,7 +185,7 @@ class WriteableCacheFile : public RandomAccessCacheFile {
virtual ~WriteableCacheFile();
// create file on disk
bool Create();
bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
// read data from logical file
bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
@ -205,7 +207,7 @@ class WriteableCacheFile : public RandomAccessCacheFile {
private:
friend class ThreadedWriter;
static const size_t FILE_ALIGNMENT_SIZE = 4 * 1024; // align file size
static const size_t kFileAlignmentSize = 4 * 1024; // align file size
bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
bool ReadBuffer(const LBA& lba, char* data);
@ -240,6 +242,8 @@ class WriteableCacheFile : public RandomAccessCacheFile {
size_t buf_woff_ = 0; // off into bufs_ to write
size_t buf_doff_ = 0; // off into bufs_ to dispatch
size_t pending_ios_ = 0; // Number of ios to disk in-progress
bool enable_direct_reads_ = false; // Should we enable direct reads
// when reading from disk
};
//
@ -267,7 +271,7 @@ class ThreadedWriter : public Writer {
explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
const size_t io_size);
virtual ~ThreadedWriter() {}
virtual ~ThreadedWriter() { assert(threads_.empty()); }
void Stop() override;
void Write(WritableFile* const file, CacheWriteBuffer* buf,

View File

@ -61,9 +61,9 @@ class CacheWriteBuffer {
//
class CacheWriteBufferAllocator {
public:
explicit CacheWriteBufferAllocator(const uint32_t buffer_size,
const uint32_t buffer_count)
: buffer_size_(buffer_size) {
explicit CacheWriteBufferAllocator(const size_t buffer_size,
const size_t buffer_count)
: cond_empty_(&lock_), buffer_size_(buffer_size) {
MutexLock _(&lock_);
buffer_size_ = buffer_size;
for (uint32_t i = 0; i < buffer_count; i++) {
@ -71,6 +71,7 @@ class CacheWriteBufferAllocator {
assert(buf);
if (buf) {
bufs_.push_back(buf);
cond_empty_.Signal();
}
}
}
@ -93,7 +94,6 @@ class CacheWriteBufferAllocator {
assert(!bufs_.empty());
CacheWriteBuffer* const buf = bufs_.front();
bufs_.pop_front();
return buf;
}
@ -102,6 +102,15 @@ class CacheWriteBufferAllocator {
MutexLock _(&lock_);
buf->Reset();
bufs_.push_back(buf);
cond_empty_.Signal();
}
void WaitUntilUsable() {
// We are asked to wait till we have buffers available
MutexLock _(&lock_);
while (bufs_.empty()) {
cond_empty_.Wait();
}
}
size_t Capacity() const { return bufs_.size() * buffer_size_; }
@ -110,6 +119,7 @@ class CacheWriteBufferAllocator {
private:
port::Mutex lock_; // Sync lock
port::CondVar cond_empty_; // Condition var for empty buffers
size_t buffer_size_; // Size of each buffer
std::list<CacheWriteBuffer*> bufs_; // Buffer stash
};

View File

@ -36,8 +36,12 @@ void BlockCacheTierMetadata::Clear() {
block_index_.Clear([](BlockInfo* arg){ delete arg; });
}
bool BlockCacheTierMetadata::Insert(BlockInfo* binfo) {
return block_index_.Insert(binfo);
BlockInfo* BlockCacheTierMetadata::Insert(const Slice& key, const LBA& lba) {
std::unique_ptr<BlockInfo> binfo(new BlockInfo(key, lba));
if (!block_index_.Insert(binfo.get())) {
return nullptr;
}
return binfo.release();
}
bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) {
@ -59,10 +63,8 @@ bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) {
BlockInfo* BlockCacheTierMetadata::Remove(const Slice& key) {
BlockInfo lookup_key(key);
BlockInfo* binfo = nullptr;
bool status __attribute__((__unused__)) =
block_index_.Erase(&lookup_key, &binfo);
(void)status;
assert(status);
bool ok __attribute__((__unused__)) = block_index_.Erase(&lookup_key, &binfo);
assert(ok);
return binfo;
}

View File

@ -60,7 +60,8 @@ class BlockCacheTierMetadata {
BlockCacheFile* Lookup(const uint32_t cache_id);
// Insert block information to block index
bool Insert(BlockInfo* binfo);
BlockInfo* Insert(const Slice& key, const LBA& lba);
// bool Insert(BlockInfo* binfo);
// Lookup block information from block index
bool Lookup(const Slice& key, LBA* lba);

View File

@ -14,36 +14,356 @@
#include <memory>
#include <thread>
#include "utilities/persistent_cache/block_cache_tier.h"
namespace rocksdb {
#if !(defined(__clang__) && defined(OS_LINUX))
static const double kStressFactor = .125;
static void OnOpenForRead(void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewRandomAccessFile:O_DIRECT",
std::bind(OnOpenForRead, std::placeholders::_1));
}
static void OnOpenForWrite(void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"NewWritableFile:O_DIRECT",
std::bind(OnOpenForWrite, std::placeholders::_1));
}
//
// Simple logger that prints message on stdout
//
class ConsoleLogger : public Logger {
public:
using Logger::Logv;
ConsoleLogger() : Logger(InfoLogLevel::ERROR_LEVEL) {}
void Logv(const char* format, va_list ap) override {
MutexLock _(&lock_);
vprintf(format, ap);
printf("\n");
}
port::Mutex lock_;
};
// construct a tiered RAM+Block cache
std::unique_ptr<PersistentTieredCache> NewTieredCache(
const size_t mem_size, const PersistentCacheConfig& opt) {
std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
// create primary tier
assert(mem_size);
auto pcache = std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(
/*is_compressed*/ true, mem_size));
tcache->AddTier(pcache);
// create secondary tier
auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
tcache->AddTier(scache);
Status s = tcache->Open();
assert(s.ok());
return tcache;
}
// create block cache
std::unique_ptr<PersistentCacheTier> NewBlockCache(
Env* env, const std::string& path,
const uint64_t max_size = std::numeric_limits<uint64_t>::max(),
const bool enable_direct_writes = false) {
const uint32_t max_file_size = 12 * 1024 * 1024 * kStressFactor;
auto log = std::make_shared<ConsoleLogger>();
PersistentCacheConfig opt(env, path, max_size, log);
opt.cache_file_size = max_file_size;
opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
opt.enable_direct_writes = enable_direct_writes;
std::unique_ptr<PersistentCacheTier> scache(new BlockCacheTier(opt));
Status s = scache->Open();
assert(s.ok());
return scache;
}
// create a new cache tier
std::unique_ptr<PersistentTieredCache> NewTieredCache(
Env* env, const std::string& path, const uint64_t max_volatile_cache_size,
const uint64_t max_block_cache_size =
std::numeric_limits<uint64_t>::max()) {
const uint32_t max_file_size = 12 * 1024 * 1024 * kStressFactor;
auto log = std::make_shared<ConsoleLogger>();
auto opt = PersistentCacheConfig(env, path, max_block_cache_size, log);
opt.cache_file_size = max_file_size;
opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
// create tier out of the two caches
auto cache = NewTieredCache(max_volatile_cache_size, opt);
return cache;
}
PersistentCacheTierTest::PersistentCacheTierTest()
: path_(test::TmpDir(Env::Default()) + "/cache_test") {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::SyncPoint::GetInstance()->SetCallBack("NewRandomAccessFile:O_DIRECT",
OnOpenForRead);
rocksdb::SyncPoint::GetInstance()->SetCallBack("NewWritableFile:O_DIRECT",
OnOpenForWrite);
}
// Volatile cache tests
TEST_F(PersistentCacheTierTest, VolatileCacheInsert) {
for (auto nthreads : {1, 5}) {
for (auto max_keys : {10 * 1024, 1 * 1024 * 1024}) {
for (auto max_keys :
{10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
cache_ = std::make_shared<VolatileCacheTier>();
RunInsertTest(nthreads, max_keys);
}
}
}
#endif // !(defined(__clang__) && defined(OS_LINUX))
TEST_F(PersistentCacheTierTest, VolatileCacheInsertWithEviction) {
for (auto nthreads : {1, 5}) {
for (auto max_keys : {1 * 1024 * 1024}) {
cache_ = std::make_shared<VolatileCacheTier>(/*compressed=*/true,
/*size=*/1 * 1024 * 1024);
for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
cache_ = std::make_shared<VolatileCacheTier>(
/*compressed=*/true, /*size=*/1 * 1024 * 1024 * kStressFactor);
RunInsertTestWithEviction(nthreads, max_keys);
}
}
}
#if !(defined(__clang__) && defined(OS_LINUX))
// Block cache tests
TEST_F(PersistentCacheTierTest, BlockCacheInsert) {
for (auto direct_writes : {true, false}) {
for (auto nthreads : {1, 5}) {
for (auto max_keys :
{10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
cache_ = NewBlockCache(Env::Default(), path_,
/*size=*/std::numeric_limits<uint64_t>::max(),
direct_writes);
RunInsertTest(nthreads, max_keys);
}
}
}
}
TEST_F(PersistentCacheTierTest, BlockCacheInsertWithEviction) {
for (auto nthreads : {1, 5}) {
for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
cache_ = NewBlockCache(Env::Default(), path_,
/*max_size=*/200 * 1024 * 1024 * kStressFactor);
RunInsertTestWithEviction(nthreads, max_keys);
}
}
}
// Tiered cache tests
TEST_F(PersistentCacheTierTest, TieredCacheInsert) {
for (auto nthreads : {1, 5}) {
for (auto max_keys :
{10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
cache_ = NewTieredCache(Env::Default(), path_,
/*memory_size=*/1 * 1024 * 1024 * kStressFactor);
RunInsertTest(nthreads, max_keys);
}
}
}
TEST_F(PersistentCacheTierTest, TieredCacheInsertWithEviction) {
for (auto nthreads : {1, 5}) {
for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
cache_ = NewTieredCache(
Env::Default(), path_,
/*memory_size=*/1 * 1024 * 1024 * kStressFactor,
/*block_cache_size*/ 200 * 1024 * 1024 * kStressFactor);
RunInsertTestWithEviction(nthreads, max_keys);
}
}
}
std::shared_ptr<PersistentCacheTier> MakeVolatileCache(
const std::string& /*dbname*/) {
return std::make_shared<VolatileCacheTier>();
}
std::shared_ptr<PersistentCacheTier> MakeBlockCache(const std::string& dbname) {
return NewBlockCache(Env::Default(), dbname);
}
std::shared_ptr<PersistentCacheTier> MakeTieredCache(
const std::string& dbname) {
const auto memory_size = 1 * 1024 * 1024 * kStressFactor;
return NewTieredCache(Env::Default(), dbname, memory_size);
}
static void UniqueIdCallback(void* arg) {
int* result = reinterpret_cast<int*>(arg);
if (*result == -1) {
*result = 0;
}
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
}
PersistentCacheDBTest::PersistentCacheDBTest() : DBTestBase("/cache_test") {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
rocksdb::SyncPoint::GetInstance()->SetCallBack("NewRandomAccessFile:O_DIRECT",
OnOpenForRead);
}
// test template
void PersistentCacheDBTest::RunTest(
const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
new_pcache) {
if (!Snappy_Supported()) {
return;
}
// number of insertion interations
int num_iter = 100 * 1024 * kStressFactor;
for (int iter = 0; iter < 5; iter++) {
Options options;
options.write_buffer_size =
64 * 1024 * kStressFactor; // small write buffer
options.statistics = rocksdb::CreateDBStatistics();
options = CurrentOptions(options);
// setup page cache
std::shared_ptr<PersistentCacheTier> pcache;
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
const uint64_t uint64_max = std::numeric_limits<uint64_t>::max();
switch (iter) {
case 0:
// page cache, block cache, no-compressed cache
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = NewLRUCache(uint64_max);
table_options.block_cache_compressed = nullptr;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
case 1:
// page cache, block cache, compressed cache
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = NewLRUCache(uint64_max);
table_options.block_cache_compressed = NewLRUCache(uint64_max);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
case 2:
// page cache, block cache, compressed cache + KNoCompression
// both block cache and compressed cache, but DB is not compressed
// also, make block cache sizes bigger, to trigger block cache hits
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = NewLRUCache(uint64_max);
table_options.block_cache_compressed = NewLRUCache(uint64_max);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.compression = kNoCompression;
break;
case 3:
// page cache, no block cache, no compressed cache
pcache = new_pcache(/*is_compressed=*/false);
table_options.persistent_cache = pcache;
table_options.block_cache = nullptr;
table_options.block_cache_compressed = nullptr;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
case 4:
// page cache, no block cache, no compressed cache
// Page cache caches compressed blocks
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = nullptr;
table_options.block_cache_compressed = nullptr;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
default:
ASSERT_TRUE(false);
}
std::vector<std::string> values;
// insert data
Insert(options, table_options, num_iter, &values);
// flush all data in cache to device
pcache->TEST_Flush();
// verify data
Verify(num_iter, values);
auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS);
auto compressed_block_hit =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
auto compressed_block_miss =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT);
auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS);
// check that we triggered the appropriate code paths in the cache
switch (iter) {
case 0:
// page cache, block cache, no-compressed cache
ASSERT_GT(page_miss, 0);
ASSERT_GT(page_hit, 0);
ASSERT_GT(block_miss, 0);
ASSERT_EQ(compressed_block_miss, 0);
ASSERT_EQ(compressed_block_hit, 0);
break;
case 1:
// page cache, block cache, compressed cache
ASSERT_GT(page_miss, 0);
ASSERT_GT(block_miss, 0);
ASSERT_GT(compressed_block_miss, 0);
break;
case 2:
// page cache, block cache, compressed cache + KNoCompression
ASSERT_GT(page_miss, 0);
ASSERT_GT(page_hit, 0);
ASSERT_GT(block_miss, 0);
ASSERT_GT(compressed_block_miss, 0);
// remember kNoCompression
ASSERT_EQ(compressed_block_hit, 0);
break;
case 3:
case 4:
// page cache, no block cache, no compressed cache
ASSERT_GT(page_miss, 0);
ASSERT_GT(page_hit, 0);
ASSERT_EQ(compressed_block_hit, 0);
ASSERT_EQ(compressed_block_miss, 0);
break;
default:
ASSERT_TRUE(false);
}
options.create_if_missing = true;
DestroyAndReopen(options);
pcache->Close();
}
}
// test table with volatile page cache
TEST_F(PersistentCacheDBTest, VolatileCacheTest) {
RunTest(std::bind(&PersistentCacheDBTest::MakeVolatileCache, this));
RunTest(std::bind(&MakeVolatileCache, dbname_));
}
// test table with block page cache
TEST_F(PersistentCacheDBTest, BlockCacheTest) {
RunTest(std::bind(&MakeBlockCache, dbname_));
}
// test table with tiered page cache
TEST_F(PersistentCacheDBTest, TieredCacheTest) {
RunTest(std::bind(&MakeTieredCache, dbname_));
}
#endif // !(defined(__clang__) && defined(OS_LINUX))
} // namespace rocksdb

View File

@ -32,9 +32,7 @@ namespace rocksdb {
//
class PersistentCacheTierTest : public testing::Test {
public:
explicit PersistentCacheTierTest()
: path_(test::TmpDir(Env::Default()) + "/cache_test") {}
PersistentCacheTierTest();
virtual ~PersistentCacheTierTest() {
if (cache_) {
Status s = cache_->Close();
@ -46,7 +44,7 @@ class PersistentCacheTierTest : public testing::Test {
// Flush cache
void Flush() {
if (cache_) {
cache_->Flush();
cache_->TEST_Flush();
}
}
@ -208,27 +206,7 @@ class PersistentCacheTierTest : public testing::Test {
//
class PersistentCacheDBTest : public DBTestBase {
public:
PersistentCacheDBTest() : DBTestBase("/cache_test") {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"GetUniqueIdFromFile:FS_IOC_GETVERSION",
PersistentCacheDBTest::UniqueIdCallback);
}
static void UniqueIdCallback(void* arg) {
int* result = reinterpret_cast<int*>(arg);
if (*result == -1) {
*result = 0;
}
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
}
std::shared_ptr<PersistentCacheTier> MakeVolatileCache() {
return std::make_shared<VolatileCacheTier>();
}
PersistentCacheDBTest();
static uint64_t TestGetTickerCount(const Options& options,
Tickers ticker_type) {
@ -281,135 +259,7 @@ class PersistentCacheDBTest : public DBTestBase {
// test template
void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
new_pcache) {
if (!Snappy_Supported()) {
return;
}
// number of insertion interations
int num_iter = 100 * 1024;
for (int iter = 0; iter < 5; iter++) {
Options options;
options.write_buffer_size = 64 * 1024; // small write buffer
options.statistics = rocksdb::CreateDBStatistics();
options = CurrentOptions(options);
// setup page cache
std::shared_ptr<PersistentCacheTier> pcache;
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
const uint64_t uint64_max = std::numeric_limits<uint64_t>::max();
switch (iter) {
case 0:
// page cache, block cache, no-compressed cache
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = NewLRUCache(uint64_max);
table_options.block_cache_compressed = nullptr;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
case 1:
// page cache, block cache, compressed cache
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = NewLRUCache(uint64_max);
table_options.block_cache_compressed = NewLRUCache(uint64_max);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
case 2:
// page cache, block cache, compressed cache + KNoCompression
// both block cache and compressed cache, but DB is not compressed
// also, make block cache sizes bigger, to trigger block cache hits
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = NewLRUCache(uint64_max);
table_options.block_cache_compressed = NewLRUCache(uint64_max);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.compression = kNoCompression;
break;
case 3:
// page cache, no block cache, no compressed cache
pcache = new_pcache(/*is_compressed=*/false);
table_options.persistent_cache = pcache;
table_options.block_cache = nullptr;
table_options.block_cache_compressed = nullptr;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
case 4:
// page cache, no block cache, no compressed cache
// Page cache caches compressed blocks
pcache = new_pcache(/*is_compressed=*/true);
table_options.persistent_cache = pcache;
table_options.block_cache = nullptr;
table_options.block_cache_compressed = nullptr;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break;
default:
ASSERT_TRUE(false);
}
std::vector<std::string> values;
// insert data
Insert(options, table_options, num_iter, &values);
// flush all data in cache to device
pcache->Flush();
// verify data
Verify(num_iter, values);
auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS);
auto compressed_block_hit =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
auto compressed_block_miss =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT);
auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS);
// check that we triggered the appropriate code paths in the cache
switch (iter) {
case 0:
// page cache, block cache, no-compressed cache
ASSERT_GT(page_miss, 0);
ASSERT_GT(page_hit, 0);
ASSERT_GT(block_miss, 0);
ASSERT_EQ(compressed_block_miss, 0);
ASSERT_EQ(compressed_block_hit, 0);
break;
case 1:
// page cache, block cache, compressed cache
ASSERT_GT(page_miss, 0);
ASSERT_GT(block_miss, 0);
ASSERT_GT(compressed_block_miss, 0);
break;
case 2:
// page cache, block cache, compressed cache + KNoCompression
ASSERT_GT(page_miss, 0);
ASSERT_GT(page_hit, 0);
ASSERT_GT(block_miss, 0);
ASSERT_GT(compressed_block_miss, 0);
// remember kNoCompression
ASSERT_EQ(compressed_block_hit, 0);
break;
case 3:
case 4:
// page cache, no block cache, no compressed cache
ASSERT_GT(page_miss, 0);
ASSERT_GT(page_hit, 0);
ASSERT_EQ(compressed_block_hit, 0);
ASSERT_EQ(compressed_block_miss, 0);
break;
default:
ASSERT_TRUE(false);
}
options.create_if_missing = true;
DestroyAndReopen(options);
pcache->Close();
}
}
new_pcache);
};
} // namespace rocksdb

View File

@ -28,12 +28,6 @@ Status PersistentCacheTier::Close() {
return Status::OK();
}
void PersistentCacheTier::Flush() {
if (next_tier_) {
next_tier_->Flush();
}
}
bool PersistentCacheTier::Reserve(const size_t size) {
// default implementation is a pass through
return true;
@ -52,6 +46,13 @@ std::string PersistentCacheTier::PrintStats() {
return std::string();
}
std::vector<PersistentCacheTier::TierStats> PersistentCacheTier::Stats() {
if (next_tier_) {
return next_tier_->Stats();
}
return std::vector<TierStats>{};
}
//
// PersistentTieredCache implementation
//
@ -71,16 +72,16 @@ Status PersistentTieredCache::Close() {
return status;
}
void PersistentTieredCache::Flush() {
assert(!tiers_.empty());
tiers_.front()->Flush();
}
bool PersistentTieredCache::Erase(const Slice& key) {
assert(!tiers_.empty());
return tiers_.front()->Erase(key);
}
std::vector<PersistentCacheTier::TierStats> PersistentTieredCache::Stats() {
assert(!tiers_.empty());
return tiers_.front()->Stats();
}
std::string PersistentTieredCache::PrintStats() {
assert(!tiers_.empty());
return tiers_.front()->PrintStats();
@ -106,6 +107,11 @@ void PersistentTieredCache::AddTier(const Tier& tier) {
tiers_.push_back(tier);
}
bool PersistentTieredCache::IsCompressed() {
assert(tiers_.size());
return tiers_.front()->IsCompressed();
}
} // namespace rocksdb
#endif

View File

@ -54,6 +54,173 @@
// null
namespace rocksdb {
// Persistent Cache Config
//
// This struct captures all the options that are used to configure persistent
// cache. Some of the terminologies used in naming the options are
//
// dispatch size :
// This is the size in which IO is dispatched to the device
//
// write buffer size :
// This is the size of an individual write buffer size. Write buffers are
// grouped to form buffered file.
//
// cache size :
// This is the logical maximum for the cache size
//
// qdepth :
// This is the max number of IOs that can issues to the device in parallel
//
// pepeling :
// The writer code path follows pipelined architecture, which means the
// operations are handed off from one stage to another
//
// pipelining backlog size :
// With the pipelined architecture, there can always be backlogging of ops in
// pipeline queues. This is the maximum backlog size after which ops are dropped
// from queue
struct PersistentCacheConfig {
explicit PersistentCacheConfig(
Env* const _env, const std::string& _path, const uint64_t _cache_size,
const std::shared_ptr<Logger>& _log,
const uint32_t _write_buffer_size = 1 * 1024 * 1024 /*1MB*/) {
env = _env;
path = _path;
log = _log;
cache_size = _cache_size;
writer_dispatch_size = write_buffer_size = _write_buffer_size;
}
//
// Validate the settings. Our intentions are to catch erroneous settings ahead
// of time instead going violating invariants or causing dead locks.
//
Status ValidateSettings() const {
// (1) check pre-conditions for variables
if (!env || path.empty()) {
return Status::InvalidArgument("empty or null args");
}
// (2) assert size related invariants
// - cache size cannot be less than cache file size
// - individual write buffer size cannot be greater than cache file size
// - total write buffer size cannot be less than 2X cache file size
if (cache_size < cache_file_size || write_buffer_size >= cache_file_size ||
write_buffer_size * write_buffer_count() < 2 * cache_file_size) {
return Status::InvalidArgument("invalid cache size");
}
// (2) check writer settings
// - Queue depth cannot be 0
// - writer_dispatch_size cannot be greater than writer_buffer_size
// - dispatch size and buffer size need to be aligned
if (!writer_qdepth || writer_dispatch_size > write_buffer_size ||
write_buffer_size % writer_dispatch_size) {
return Status::InvalidArgument("invalid writer settings");
}
return Status::OK();
}
//
// Env abstraction to use for systmer level operations
//
Env* env;
//
// Path for the block cache where blocks are persisted
//
std::string path;
//
// Log handle for logging messages
//
std::shared_ptr<Logger> log;
//
// Enable direct IO for reading
//
bool enable_direct_reads = true;
//
// Enable direct IO for writing
//
bool enable_direct_writes = false;
//
// Logical cache size
//
uint64_t cache_size = std::numeric_limits<uint64_t>::max();
// cache-file-size
//
// Cache consists of multiples of small files. This parameter defines the
// size of an individual cache file
//
// default: 1M
uint32_t cache_file_size = 100ULL * 1024 * 1024;
// writer-qdepth
//
// The writers can issues IO to the devices in parallel. This parameter
// controls the max number if IOs that can issues in parallel to the block
// device
//
// default :1
uint32_t writer_qdepth = 1;
// pipeline-writes
//
// The write optionally follow pipelined architecture. This helps
// avoid regression in the eviction code path of the primary tier. This
// parameter defines if pipelining is enabled or disabled
//
// default: true
bool pipeline_writes_ = true;
// max-write-pipeline-backlog-size
//
// Max pipeline buffer size. This is the maximum backlog we can accumulate
// while waiting for writes. After the limit, new ops will be dropped.
//
// Default: 1GiB
uint64_t max_write_pipeline_backlog_size = 1ULL * 1024 * 1024 * 1024;
// write-buffer-size
//
// This is the size in which buffer slabs are allocated.
//
// Default: 1M
uint32_t write_buffer_size = 1ULL * 1024 * 1024;
// write-buffer-count
//
// This is the total number of buffer slabs. This is calculated as a factor of
// file size in order to avoid dead lock.
size_t write_buffer_count() const {
assert(write_buffer_size);
return (writer_qdepth + 1.2) * cache_file_size / write_buffer_size;
}
// writer-dispatch-size
//
// The writer thread will dispatch the IO at the specified IO size
//
// default: 1M
uint64_t writer_dispatch_size = 1ULL * 1024 * 1024;
// is_compressed
//
// This option determines if the cache will run in compressed mode or
// uncompressed mode
bool is_compressed = true;
PersistentCacheConfig MakePersistentCacheConfig(
const std::string& path, const uint64_t size,
const std::shared_ptr<Logger>& log);
};
// Persistent Cache Tier
//
// This a logical abstraction that defines a tier of the persistent cache. Tiers
@ -73,9 +240,6 @@ class PersistentCacheTier : public PersistentCache {
// Close the persistent cache tier
virtual Status Close();
// Flush the pending writes
virtual void Flush();
// Reserve space up to 'size' bytes
virtual bool Reserve(const size_t size);
@ -86,7 +250,7 @@ class PersistentCacheTier : public PersistentCache {
virtual std::string PrintStats();
// Expose stats
virtual std::vector<TierStats> Stats() = 0;
virtual std::vector<TierStats> Stats();
// Insert to page cache
virtual Status Insert(const Slice& page_key, const char* data,
@ -96,6 +260,9 @@ class PersistentCacheTier : public PersistentCache {
virtual Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
size_t* size) = 0;
// Does it store compressed data ?
virtual bool IsCompressed() = 0;
// Return a reference to next tier
virtual Tier& next_tier() { return next_tier_; }
@ -105,6 +272,12 @@ class PersistentCacheTier : public PersistentCache {
next_tier_ = tier;
}
virtual void TEST_Flush() {
if (next_tier_) {
next_tier_->TEST_Flush();
}
}
private:
Tier next_tier_; // next tier
};
@ -120,13 +293,14 @@ class PersistentTieredCache : public PersistentCacheTier {
Status Open() override;
Status Close() override;
void Flush() override;
bool Erase(const Slice& key) override;
std::string PrintStats() override;
std::vector<TierStats> Stats() override;
Status Insert(const Slice& page_key, const char* data,
const size_t size) override;
Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
size_t* size) override;
bool IsCompressed() override;
void AddTier(const Tier& tier);
@ -140,6 +314,12 @@ class PersistentTieredCache : public PersistentCacheTier {
(*it)->set_next_tier(tier);
}
void TEST_Flush() override {
assert(!tiers_.empty());
tiers_.front()->TEST_Flush();
PersistentCacheTier::TEST_Flush();
}
protected:
std::list<Tier> tiers_; // list of tiers top-down
};