rocksdb/utilities/persistent_cache/persistent_cache_bench.cc
mrambacher 12f1137355 Add a SystemClock class to capture the time functions of an Env (#7858)
Summary:
Introduces a SystemClock class to RocksDB and uses it.  This class contains the time-related functions of an Env, and these functions can be redirected from the Env to the SystemClock.

Many of the places that used an Env (Timer, PerfStepTimer, RepeatableThread, RateLimiter, WriteController) for time-related functions have been changed to use SystemClock instead.  There are likely more places that can be changed, but this is a start to show what can/should be done.  Over time it would be nice to migrate most (if not all) of the uses of the time functions from the Env to the SystemClock.

There are several Env classes that implement these functions.  Most of these have not been converted yet to SystemClock implementations; that will come in a subsequent PR.  It would be good to unify many of the Mock Timer implementations, so that they behave similarly and can be tested similarly (some override Sleep, some use a MockSleep, etc.).

Additionally, this change will allow new methods to be introduced to the SystemClock (like https://github.com/facebook/rocksdb/issues/7101 WaitFor) in a consistent manner across a smaller number of classes.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7858

Reviewed By: pdillinger

Differential Revision: D26006406

Pulled By: mrambacher

fbshipit-source-id: ed10a8abbdab7ff2e23d69d85bd25b3e7e899e90
2021-01-25 22:09:11 -08:00

360 lines
11 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE
#ifndef GFLAGS
#include <cstdio>
// Fallback entry point used when the tool is built without gflags: the
// benchmark cannot parse its options, so report the problem and exit with a
// failure status instead of pretending the run succeeded.
int main() {
  fprintf(stderr, "Please install gflags to run tools\n");
  return 1;
}
#else
#include <atomic>
#include <cstdio>
#include <functional>
#include <memory>
#include <sstream>
#include <unordered_map>
#include <utility>
#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/system_clock.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"
// Command-line flags (gflags) that configure the benchmark.

// Run duration: the driver tells the worker threads to stop once the number of
// whole elapsed seconds exceeds this value.
DEFINE_int32(nsec, 10, "nsec");
// Number of threads running the insert loop.
DEFINE_int32(nthread_write, 1, "Insert threads");
// Number of threads running the lookup loop. A non-zero value also makes the
// driver pre-populate the cache before the timed run starts.
DEFINE_int32(nthread_read, 1, "Lookup threads");
// Path handed to the block cache configuration.
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
// Path passed to Env::NewLogger for the logger given to the cache.
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
// Total cache size. NewVolatileCache asserts that this has been changed from
// its default (the uint64_t maximum).
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
// Size in bytes of every block that is inserted into the cache and expected
// back from a lookup.
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
// Copied into PersistentCacheConfig::writer_dispatch_size.
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
// Copied into PersistentCacheConfig::writer_qdepth.
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
// Copied into PersistentCacheConfig::pipeline_writes.
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
// Selects which factory main() uses to build the cache under test.
DEFINE_string(cache_type, "block_cache",
"Cache type. (block_cache, volatile, tiered)");
// When set, lookups skip the comparison of the returned block against the
// expected contents.
DEFINE_bool(benchmark, false, "Benchmark mode");
// For the tiered cache: share of cache_size given to the in-memory tier; the
// remainder is given to the block cache tier.
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
namespace ROCKSDB_NAMESPACE {
// Builds an in-memory cache tier whose capacity is taken from --cache_size.
//
// The flag has to be set explicitly: leaving it at its default (the uint64_t
// maximum) trips the assertion below.
std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
  assert(std::numeric_limits<uint64_t>::max() != FLAGS_cache_size);
  return std::unique_ptr<PersistentCacheTier>(
      new VolatileCacheTier(FLAGS_cache_size));
}
std::unique_ptr<PersistentCacheTier> NewBlockCache() {
std::shared_ptr<Logger> log;
if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
return nullptr;
}
PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
opt.writer_dispatch_size = FLAGS_writer_iosize;
opt.writer_qdepth = FLAGS_writer_qdepth;
opt.pipeline_writes = FLAGS_enable_pipelined_writes;
opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
Status status = cache->Open();
return cache;
}
// create a new cache tier
// construct a tiered RAM+Block cache
std::unique_ptr<PersistentTieredCache> NewTieredCache(
const size_t mem_size, const PersistentCacheConfig& opt) {
std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
// create primary tier
assert(mem_size);
auto pcache =
std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
tcache->AddTier(pcache);
// create secondary tier
auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
tcache->AddTier(scache);
Status s = tcache->Open();
assert(s.ok());
return tcache;
}
std::unique_ptr<PersistentTieredCache> NewTieredCache() {
std::shared_ptr<Logger> log;
if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
abort();
}
auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
PersistentCacheConfig opt(Env::Default(), FLAGS_path,
(1 - pct) * FLAGS_cache_size, log);
opt.writer_dispatch_size = FLAGS_writer_iosize;
opt.writer_qdepth = FLAGS_writer_qdepth;
opt.pipeline_writes = FLAGS_enable_pipelined_writes;
opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
return NewTieredCache(FLAGS_cache_size * pct, opt);
}
//
// Benchmark driver
//
// Runs a timed insert/lookup workload against a PersistentCacheTier.
//
// The whole benchmark executes inside the constructor:
//   1. if lookup threads were requested, the cache is pre-populated and warmed;
//   2. --nthread_write insert threads and --nthread_read lookup threads are
//      started;
//   3. the constructing thread polls the clock once a second and raises the
//      quit flag once more than --nsec seconds have elapsed;
//   4. the workers are joined, statistics are printed to stderr and the cache
//      is flushed and closed.
class CacheTierBenchmark {
 public:
  // Takes ownership of `cache` and runs the benchmark to completion.
  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
      : cache_(std::move(cache)) {
    if (FLAGS_nthread_read) {
      fprintf(stdout, "Pre-populating\n");
      Prepop();
      fprintf(stdout, "Pre-population completed\n");
    }

    // Discard whatever pre-population recorded so that only the timed run is
    // reported.
    stats_.Clear();

    // Start IO threads
    std::list<port::Thread> threads;
    Spawn(FLAGS_nthread_write, &threads,
          std::bind(&CacheTierBenchmark::Write, this));
    Spawn(FLAGS_nthread_read, &threads,
          std::bind(&CacheTierBenchmark::Read, this));

    // Wait till FLAGS_nsec and then signal to quit
    StopWatchNano t(SystemClock::Default(), /*auto_start=*/true);
    size_t sec = t.ElapsedNanos() / 1000000000ULL;
    while (!quit_) {
      sec = t.ElapsedNanos() / 1000000000ULL;
      quit_ = sec > static_cast<size_t>(FLAGS_nsec);
      /* sleep override */ sleep(1);
    }

    // Wait for threads to exit
    Join(&threads);
    // Print stats
    PrintStats(sec);
    // Close the cache
    cache_->TEST_Flush();
    cache_->Close();
  }

