From 09df74c540540a55dc978cae4a48a79cdcdb1c69 Mon Sep 17 00:00:00 2001 From: anand76 Date: Tue, 20 Apr 2021 21:33:09 -0700 Subject: [PATCH] Allow cache_bench and db_bench to use 3rd party tiered cache --- CMakeLists.txt | 3 +- Makefile | 5 +- TARGETS | 14 + buckifier/buckify_rocksdb.py | 5 + cache/cache_bench.cc | 377 +-------------------- cache/cache_bench_tool.cc | 506 +++++++++++++++++++++++++++++ cache/lru_cache.cc | 1 - include/rocksdb/cache.h | 144 ++++---- include/rocksdb/cache_bench_tool.h | 14 + include/rocksdb/configurable.h | 4 + include/rocksdb/tiered_cache.h | 2 + options/configurable.cc | 6 + src.mk | 3 + tools/db_bench_tool.cc | 39 ++- 14 files changed, 667 insertions(+), 456 deletions(-) create mode 100644 cache/cache_bench_tool.cc create mode 100644 include/rocksdb/cache_bench_tool.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 81cec6633..cc6aed647 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1315,7 +1315,8 @@ if(WITH_BENCHMARK_TOOLS) ${ROCKSDB_LIB} ${THIRDPARTY_LIBS}) add_executable(cache_bench${ARTIFACT_SUFFIX} - cache/cache_bench.cc) + cache/cache_bench.cc + cache/cache_bench_tool.cc) target_link_libraries(cache_bench${ARTIFACT_SUFFIX} ${ROCKSDB_LIB} ${GFLAGS_LIB}) diff --git a/Makefile b/Makefile index a974dc3c1..4f2972f9b 100644 --- a/Makefile +++ b/Makefile @@ -511,12 +511,13 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full TEST_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES)) $(GTEST) BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(BENCH_LIB_SOURCES)) +CACHE_BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(CACHE_BENCH_LIB_SOURCES)) TOOL_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TOOL_LIB_SOURCES)) ANALYZE_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(ANALYZER_LIB_SOURCES)) STRESS_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(STRESS_LIB_SOURCES)) ALL_SOURCES = $(LIB_SOURCES) $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES) $(GTEST_DIR)/gtest/gtest-all.cc -ALL_SOURCES += $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES) +ALL_SOURCES += $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(CACHE_BENCH_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES) ALL_SOURCES += $(TEST_MAIN_SOURCES) $(TOOL_MAIN_SOURCES) $(BENCH_MAIN_SOURCES) TESTS = $(patsubst %.cc, %, $(notdir $(TEST_MAIN_SOURCES))) @@ -1462,7 +1463,7 @@ folly_synchronization_distributed_mutex_test: $(OBJ_DIR)/third-party/folly/folly $(AM_LINK) endif -cache_bench: $(OBJ_DIR)/cache/cache_bench.o $(LIBRARY) +cache_bench: $(OBJ_DIR)/cache/cache_bench.o $(CACHE_BENCH_OBJECTS) $(LIBRARY) $(AM_LINK) persistent_cache_bench: $(OBJ_DIR)/utilities/persistent_cache/persistent_cache_bench.o $(LIBRARY) diff --git a/TARGETS b/TARGETS index 61c48ef2c..f2dff8a90 100644 --- a/TARGETS +++ b/TARGETS @@ -784,6 +784,20 @@ cpp_library( link_whole = False, ) +cpp_library( + name = "rocksdb_cache_bench_tools_lib", + srcs = ["cache/cache_bench_tool.cc"], + auto_headers = AutoHeaders.RECURSIVE_GLOB, + arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS, + compiler_flags = ROCKSDB_COMPILER_FLAGS, + os_deps = ROCKSDB_OS_DEPS, + os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS, + preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS, + deps = [":rocksdb_lib"], + external_deps = ROCKSDB_EXTERNAL_DEPS, + link_whole = False, +) + cpp_library( name = "rocksdb_stress_lib", srcs = [ diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index 6dfedbce1..3d10e2f6a 100644 --- a/buckifier/buckify_rocksdb.py +++ 
b/buckifier/buckify_rocksdb.py @@ -169,6 +169,11 @@ def generate_targets(repo_path, deps_map): src_mk.get("ANALYZER_LIB_SOURCES", []) + ["test_util/testutil.cc"], [":rocksdb_lib"]) + # rocksdb_cache_bench_tools_lib + TARGETS.add_library( + "rocksdb_cache_bench_tools_lib", + src_mk.get("CACHE_BENCH_LIB_SOURCES", []), + [":rocksdb_lib"]) # rocksdb_stress_lib TARGETS.add_rocksdb_library( "rocksdb_stress_lib", diff --git a/cache/cache_bench.cc b/cache/cache_bench.cc index 48cf2b44d..05fc252b4 100644 --- a/cache/cache_bench.cc +++ b/cache/cache_bench.cc @@ -1,7 +1,11 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// Copyright (c) 2013-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. #ifndef GFLAGS #include @@ -10,375 +14,8 @@ int main() { return 1; } #else - -#include - -#include -#include -#include - -#include "port/port.h" -#include "rocksdb/cache.h" -#include "rocksdb/db.h" -#include "rocksdb/env.h" -#include "rocksdb/system_clock.h" -#include "util/coding.h" -#include "util/gflags_compat.h" -#include "util/hash.h" -#include "util/mutexlock.h" -#include "util/random.h" - -using GFLAGS_NAMESPACE::ParseCommandLineFlags; - -static constexpr uint32_t KiB = uint32_t{1} << 10; -static constexpr uint32_t MiB = KiB << 10; -static constexpr uint64_t GiB = MiB << 10; - -DEFINE_uint32(threads, 16, "Number of concurrent threads to run."); -DEFINE_uint64(cache_size, 1 * GiB, - "Number of bytes to use as a cache of uncompressed data."); -DEFINE_uint32(num_shard_bits, 6, "shard_bits."); - -DEFINE_double(resident_ratio, 0.25, - "Ratio of keys fitting in cache to keyspace."); -DEFINE_uint64(ops_per_thread, 0, - "Number of operations per thread. (Default: 5 * keyspace size)"); -DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added."); - -DEFINE_uint32(skew, 5, "Degree of skew in key selection"); -DEFINE_bool(populate_cache, true, "Populate cache before operations"); - -DEFINE_uint32(lookup_insert_percent, 87, - "Ratio of lookup (+ insert on not found) to total workload " - "(expressed as a percentage)"); -DEFINE_uint32(insert_percent, 2, - "Ratio of insert to total workload (expressed as a percentage)"); -DEFINE_uint32(lookup_percent, 10, - "Ratio of lookup to total workload (expressed as a percentage)"); -DEFINE_uint32(erase_percent, 1, - "Ratio of erase to total workload (expressed as a percentage)"); - -DEFINE_bool(use_clock_cache, false, ""); - -namespace ROCKSDB_NAMESPACE { - -class CacheBench; -namespace { -// State shared by all concurrent executions of the same benchmark. 
-class SharedState { - public: - explicit SharedState(CacheBench* cache_bench) - : cv_(&mu_), - num_initialized_(0), - start_(false), - num_done_(0), - cache_bench_(cache_bench) {} - - ~SharedState() {} - - port::Mutex* GetMutex() { - return &mu_; - } - - port::CondVar* GetCondVar() { - return &cv_; - } - - CacheBench* GetCacheBench() const { - return cache_bench_; - } - - void IncInitialized() { - num_initialized_++; - } - - void IncDone() { - num_done_++; - } - - bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; } - - bool AllDone() const { return num_done_ >= FLAGS_threads; } - - void SetStart() { - start_ = true; - } - - bool Started() const { - return start_; - } - - private: - port::Mutex mu_; - port::CondVar cv_; - - uint64_t num_initialized_; - bool start_; - uint64_t num_done_; - - CacheBench* cache_bench_; -}; - -// Per-thread state for concurrent executions of the same benchmark. -struct ThreadState { - uint32_t tid; - Random64 rnd; - SharedState* shared; - - ThreadState(uint32_t index, SharedState* _shared) - : tid(index), rnd(1000 + index), shared(_shared) {} -}; - -struct KeyGen { - char key_data[27]; - - Slice GetRand(Random64& rnd, uint64_t max_key) { - uint64_t raw = rnd.Next(); - // Skew according to setting - for (uint32_t i = 0; i < FLAGS_skew; ++i) { - raw = std::min(raw, rnd.Next()); - } - uint64_t key = FastRange64(raw, max_key); - // Variable size and alignment - size_t off = key % 8; - key_data[0] = char{42}; - EncodeFixed64(key_data + 1, key); - key_data[9] = char{11}; - EncodeFixed64(key_data + 10, key); - key_data[18] = char{4}; - EncodeFixed64(key_data + 19, key); - return Slice(&key_data[off], sizeof(key_data) - off); - } -}; - -char* createValue(Random64& rnd) { - char* rv = new char[FLAGS_value_bytes]; - // Fill with some filler data, and take some CPU time - for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) { - EncodeFixed64(rv + i, rnd.Next()); - } - return rv; -} - -void deleter(const Slice& /*key*/, void* value) { - delete[] static_cast(value); -} -} // namespace - -class CacheBench { - static constexpr uint64_t kHundredthUint64 = - std::numeric_limits::max() / 100U; - - public: - CacheBench() - : max_key_(static_cast(FLAGS_cache_size / FLAGS_resident_ratio / - FLAGS_value_bytes)), - lookup_insert_threshold_(kHundredthUint64 * - FLAGS_lookup_insert_percent), - insert_threshold_(lookup_insert_threshold_ + - kHundredthUint64 * FLAGS_insert_percent), - lookup_threshold_(insert_threshold_ + - kHundredthUint64 * FLAGS_lookup_percent), - erase_threshold_(lookup_threshold_ + - kHundredthUint64 * FLAGS_erase_percent) { - if (erase_threshold_ != 100U * kHundredthUint64) { - fprintf(stderr, "Percentages must add to 100.\n"); - exit(1); - } - if (FLAGS_use_clock_cache) { - cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits); - if (!cache_) { - fprintf(stderr, "Clock cache not supported.\n"); - exit(1); - } - } else { - cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits); - } - if (FLAGS_ops_per_thread == 0) { - FLAGS_ops_per_thread = 5 * max_key_; - } - } - - ~CacheBench() {} - - void PopulateCache() { - Random64 rnd(1); - KeyGen keygen; - for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) { - cache_->Insert(keygen.GetRand(rnd, max_key_), createValue(rnd), - FLAGS_value_bytes, &deleter); - } - } - - bool Run() { - ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default(); - const auto& clock = env->GetSystemClock(); - - PrintEnv(); - SharedState shared(this); - std::vector > 
threads(FLAGS_threads); - for (uint32_t i = 0; i < FLAGS_threads; i++) { - threads[i].reset(new ThreadState(i, &shared)); - env->StartThread(ThreadBody, threads[i].get()); - } - { - MutexLock l(shared.GetMutex()); - while (!shared.AllInitialized()) { - shared.GetCondVar()->Wait(); - } - // Record start time - uint64_t start_time = clock->NowMicros(); - - // Start all threads - shared.SetStart(); - shared.GetCondVar()->SignalAll(); - - // Wait threads to complete - while (!shared.AllDone()) { - shared.GetCondVar()->Wait(); - } - - // Record end time - uint64_t end_time = clock->NowMicros(); - double elapsed = static_cast(end_time - start_time) * 1e-6; - uint32_t qps = static_cast( - static_cast(FLAGS_threads * FLAGS_ops_per_thread) / elapsed); - fprintf(stdout, "Complete in %.3f s; QPS = %u\n", elapsed, qps); - } - return true; - } - - private: - std::shared_ptr cache_; - const uint64_t max_key_; - // Cumulative thresholds in the space of a random uint64_t - const uint64_t lookup_insert_threshold_; - const uint64_t insert_threshold_; - const uint64_t lookup_threshold_; - const uint64_t erase_threshold_; - - static void ThreadBody(void* v) { - ThreadState* thread = static_cast(v); - SharedState* shared = thread->shared; - - { - MutexLock l(shared->GetMutex()); - shared->IncInitialized(); - if (shared->AllInitialized()) { - shared->GetCondVar()->SignalAll(); - } - while (!shared->Started()) { - shared->GetCondVar()->Wait(); - } - } - thread->shared->GetCacheBench()->OperateCache(thread); - - { - MutexLock l(shared->GetMutex()); - shared->IncDone(); - if (shared->AllDone()) { - shared->GetCondVar()->SignalAll(); - } - } - } - - void OperateCache(ThreadState* thread) { - // To use looked-up values - uint64_t result = 0; - // To hold handles for a non-trivial amount of time - Cache::Handle* handle = nullptr; - KeyGen gen; - for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { - Slice key = gen.GetRand(thread->rnd, max_key_); - uint64_t random_op = thread->rnd.Next(); - if (random_op < lookup_insert_threshold_) { - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - // do lookup - handle = cache_->Lookup(key); - if (handle) { - // do something with the data - result += NPHash64(static_cast(cache_->Value(handle)), - FLAGS_value_bytes); - } else { - // do insert - cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes, - &deleter, &handle); - } - } else if (random_op < insert_threshold_) { - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - // do insert - cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes, - &deleter, &handle); - } else if (random_op < lookup_threshold_) { - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - // do lookup - handle = cache_->Lookup(key); - if (handle) { - // do something with the data - result += NPHash64(static_cast(cache_->Value(handle)), - FLAGS_value_bytes); - } - } else if (random_op < erase_threshold_) { - // do erase - cache_->Erase(key); - } else { - // Should be extremely unlikely (noop) - assert(random_op >= kHundredthUint64 * 100U); - } - } - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - } - - void PrintEnv() const { - printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); - printf("Number of threads : %u\n", FLAGS_threads); - printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); - printf("Cache size : %" PRIu64 "\n", FLAGS_cache_size); - printf("Num shard bits : %u\n", FLAGS_num_shard_bits); - printf("Max key : %" PRIu64 "\n", max_key_); - 
printf("Resident ratio : %g\n", FLAGS_resident_ratio); - printf("Skew degree : %u\n", FLAGS_skew); - printf("Populate cache : %d\n", int{FLAGS_populate_cache}); - printf("Lookup+Insert pct : %u%%\n", FLAGS_lookup_insert_percent); - printf("Insert percentage : %u%%\n", FLAGS_insert_percent); - printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent); - printf("Erase percentage : %u%%\n", FLAGS_erase_percent); - printf("----------------------------\n"); - } -}; -} // namespace ROCKSDB_NAMESPACE - +#include int main(int argc, char** argv) { - ParseCommandLineFlags(&argc, &argv, true); - - if (FLAGS_threads <= 0) { - fprintf(stderr, "threads number <= 0\n"); - exit(1); - } - - ROCKSDB_NAMESPACE::CacheBench bench; - if (FLAGS_populate_cache) { - bench.PopulateCache(); - printf("Population complete\n"); - printf("----------------------------\n"); - } - if (bench.Run()) { - return 0; - } else { - return 1; - } + return ROCKSDB_NAMESPACE::cache_bench_tool(argc, argv); } - #endif // GFLAGS diff --git a/cache/cache_bench_tool.cc b/cache/cache_bench_tool.cc new file mode 100644 index 000000000..db40ddcc5 --- /dev/null +++ b/cache/cache_bench_tool.cc @@ -0,0 +1,506 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef GFLAGS +#include +int main() { + fprintf(stderr, "Please install gflags to run rocksdb tools\n"); + return 1; +} +#else + +#include + +#include +#include +#include + +#include "options/configurable_helper.h" +#include "port/port.h" +#include "rocksdb/cache.h" +#include "rocksdb/cache_bench_tool.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/system_clock.h" +#include "rocksdb/tiered_cache.h" +#include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/options_type.h" +#include "util/coding.h" +#include "util/gflags_compat.h" +#include "util/hash.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/stop_watch.h" + +using GFLAGS_NAMESPACE::ParseCommandLineFlags; + +static constexpr uint32_t KiB = uint32_t{1} << 10; +static constexpr uint32_t MiB = KiB << 10; +static constexpr uint64_t GiB = MiB << 10; + +DEFINE_uint32(threads, 16, "Number of concurrent threads to run."); +DEFINE_uint64(cache_size, 1 * GiB, + "Number of bytes to use as a cache of uncompressed data."); +DEFINE_uint32(num_shard_bits, 6, "shard_bits."); + +DEFINE_double(resident_ratio, 0.25, + "Ratio of keys fitting in cache to keyspace."); +DEFINE_uint64(ops_per_thread, 0, + "Number of operations per thread. 
(Default: 5 * keyspace size)"); +DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added."); + +DEFINE_uint32(skew, 5, "Degree of skew in key selection"); +DEFINE_bool(populate_cache, true, "Populate cache before operations"); + +DEFINE_uint32(lookup_insert_percent, 87, + "Ratio of lookup (+ insert on not found) to total workload " + "(expressed as a percentage)"); +DEFINE_uint32(insert_percent, 2, + "Ratio of insert to total workload (expressed as a percentage)"); +DEFINE_uint32(lookup_percent, 10, + "Ratio of lookup to total workload (expressed as a percentage)"); +DEFINE_uint32(erase_percent, 1, + "Ratio of erase to total workload (expressed as a percentage)"); + +DEFINE_bool(use_clock_cache, false, ""); + +DEFINE_bool(skewed, false, "If true, skew the key access distribution"); +DEFINE_string(tiered_cache_uri, "", + "Full URI for creating a custom NVM cache object"); +static class std::shared_ptr tiered_cache; + +namespace ROCKSDB_NAMESPACE { +class CacheBench; + +namespace { +// State shared by all concurrent executions of the same benchmark. +class SharedState { + public: + explicit SharedState(CacheBench* cache_bench) + : cv_(&mu_), + num_initialized_(0), + start_(false), + num_done_(0), + cache_bench_(cache_bench) {} + + ~SharedState() {} + + port::Mutex* GetMutex() { return &mu_; } + + port::CondVar* GetCondVar() { return &cv_; } + + CacheBench* GetCacheBench() const { return cache_bench_; } + + void IncInitialized() { num_initialized_++; } + + void IncDone() { num_done_++; } + + bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; } + + bool AllDone() const { return num_done_ >= FLAGS_threads; } + + void SetStart() { start_ = true; } + + bool Started() const { return start_; } + + private: + port::Mutex mu_; + port::CondVar cv_; + + uint64_t num_initialized_; + bool start_; + uint64_t num_done_; + + CacheBench* cache_bench_; +}; + +class Stats { + private: + uint64_t hits; + uint64_t misses; + uint64_t lookup_us; + uint64_t insert_us; + + public: + Stats() : hits(0), misses(0), lookup_us(0), insert_us(0) {} + + void AddHits(uint64_t inc) { hits += inc; } + void AddMisses(uint64_t inc) { misses += inc; } + void AddLookupUs(uint64_t lookup) { lookup_us += lookup; } + void AddInsertUs(uint64_t insert) { insert_us += insert; } + void Merge(const Stats& other) { + hits += other.hits; + misses += other.misses; + lookup_us += other.lookup_us; + insert_us += other.insert_us; + } + void Report() { + fprintf(stdout, "%lu hits, %lu misses, %lu lookup_ns, %lu insert_ns\n", + hits, misses, lookup_us, insert_us); + fflush(stdout); + } +}; + +// Per-thread state for concurrent executions of the same benchmark. 
+struct ThreadState { + uint32_t tid; + Random64 rnd; + Stats stats; + SharedState* shared; + + ThreadState(uint32_t index, SharedState* _shared) + : tid(index), rnd(1000 + index), shared(_shared) {} +}; + +struct KeyGen { + char key_data[27]; + + Slice GetRand(Random64& rnd, uint64_t max_key, int max_log) { + uint64_t key = 0; + if (!FLAGS_skewed) { + uint64_t raw = rnd.Next(); + // Skew according to setting + for (uint32_t i = 0; i < FLAGS_skew; ++i) { + raw = std::min(raw, rnd.Next()); + } + key = FastRange64(raw, max_key); + } else { + key = rnd.Skewed(max_log); + if (key > max_key) { + key -= max_key; + } + } + // Variable size and alignment + size_t off = key % 8; + key_data[0] = char{42}; + EncodeFixed64(key_data + 1, key); + key_data[9] = char{11}; + EncodeFixed64(key_data + 10, key); + key_data[18] = char{4}; + EncodeFixed64(key_data + 19, key); + return Slice(&key_data[off], sizeof(key_data) - off); + } +}; + +char* createValue(Random64& rnd) { + char* rv = new char[FLAGS_value_bytes]; + // Fill with some filler data, and take some CPU time + for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) { + EncodeFixed64(rv + i, rnd.Next()); + } + return rv; +} + +void helperCallback(Cache::SizeCallback* size_cb, + Cache::SaveToCallback* save_cb, + Cache::DeletionCallback* del_cb) { + if (size_cb) { + *size_cb = [](void* /*obj*/) -> size_t { return FLAGS_value_bytes; }; + } + if (save_cb) { + *save_cb = [](void* obj, size_t /*offset*/, size_t size, + void* out) -> Status { + memcpy(out, obj, size); + return Status::OK(); + }; + } + if (del_cb) { + *del_cb = [](const Slice& /*key*/, void* obj) -> void { + delete[] static_cast(obj); + }; + } +} +} // namespace + +class CacheBench { + static constexpr uint64_t kHundredthUint64 = + std::numeric_limits::max() / 100U; + + public: + CacheBench() + : max_key_(static_cast(FLAGS_cache_size / FLAGS_resident_ratio / + FLAGS_value_bytes)), + lookup_insert_threshold_(kHundredthUint64 * + FLAGS_lookup_insert_percent), + insert_threshold_(lookup_insert_threshold_ + + kHundredthUint64 * FLAGS_insert_percent), + lookup_threshold_(insert_threshold_ + + kHundredthUint64 * FLAGS_lookup_percent), + erase_threshold_(lookup_threshold_ + + kHundredthUint64 * FLAGS_erase_percent), + skewed_(FLAGS_skewed) { + if (erase_threshold_ != 100U * kHundredthUint64) { + fprintf(stderr, "Percentages must add to 100.\n"); + exit(1); + } + + if (skewed_) { + uint64_t max_key = max_key_; + max_log_ = 0; + while (max_key >>= 1) max_log_++; + if (max_key > (1u << max_log_)) max_log_++; + } + + if (FLAGS_use_clock_cache) { + cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits); + if (!cache_) { + fprintf(stderr, "Clock cache not supported.\n"); + exit(1); + } + } else { + LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5); + if (!FLAGS_tiered_cache_uri.empty()) { + Status s = ObjectRegistry::NewInstance()->NewSharedObject( + FLAGS_tiered_cache_uri, &tiered_cache); + if (tiered_cache == nullptr) { + fprintf(stderr, + "No tiered cache registered matching string: %s status=%s\n", + FLAGS_tiered_cache_uri.c_str(), s.ToString().c_str()); + exit(1); + } + opts.tiered_cache = tiered_cache; + } + + cache_ = NewLRUCache(opts); + } + if (FLAGS_ops_per_thread == 0) { + FLAGS_ops_per_thread = 5 * max_key_; + } + } + + ~CacheBench() {} + + void PopulateCache() { + Random64 rnd(1); + KeyGen keygen; + for (uint64_t i = 0; i < 10 * FLAGS_cache_size; i += FLAGS_value_bytes) { + cache_->Insert(keygen.GetRand(rnd, max_key_, max_log_), createValue(rnd), + 
helperCallback, FLAGS_value_bytes); + } + } + + bool Run() { + ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default(); + const auto& clock = env->GetSystemClock(); + + PrintEnv(); + SharedState shared(this); + std::vector > threads(FLAGS_threads); + for (uint32_t i = 0; i < FLAGS_threads; i++) { + threads[i].reset(new ThreadState(i, &shared)); + env->StartThread(ThreadBody, threads[i].get()); + } + { + MutexLock l(shared.GetMutex()); + while (!shared.AllInitialized()) { + shared.GetCondVar()->Wait(); + } + // Record start time + uint64_t start_time = clock->NowMicros(); + + // Start all threads + shared.SetStart(); + shared.GetCondVar()->SignalAll(); + + // Wait threads to complete + while (!shared.AllDone()) { + shared.GetCondVar()->Wait(); + } + + // Record end time + uint64_t end_time = clock->NowMicros(); + double elapsed = static_cast(end_time - start_time) * 1e-6; + uint32_t qps = static_cast( + static_cast(FLAGS_threads * FLAGS_ops_per_thread) / elapsed); + fprintf(stdout, "Complete in %.3f s; QPS = %u\n", elapsed, qps); + + Stats merge_stats; + for (uint32_t i = 0; i < FLAGS_threads; ++i) { + merge_stats.Merge(threads[i]->stats); + } + merge_stats.Report(); + } + return true; + } + + private: + std::shared_ptr cache_; + const uint64_t max_key_; + // Cumulative thresholds in the space of a random uint64_t + const uint64_t lookup_insert_threshold_; + const uint64_t insert_threshold_; + const uint64_t lookup_threshold_; + const uint64_t erase_threshold_; + const bool skewed_; + int max_log_; + + static void ThreadBody(void* v) { + ThreadState* thread = static_cast(v); + SharedState* shared = thread->shared; + + { + MutexLock l(shared->GetMutex()); + shared->IncInitialized(); + if (shared->AllInitialized()) { + shared->GetCondVar()->SignalAll(); + } + while (!shared->Started()) { + shared->GetCondVar()->Wait(); + } + } + thread->shared->GetCacheBench()->OperateCache(thread); + + { + MutexLock l(shared->GetMutex()); + shared->IncDone(); + if (shared->AllDone()) { + shared->GetCondVar()->SignalAll(); + } + } + } + + void OperateCache(ThreadState* thread) { + // To use looked-up values + uint64_t result = 0; + // To hold handles for a non-trivial amount of time + Cache::Handle* handle = nullptr; + KeyGen gen; + ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default(); + SystemClock* clock = env->GetSystemClock().get(); + for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { + Slice key = gen.GetRand(thread->rnd, max_key_, max_log_); + uint64_t random_op = thread->rnd.Next(); + Cache::CreateCallback create_cb = + [](void* buf, size_t size, void** out_obj, size_t* charge) -> Status { + *out_obj = reinterpret_cast(new char[size]); + memcpy(*out_obj, buf, size); + *charge = size; + return Status::OK(); + }; + + if (random_op < lookup_insert_threshold_) { + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + // do lookup + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + handle = cache_->Lookup(key, helperCallback, create_cb, + Cache::Priority::LOW, true); + } + thread->stats.AddLookupUs(elapsed); + } + if (handle) { + thread->stats.AddHits(1); + // do something with the data + result += NPHash64(static_cast(cache_->Value(handle)), + FLAGS_value_bytes); + } else { + thread->stats.AddMisses(1); + // do insert + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + cache_->Insert(key, createValue(thread->rnd), helperCallback, + FLAGS_value_bytes, &handle); + } + thread->stats.AddInsertUs(elapsed); + } + } + } else 
if (random_op < insert_threshold_) { + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + // do insert + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + cache_->Insert(key, createValue(thread->rnd), helperCallback, + FLAGS_value_bytes, &handle); + } + thread->stats.AddInsertUs(elapsed); + } + } else if (random_op < lookup_threshold_) { + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + // do lookup + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + handle = cache_->Lookup(key, helperCallback, create_cb, + Cache::Priority::LOW, true); + } + thread->stats.AddLookupUs(elapsed); + } + if (handle) { + thread->stats.AddHits(1); + // do something with the data + result += NPHash64(static_cast(cache_->Value(handle)), + FLAGS_value_bytes); + } + } else if (random_op < erase_threshold_) { + // do erase + cache_->Erase(key); + } else { + // Should be extremely unlikely (noop) + assert(random_op >= kHundredthUint64 * 100U); + } + } + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + } + + void PrintEnv() const { + printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); + printf("Number of threads : %u\n", FLAGS_threads); + printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); + printf("Cache size : %" PRIu64 "\n", FLAGS_cache_size); + printf("Num shard bits : %u\n", FLAGS_num_shard_bits); + printf("Max key : %" PRIu64 "\n", max_key_); + printf("Resident ratio : %g\n", FLAGS_resident_ratio); + printf("Skew degree : %u\n", FLAGS_skew); + printf("Populate cache : %d\n", int{FLAGS_populate_cache}); + printf("Lookup+Insert pct : %u%%\n", FLAGS_lookup_insert_percent); + printf("Insert percentage : %u%%\n", FLAGS_insert_percent); + printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent); + printf("Erase percentage : %u%%\n", FLAGS_erase_percent); + printf("----------------------------\n"); + } +}; + +int cache_bench_tool(int argc, char** argv) { + ParseCommandLineFlags(&argc, &argv, true); + + if (FLAGS_threads <= 0) { + fprintf(stderr, "threads number <= 0\n"); + exit(1); + } + + ROCKSDB_NAMESPACE::CacheBench bench; + if (FLAGS_populate_cache) { + bench.PopulateCache(); + printf("Population complete\n"); + printf("----------------------------\n"); + } + if (bench.Run()) { + return 0; + } else { + return 1; + } +} +} // namespace ROCKSDB_NAMESPACE + +#endif // GFLAGS diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index b5301976c..a5b49218a 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -377,7 +377,6 @@ Cache::Handle* LRUCacheShard::Lookup( e->SetPromoted(true); e->SetTieredCacheCompatible(true); e->info_.helper_cb = helper_cb; - e->charge = tiered_handle->Size(); e->key_length = key.size(); e->hash = hash; e->refs = 0; diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index bae4f0d91..9b9164eb6 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -248,38 +248,6 @@ class Cache { Handle** handle = nullptr, Priority priority = Priority::LOW) = 0; - // Insert a mapping from key->value into the volatile cache and assign it - // the specified charge against the total cache capacity. - // If strict_capacity_limit is true and cache reaches its full capacity, - // return Status::Incomplete. - // - // If handle is not nullptr, returns a handle that corresponds to the - // mapping. The caller must call this->Release(handle) when the returned - // mapping is no longer needed. 
In case of error caller is responsible to - // cleanup the value (i.e. calling "deleter"). - // - // If handle is nullptr, it is as if Release is called immediately after - // insert. In case of error value will be cleanup. - // - // Regardless of whether the item was inserted into the volatile cache, - // it will attempt to insert it into the NVM cache if one is configured. - // The block cache implementation must support the NVM tier, otherwise - // the item is only inserted into the volatile tier. It may - // defer the insertion to NVM as it sees fit. The NVM - // cache may or may not write it to NVM depending on its admission - // policy. - // - // When the inserted entry is no longer needed, the key and - // value will be passed to "deleter". - virtual Status Insert(const Slice& key, void* value, - CacheItemHelperCallback helper_cb, size_t charge, - Handle** handle = nullptr, - Priority priority = Priority::LOW) { - DeletionCallback delete_cb = nullptr; - (*helper_cb)(nullptr, nullptr, &delete_cb); - return Insert(key, value, charge, delete_cb, handle, priority); - } - // If the cache has no mapping for "key", returns nullptr. // // Else return a handle that corresponds to the mapping. The caller @@ -289,25 +257,6 @@ class Cache { // function. virtual Handle* Lookup(const Slice& key, Statistics* stats = nullptr) = 0; - // Lookup the key in the volatile and NVM tiers (if one is configured). - // The create_cb callback function object will be used to contruct the - // cached object. - // If none of the tiers have the mapping for the key, rturns nullptr. - // Else, returns a handle that corresponds to the mapping. - // - // The handle returned may not be ready. The caller should call isReady() - // to check if the item value is ready, and call Wait() or WaitAll() if - // its not ready. The caller should then call Value() to check if the - // item was successfully retrieved. If unsuccessful (perhaps due to an - // IO error), Value() will return nullptr. - virtual Handle* Lookup(const Slice& key, - CacheItemHelperCallback /*helper_cb*/, - const CreateCallback& /*create_cb*/, - Priority /*priority*/, bool /*wait*/, - Statistics* stats = nullptr) { - return Lookup(key, stats); - } - // Increments the reference count for the handle if it refers to an entry in // the cache. Returns true if refcount was incremented; otherwise, returns // false. @@ -328,27 +277,6 @@ class Cache { // REQUIRES: handle must have been returned by a method on *this. virtual bool Release(Handle* handle, bool force_erase = false) = 0; - // Release a mapping returned by a previous Lookup(). The "useful" - // parameter specifies whether the data was actually used or not, - // which may be used by the cache implementation to decide whether - // to consider it as a hit for retention purposes. - virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) { - return Release(handle, force_erase); - } - - // Determines if the handle returned by Lookup() has a valid value yet. - virtual bool isReady(Handle* /*handle*/) { return true; } - - // If the handle returned by Lookup() is not ready yet, wait till it - // becomes ready. - // Note: A ready handle doesn't necessarily mean it has a valid value. The - // user should call Value() and check for nullptr. - virtual void Wait(Handle* /*handle*/) {} - - // Wait for a vector of handles to become ready. 
As with Wait(), the user - // should check the Value() of each handle for nullptr - virtual void WaitAll(std::vector& /*handles*/) {} - // Return the value encapsulated in a handle returned by a // successful Lookup(). // REQUIRES: handle must not have been released yet. @@ -417,6 +345,78 @@ class Cache { MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); } + // Insert a mapping from key->value into the volatile cache and assign it + // the specified charge against the total cache capacity. + // If strict_capacity_limit is true and cache reaches its full capacity, + // return Status::Incomplete. + // + // If handle is not nullptr, returns a handle that corresponds to the + // mapping. The caller must call this->Release(handle) when the returned + // mapping is no longer needed. In case of error caller is responsible to + // cleanup the value (i.e. calling "deleter"). + // + // If handle is nullptr, it is as if Release is called immediately after + // insert. In case of error value will be cleanup. + // + // Regardless of whether the item was inserted into the volatile cache, + // it will attempt to insert it into the NVM cache if one is configured. + // The block cache implementation must support the NVM tier, otherwise + // the item is only inserted into the volatile tier. It may + // defer the insertion to NVM as it sees fit. The NVM + // cache may or may not write it to NVM depending on its admission + // policy. + // + // When the inserted entry is no longer needed, the key and + // value will be passed to "deleter". + virtual Status Insert(const Slice& key, void* value, + CacheItemHelperCallback helper_cb, size_t charge, + Handle** handle = nullptr, + Priority priority = Priority::LOW) { + DeletionCallback delete_cb = nullptr; + (*helper_cb)(nullptr, nullptr, &delete_cb); + return Insert(key, value, charge, delete_cb, handle, priority); + } + + // Lookup the key in the volatile and NVM tiers (if one is configured). + // The create_cb callback function object will be used to contruct the + // cached object. + // If none of the tiers have the mapping for the key, rturns nullptr. + // Else, returns a handle that corresponds to the mapping. + // + // The handle returned may not be ready. The caller should call isReady() + // to check if the item value is ready, and call Wait() or WaitAll() if + // its not ready. The caller should then call Value() to check if the + // item was successfully retrieved. If unsuccessful (perhaps due to an + // IO error), Value() will return nullptr. + virtual Handle* Lookup(const Slice& key, + CacheItemHelperCallback /*helper_cb*/, + const CreateCallback& /*create_cb*/, + Priority /*priority*/, bool /*wait*/, + Statistics* stats = nullptr) { + return Lookup(key, stats); + } + + // Release a mapping returned by a previous Lookup(). The "useful" + // parameter specifies whether the data was actually used or not, + // which may be used by the cache implementation to decide whether + // to consider it as a hit for retention purposes. + virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) { + return Release(handle, force_erase); + } + + // Determines if the handle returned by Lookup() has a valid value yet. + virtual bool isReady(Handle* /*handle*/) { return true; } + + // If the handle returned by Lookup() is not ready yet, wait till it + // becomes ready. + // Note: A ready handle doesn't necessarily mean it has a valid value. The + // user should call Value() and check for nullptr. 
+ virtual void Wait(Handle* /*handle*/) {} + + // Wait for a vector of handles to become ready. As with Wait(), the user + // should check the Value() of each handle for nullptr + virtual void WaitAll(std::vector& /*handles*/) {} + private: std::shared_ptr memory_allocator_; }; diff --git a/include/rocksdb/cache_bench_tool.h b/include/rocksdb/cache_bench_tool.h new file mode 100644 index 000000000..413ce1593 --- /dev/null +++ b/include/rocksdb/cache_bench_tool.h @@ -0,0 +1,14 @@ +// Copyright (c) 2013-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" + +namespace ROCKSDB_NAMESPACE { + +int cache_bench_tool(int argc, char** argv); +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/configurable.h b/include/rocksdb/configurable.h index f6f5cc726..5971ec44e 100644 --- a/include/rocksdb/configurable.h +++ b/include/rocksdb/configurable.h @@ -356,4 +356,8 @@ class Configurable { // Configurable option via std::vector options_; }; + +extern void RegisterConfigurableOptions( + Configurable& configurable, const std::string& name, void* opt_ptr, + const std::unordered_map* opt_map); } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/tiered_cache.h b/include/rocksdb/tiered_cache.h index a66e83e0c..e022db909 100644 --- a/include/rocksdb/tiered_cache.h +++ b/include/rocksdb/tiered_cache.h @@ -48,6 +48,8 @@ class TieredCache { virtual std::string Name() = 0; + static const char* Type() { return "TieredCache"; } + // Insert the given value into this tier. The value is not written // directly. 
Rather, the SaveToCallback provided by helper_cb will be // used to extract the persistable data in value, which will be written diff --git a/options/configurable.cc b/options/configurable.cc index aa2b957c4..f1e1e789b 100644 --- a/options/configurable.cc +++ b/options/configurable.cc @@ -31,6 +31,12 @@ void ConfigurableHelper::RegisterOptions( configurable.options_.emplace_back(opts); } +void RegisterConfigurableOptions( + Configurable& configurable, const std::string& name, void* opt_ptr, + const std::unordered_map* type_map) { + ConfigurableHelper::RegisterOptions(configurable, name, opt_ptr, type_map); +} + //************************************************************************* // // Methods for Initializing and Validating Configurable Objects diff --git a/src.mk b/src.mk index 1e487f4e8..2841e355f 100644 --- a/src.mk +++ b/src.mk @@ -320,6 +320,9 @@ MOCK_LIB_SOURCES = \ BENCH_LIB_SOURCES = \ tools/db_bench_tool.cc \ +CACHE_BENCH_LIB_SOURCES = \ + cache/cache_bench_tool.cc \ + STRESS_LIB_SOURCES = \ db_stress_tool/batched_ops_stress.cc \ db_stress_tool/cf_consistency_stress.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index ac5677407..561aadaef 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -53,8 +53,10 @@ #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/stats_history.h" +#include "rocksdb/tiered_cache.h" #include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/transaction.h" @@ -1409,6 +1411,10 @@ DEFINE_bool(read_with_latest_user_timestamp, true, "If true, always use the current latest timestamp for read. If " "false, choose a random timestamp from the past."); +DEFINE_string(tiered_cache_uri, "", + "Full URI for creating a custom tiered cache object"); +static class std::shared_ptr tiered_cache; + static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); @@ -2757,22 +2763,35 @@ class Benchmark { } return cache; } else { - if (FLAGS_use_cache_memkind_kmem_allocator) { + LRUCacheOptions opts( + static_cast(capacity), FLAGS_cache_numshardbits, + false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio, #ifdef MEMKIND - return NewLRUCache( - static_cast(capacity), FLAGS_cache_numshardbits, - false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio, - std::make_shared()); - + FLAGS_use_cache_memkind_kmem_allocator + ? std::make_shared() + : nullptr #else + nullptr +#endif + ); + if (FLAGS_use_cache_memkind_kmem_allocator) { +#ifndef MEMKIND fprintf(stderr, "Memkind library is not linked with the binary."); exit(1); #endif - } else { - return NewLRUCache( - static_cast(capacity), FLAGS_cache_numshardbits, - false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio); } + if (!FLAGS_tiered_cache_uri.empty()) { + Status s = ObjectRegistry::NewInstance()->NewSharedObject( + FLAGS_tiered_cache_uri, &tiered_cache); + if (tiered_cache == nullptr) { + fprintf(stderr, + "No tiered cache registered matching string: %s status=%s\n", + FLAGS_tiered_cache_uri.c_str(), s.ToString().c_str()); + exit(1); + } + opts.tiered_cache = tiered_cache; + } + return NewLRUCache(opts); } }
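
The pieces above are meant to be consumed by a third party that links rocksdb_cache_bench_tools_lib (or cache/cache_bench_tool.cc directly) into its own benchmark binary, registers a factory for its TieredCache implementation, and then hands control to cache_bench_tool(), so that the new -tiered_cache_uri flag resolves through ObjectRegistry::NewSharedObject as in the hunks above. A minimal sketch of such a driver follows; MyTieredCache, the mycache:// scheme, and my_tiered_cache.h are hypothetical stand-ins for the third-party code, and the ObjectLibrary::Default()->Register<> signature is an assumption based on the object_registry.h of this era rather than something this patch guarantees.

    // my_cache_bench.cc -- illustrative sketch, not part of this patch.
    #include <memory>
    #include <string>

    #include "rocksdb/cache_bench_tool.h"
    #include "rocksdb/tiered_cache.h"
    #include "rocksdb/utilities/object_registry.h"

    // Hypothetical 3rd-party header declaring a concrete
    // ROCKSDB_NAMESPACE::TieredCache implementation.
    #include "my_tiered_cache.h"

    int main(int argc, char** argv) {
      // Register a factory for "mycache://..." URIs so that
      // -tiered_cache_uri=mycache://<config> can be resolved by
      // ObjectRegistry::NewSharedObject inside cache_bench_tool().
      // NOTE: verify the Register<> factory signature against the
      // object_registry.h in the tree being built.
      ROCKSDB_NAMESPACE::ObjectLibrary::Default()
          ->Register<ROCKSDB_NAMESPACE::TieredCache>(
              "mycache://.*",
              [](const std::string& uri,
                 std::unique_ptr<ROCKSDB_NAMESPACE::TieredCache>* guard,
                 std::string* /*errmsg*/) {
                // Parse whatever follows the scheme and construct the cache.
                guard->reset(new MyTieredCache(uri));
                return guard->get();
              });
      // cache_bench_tool() parses the gflags defined in cache_bench_tool.cc
      // (threads, cache_size, skewed, tiered_cache_uri, ...) and runs the
      // benchmark loop added by this patch.
      return ROCKSDB_NAMESPACE::cache_bench_tool(argc, argv);
    }

Assuming such a driver, an invocation could look like (flag names taken from the DEFINE_* declarations above; the URI payload is whatever the hypothetical factory understands):

    ./my_cache_bench -threads=16 -cache_size=1073741824 -value_bytes=8192 \
        -skewed=true -populate_cache=true -tiered_cache_uri="mycache://size=4G"

db_bench accepts the same -tiered_cache_uri flag and attaches the resolved object to LRUCacheOptions::tiered_cache before calling NewLRUCache(opts), as shown in the db_bench_tool.cc hunk above.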