 private:
  // Writes the collected histograms and the cache's own statistics to stderr.
  // `sec` is the elapsed run time in whole seconds.
  void PrintStats(const size_t sec) {
    std::ostringstream msg;
    msg << "Test stats\n"
        << "* Elapsed: " << sec << " s\n"
        << "* Write Latency:\n"
        << stats_.write_latency_.ToString() << '\n'
        << "* Read Latency:\n"
        << stats_.read_latency_.ToString() << '\n'
        << "* Bytes written:\n"
        << stats_.bytes_written_.ToString() << '\n'
        << "* Bytes read:\n"
        << stats_.bytes_read_.ToString() << '\n'
        << "Cache stats:\n"
        << cache_->PrintStats() << '\n';
    fprintf(stderr, "%s\n", msg.str().c_str());
  }

  //
  // Insert implementation and corresponding helper functions
  //

  // Inserts keys [0, 1M), advancing both key limits as it goes, flushes the
  // cache and then reads every key back once to warm it up.
  void Prepop() {
    const uint64_t kPrepopKeys = 1024 * 1024;
    for (uint64_t i = 0; i < kPrepopKeys; ++i) {
      InsertKey(i);
      insert_key_limit_++;
      read_key_limit_++;
    }

    // Wait until data is flushed
    cache_->TEST_Flush();

    // warmup the cache
    for (uint64_t i = 0; i < kPrepopKeys; ++i) {
      ReadKey(i);
    }
  }

  // Insert thread body: keeps inserting the next unused key until told to
  // quit.
  void Write() {
    while (!quit_) {
      InsertKey(insert_key_limit_++);
    }
  }

  // Inserts the block for `key`, retrying until the cache accepts it, and
  // records the latency (in microseconds, retries included) and the byte
  // count.
  void InsertKey(const uint64_t key) {
    // construct key
    uint64_t k[3];
    Slice block_key = FillKey(k, key);

    // construct value
    auto block = NewBlock(key);

    // insert
    StopWatchNano timer(SystemClock::Default(), /*auto_start=*/true);
    while (true) {
      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
      if (status.ok()) {
        break;
      }

      // transient error is possible if we run without pipelining
      assert(!FLAGS_enable_pipelined_writes);
    }

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.write_latency_.Add(elapsed_micro);
    stats_.bytes_written_.Add(FLAGS_iosize);
  }

  //
  // Read implementation
  //

  // Lookup thread body: keeps reading random keys below read_key_limit_ until
  // told to quit. Lookup threads are only useful after Prepop() has raised
  // read_key_limit_ above zero, which the constructor does whenever
  // --nthread_read is non-zero.
  void Read() {
    while (!quit_) {
      ReadKey(random() % read_key_limit_);
    }
  }

  // Looks up the block for `val`, records the latency (in microseconds) and
  // the byte count and, unless --benchmark is set, checks the contents against
  // the expected pattern.
  void ReadKey(const uint64_t val) {
    // construct key
    uint64_t k[3];
    Slice key = FillKey(k, val);

    // Lookup in cache
    StopWatchNano timer(SystemClock::Default(), /*auto_start=*/true);
    std::unique_ptr<char[]> block;
    size_t size = 0;  // stays well-defined even if the lookup fails
    Status status = cache_->Lookup(key, &block, &size);
    if (!status.ok()) {
      fprintf(stderr, "%s\n", status.ToString().c_str());
    }
    assert(status.ok());
    assert(size == static_cast<size_t>(FLAGS_iosize));

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.read_latency_.Add(elapsed_micro);
    stats_.bytes_read_.Add(FLAGS_iosize);

    // verify content
    if (!FLAGS_benchmark) {
      auto expected_block = NewBlock(val);
      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
    }
  }

  // create data for a key by filling with a certain pattern
  // (every byte of the --iosize sized block is set to val % 255)
  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
    memset(data.get(), val % 255, FLAGS_iosize);
    return data;
  }

  // spawn threads: appends `n` threads running `fn` to `threads`
  void Spawn(const size_t n, std::list<port::Thread>* threads,
             const std::function<void()>& fn) {
    for (size_t i = 0; i < n; ++i) {
      threads->emplace_back(fn);
    }
  }

  // join threads
  void Join(std::list<port::Thread>* threads) {
    for (auto& th : *threads) {
      th.join();
    }
  }

  // construct key: writes {0, 0, val} into the caller-provided buffer `k` and
  // returns a Slice that points at it, so `k` must outlive the Slice.
  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
    k[0] = k[1] = 0;
    k[2] = val;
    void* p = static_cast<void*>(&k);
    return Slice(static_cast<char*>(p), sizeof(k));
  }

  // benchmark stats
  struct Stats {
    void Clear() {
      bytes_written_.Clear();
      bytes_read_.Clear();
      read_latency_.Clear();
      write_latency_.Clear();
    }

    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_latency_;
    HistogramImpl write_latency_;
  };

  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
  std::atomic<uint64_t> insert_key_limit_{0};   // data inserted upto
  std::atomic<uint64_t> read_key_limit_{0};     // data can be read safely upto
  // Quit thread ? Written by the driver thread and polled by every worker, so
  // it has to be atomic to avoid a data race.
  std::atomic<bool> quit_{false};
  mutable Stats stats_;                         // Stats
};
} // namespace ROCKSDB_NAMESPACE
//
// main
//
int main(int argc, char** argv) {
GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
std::string(argv[0]) + " [OPTIONS]...");
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);
std::ostringstream msg;
msg << "Config" << std::endl
<< "======" << std::endl
<< "* nsec=" << FLAGS_nsec << std::endl
<< "* nthread_write=" << FLAGS_nthread_write << std::endl
<< "* path=" << FLAGS_path << std::endl
<< "* cache_size=" << FLAGS_cache_size << std::endl
<< "* iosize=" << FLAGS_iosize << std::endl
<< "* writer_iosize=" << FLAGS_writer_iosize << std::endl
<< "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
<< "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
<< std::endl
<< "* cache_type=" << FLAGS_cache_type << std::endl
<< "* benchmark=" << FLAGS_benchmark << std::endl
<< "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;
fprintf(stderr, "%s\n", msg.str().c_str());
std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
if (FLAGS_cache_type == "block_cache") {
fprintf(stderr, "Using block cache implementation\n");
cache = ROCKSDB_NAMESPACE::NewBlockCache();
} else if (FLAGS_cache_type == "volatile") {
fprintf(stderr, "Using volatile cache implementation\n");
cache = ROCKSDB_NAMESPACE::NewVolatileCache();
} else if (FLAGS_cache_type == "tiered") {
fprintf(stderr, "Using tiered cache implementation\n");
cache = ROCKSDB_NAMESPACE::NewTieredCache();
} else {
fprintf(stderr, "Unknown option for cache\n");
}
assert(cache);
if (!cache) {
fprintf(stderr, "Error creating cache\n");
abort();
}
std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));
return 0;
}
#endif // #ifndef GFLAGS
#else
// When ROCKSDB_LITE is defined none of the benchmark code above is compiled;
// this stub keeps the binary buildable and simply exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  return 0;
}
#endif