From 4cc37f59e5bae1ebacd2c9b7dc4a8c635bb8d99a Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 19 Aug 2016 12:28:19 -0700 Subject: [PATCH] Introduce ClockCache Summary: Clock-based cache implemenetation aim to have better concurreny than default LRU cache. See inline comments for implementation details. Test Plan: Update cache_test to run on both LRUCache and ClockCache. Adding some new tests to catch some of the bugs that I fixed while implementing the cache. Reviewers: kradhakrishnan, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D61647 --- CMakeLists.txt | 1 + HISTORY.md | 2 + include/rocksdb/cache.h | 15 +- src.mk | 1 + tools/db_bench_tool.cc | 48 +-- tools/db_crashtest.py | 2 + tools/db_stress.cc | 33 +- util/cache_bench.cc | 16 +- util/cache_test.cc | 107 ++++-- util/clock_cache.cc | 700 ++++++++++++++++++++++++++++++++++++++++ util/clock_cache.h | 16 + 11 files changed, 874 insertions(+), 67 deletions(-) create mode 100644 util/clock_cache.cc create mode 100644 util/clock_cache.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f985c2a46..b2be6ff07 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -193,6 +193,7 @@ set(SOURCES util/arena.cc util/bloom.cc util/build_version.cc + util/clock_cache.cc util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc diff --git a/HISTORY.md b/HISTORY.md index ea9227aff..3c499fff3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log ## Unreleased +### New Features +* Introduce NewClockCache, which is based on CLOCK algorithm with better concurrent performance in some cases. It can be used to replace the default LRU-based block cache and table cache. To use it, RocksDB need to be linked with TBB lib. ## 4.11.0 (8/1/2016) ### Public API Change diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index c74de986d..3373d6008 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -2,6 +2,7 @@ // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +// // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. @@ -19,8 +20,7 @@ // they want something more sophisticated (like scan-resistance, a // custom eviction policy, variable cache sizing, etc.) -#ifndef STORAGE_ROCKSDB_INCLUDE_CACHE_H_ -#define STORAGE_ROCKSDB_INCLUDE_CACHE_H_ +#pragma once #include #include @@ -38,6 +38,15 @@ extern std::shared_ptr NewLRUCache(size_t capacity, int num_shard_bits = 6, bool strict_capacity_limit = false); +// Similar to NewLRUCache, but create a cache based on CLOCK algorithm with +// better concurrent performance in some cases. See util/clock_cache.cc for +// more detail. +// +// Return nullptr if it is not supported. 
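+//
+// Illustrative usage (not part of this patch): fall back to the LRU cache
+// when the clock cache is unavailable, e.g. in a build without TBB:
+//
+//   std::shared_ptr<Cache> cache = NewClockCache(64 << 20 /* capacity */);
+//   if (cache == nullptr) {
+//     cache = NewLRUCache(64 << 20);
+//   }
+//   BlockBasedTableOptions table_options;
+//   table_options.block_cache = cache;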
+extern std::shared_ptr NewClockCache(size_t capacity, + int num_shard_bits = 6, + bool strict_capacity_limit = false); + class Cache { public: Cache() {} @@ -153,5 +162,3 @@ class Cache { }; } // namespace rocksdb - -#endif // STORAGE_ROCKSDB_UTIL_CACHE_H_ diff --git a/src.mk b/src.mk index fcea98db3..b8a9e3132 100644 --- a/src.mk +++ b/src.mk @@ -88,6 +88,7 @@ LIB_SOURCES = \ util/arena.cc \ util/bloom.cc \ util/build_version.cc \ + util/clock_cache.cc \ util/coding.cc \ util/comparator.cc \ util/compaction_job_stats_impl.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 6795096ac..a1ac63e69 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -349,13 +349,20 @@ DEFINE_int32(universal_compression_size_percent, -1, DEFINE_bool(universal_allow_trivial_move, false, "Allow trivial move in universal compaction."); -DEFINE_int64(cache_size, -1, - "Number of bytes to use as a cache of uncompressed" - " data. Negative means use default settings."); +DEFINE_int64(cache_size, 8 << 20, // 8MB + "Number of bytes to use as a cache of uncompressed data"); + +DEFINE_int32(cache_numshardbits, 6, + "Number of shards for the block cache" + " is 2 ** cache_numshardbits. Negative means use default settings." + " This is applied only if FLAGS_cache_size is non-negative."); + +DEFINE_bool(use_clock_cache, false, + "Replace default LRU block cache with clock cache."); DEFINE_int64(simcache_size, -1, "Number of bytes to use as a simcache of " - "uncompressed data. Negative means use default settings."); + "uncompressed data. Nagative value disables simcache."); DEFINE_bool(cache_index_and_filter_blocks, false, "Cache index/filter blocks in block cache."); @@ -433,9 +440,6 @@ static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) { } return true; } -DEFINE_int32(cache_numshardbits, -1, "Number of shards for the block cache" - " is 2 ** cache_numshardbits. Negative means use default settings." - " This is applied only if FLAGS_cache_size is non-negative."); DEFINE_bool(verify_checksum, false, "Verify checksum for every block read" " from storage"); @@ -1877,20 +1881,26 @@ class Benchmark { std::shared_ptr timestamp_emulator_; }; + std::shared_ptr NewCache(int64_t capacity) { + if (capacity <= 0) { + return nullptr; + } + if (FLAGS_use_clock_cache) { + auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits); + if (!cache) { + fprintf(stderr, "Clock cache not supported."); + exit(1); + } + return cache; + } else { + return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits); + } + } + public: Benchmark() - : cache_( - FLAGS_cache_size >= 0 - ? (FLAGS_cache_numshardbits >= 1 - ? NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits) - : NewLRUCache(FLAGS_cache_size)) - : nullptr), - compressed_cache_(FLAGS_compressed_cache_size >= 0 - ? (FLAGS_cache_numshardbits >= 1 - ? NewLRUCache(FLAGS_compressed_cache_size, - FLAGS_cache_numshardbits) - : NewLRUCache(FLAGS_compressed_cache_size)) - : nullptr), + : cache_(NewCache(FLAGS_cache_size)), + compressed_cache_(NewCache(FLAGS_compressed_cache_size)), filter_policy_(FLAGS_bloom_bits >= 0 ? 
NewBloomFilterPolicy(FLAGS_bloom_bits, FLAGS_use_block_based_filter) diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b9dd78087..37d112030 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -19,6 +19,7 @@ import argparse default_params = { "block_size": 16384, "cache_size": 1048576, + "use_clock_cache": lambda: random.choice(["true", "false"]), "delpercent": 5, "destroy_db_initially": 0, "disable_data_sync": 0, @@ -84,6 +85,7 @@ whitebox_default_params = { simple_default_params = { "block_size": 16384, "cache_size": 1048576, + "use_clock_cache": lambda: random.choice(["true", "false"]), "column_families": 1, "delpercent": 5, "destroy_db_initially": 0, diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 9795230fe..e3724ffa8 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -228,6 +228,9 @@ DEFINE_int32(set_in_place_one_in, 0, DEFINE_int64(cache_size, 2LL * KB * KB * KB, "Number of bytes to use as a cache of uncompressed data."); +DEFINE_bool(use_clock_cache, false, + "Replace default LRU block cache with clock cache."); + DEFINE_uint64(subcompactions, 1, "Maximum number of subcompactions to divide L0-L1 compactions " "into."); @@ -993,15 +996,13 @@ class DbStressListener : public EventListener { class StressTest { public: StressTest() - : cache_(NewLRUCache(FLAGS_cache_size)), - compressed_cache_(FLAGS_compressed_cache_size >= 0 - ? NewLRUCache(FLAGS_compressed_cache_size) - : nullptr), + : cache_(NewCache(FLAGS_cache_size)), + compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)), filter_policy_(FLAGS_bloom_bits >= 0 - ? FLAGS_use_block_based_filter - ? NewBloomFilterPolicy(FLAGS_bloom_bits, true) - : NewBloomFilterPolicy(FLAGS_bloom_bits, false) - : nullptr), + ? FLAGS_use_block_based_filter + ? 
NewBloomFilterPolicy(FLAGS_bloom_bits, true) + : NewBloomFilterPolicy(FLAGS_bloom_bits, false) + : nullptr), db_(nullptr), new_column_family_name_(1), num_times_reopened_(0) { @@ -1025,6 +1026,22 @@ class StressTest { delete db_; } + std::shared_ptr NewCache(size_t capacity) { + if (capacity <= 0) { + return nullptr; + } + if (FLAGS_use_clock_cache) { + auto cache = NewClockCache((size_t)capacity); + if (!cache) { + fprintf(stderr, "Clock cache not supported."); + exit(1); + } + return cache; + } else { + return NewLRUCache((size_t)capacity); + } + } + bool BuildOptionsTable() { if (FLAGS_set_options_one_in <= 0) { return true; diff --git a/util/cache_bench.cc b/util/cache_bench.cc index 266c9e1c5..905d6bdd6 100644 --- a/util/cache_bench.cc +++ b/util/cache_bench.cc @@ -46,6 +46,8 @@ DEFINE_int32(lookup_percent, 50, DEFINE_int32(erase_percent, 10, "Ratio of erase to total workload (expressed as a percentage)"); +DEFINE_bool(use_clock_cache, false, ""); + namespace rocksdb { class CacheBench; @@ -129,9 +131,17 @@ struct ThreadState { class CacheBench { public: - CacheBench() : - cache_(NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits)), - num_threads_(FLAGS_threads) {} + CacheBench() : num_threads_(FLAGS_threads) { + if (FLAGS_use_clock_cache) { + cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits); + if (!cache_) { + fprintf(stderr, "Clock cache not supported.\n"); + exit(1); + } + } else { + cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits); + } + } ~CacheBench() {} diff --git a/util/cache_test.cc b/util/cache_test.cc index 51cac7294..a7feb55f3 100644 --- a/util/cache_test.cc +++ b/util/cache_test.cc @@ -10,9 +10,11 @@ #include "rocksdb/cache.h" #include -#include -#include +#include #include +#include +#include +#include "util/clock_cache.h" #include "util/coding.h" #include "util/string_util.h" #include "util/testharness.h" @@ -34,7 +36,16 @@ static int DecodeValue(void* v) { return static_cast(reinterpret_cast(v)); } -class CacheTest : public testing::Test { +typedef std::function(size_t, int, bool)> NewCache; + +void dumbDeleter(const Slice& key, void* value) {} + +void eraseDeleter(const Slice& key, void* value) { + Cache* cache = reinterpret_cast(value); + cache->Erase("foo"); +} + +class CacheTest : public testing::TestWithParam { public: static CacheTest* current_; @@ -54,15 +65,17 @@ class CacheTest : public testing::Test { shared_ptr cache_; shared_ptr cache2_; - CacheTest() : - cache_(NewLRUCache(kCacheSize, kNumShardBits)), - cache2_(NewLRUCache(kCacheSize2, kNumShardBits2)) { + CacheTest() + : cache_(GetNewCache()(kCacheSize, kNumShardBits, false)), + cache2_(GetNewCache()(kCacheSize2, kNumShardBits2, false)) { current_ = this; } ~CacheTest() { } + NewCache GetNewCache() { return GetParam(); } + int Lookup(shared_ptr cache, int key) { Cache::Handle* handle = cache->Lookup(EncodeKey(key)); const int r = (handle == nullptr) ? -1 : DecodeValue(cache->Value(handle)); @@ -108,14 +121,10 @@ class CacheTest : public testing::Test { }; CacheTest* CacheTest::current_; -namespace { -void dumbDeleter(const Slice& key, void* value) { } -} // namespace - -TEST_F(CacheTest, UsageTest) { +TEST_P(CacheTest, UsageTest) { // cache is shared_ptr and will be automatically cleaned up. 
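+  // GetNewCache() returns the cache factory supplied by
+  // INSTANTIATE_TEST_CASE_P at the bottom of this file, so each TEST_P body
+  // runs against NewLRUCache and, when SUPPORT_CLOCK_CACHE is defined,
+  // against NewClockCache as well.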
const uint64_t kCapacity = 100000; - auto cache = NewLRUCache(kCapacity, 8); + auto cache = GetNewCache()(kCapacity, 8, false); size_t usage = 0; char value[10] = "abcdef"; @@ -140,10 +149,10 @@ TEST_F(CacheTest, UsageTest) { ASSERT_LT(kCapacity * 0.95, cache->GetUsage()); } -TEST_F(CacheTest, PinnedUsageTest) { +TEST_P(CacheTest, PinnedUsageTest) { // cache is shared_ptr and will be automatically cleaned up. const uint64_t kCapacity = 100000; - auto cache = NewLRUCache(kCapacity, 8); + auto cache = GetNewCache()(kCapacity, 8, false); size_t pinned_usage = 0; char value[10] = "abcdef"; @@ -192,7 +201,7 @@ TEST_F(CacheTest, PinnedUsageTest) { } } -TEST_F(CacheTest, HitAndMiss) { +TEST_P(CacheTest, HitAndMiss) { ASSERT_EQ(-1, Lookup(100)); Insert(100, 101); @@ -215,7 +224,13 @@ TEST_F(CacheTest, HitAndMiss) { ASSERT_EQ(101, deleted_values_[0]); } -TEST_F(CacheTest, Erase) { +TEST_P(CacheTest, InsertSameKey) { + Insert(1, 1); + Insert(1, 2); + ASSERT_EQ(2, Lookup(1)); +} + +TEST_P(CacheTest, Erase) { Erase(200); ASSERT_EQ(0U, deleted_keys_.size()); @@ -234,7 +249,7 @@ TEST_F(CacheTest, Erase) { ASSERT_EQ(1U, deleted_keys_.size()); } -TEST_F(CacheTest, EntriesArePinned) { +TEST_P(CacheTest, EntriesArePinned) { Insert(100, 101); Cache::Handle* h1 = cache_->Lookup(EncodeKey(100)); ASSERT_EQ(101, DecodeValue(cache_->Value(h1))); @@ -264,21 +279,20 @@ TEST_F(CacheTest, EntriesArePinned) { ASSERT_EQ(0U, cache_->GetUsage()); } -TEST_F(CacheTest, EvictionPolicy) { +TEST_P(CacheTest, EvictionPolicy) { Insert(100, 101); Insert(200, 201); // Frequently used entry must be kept around for (int i = 0; i < kCacheSize + 100; i++) { Insert(1000+i, 2000+i); - ASSERT_EQ(2000+i, Lookup(1000+i)); ASSERT_EQ(101, Lookup(100)); } ASSERT_EQ(101, Lookup(100)); ASSERT_EQ(-1, Lookup(200)); } -TEST_F(CacheTest, EvictionPolicyRef) { +TEST_P(CacheTest, EvictionPolicyRef) { Insert(100, 101); Insert(101, 102); Insert(102, 103); @@ -326,7 +340,24 @@ TEST_F(CacheTest, EvictionPolicyRef) { cache_->Release(h204); } -TEST_F(CacheTest, ErasedHandleState) { +TEST_P(CacheTest, EvictEmptyCache) { + // Insert item large than capacity to trigger eviction on empty cache. + auto cache = GetNewCache()(1, 0, false); + ASSERT_OK(cache->Insert("foo", nullptr, 10, dumbDeleter)); +} + +TEST_P(CacheTest, EraseFromDeleter) { + // Have deleter which will erase item from cache, which will re-enter + // the cache at that point. + std::shared_ptr cache = GetNewCache()(10, 0, false); + ASSERT_OK(cache->Insert("foo", nullptr, 1, dumbDeleter)); + ASSERT_OK(cache->Insert("bar", cache.get(), 1, eraseDeleter)); + cache->Erase("bar"); + ASSERT_EQ(nullptr, cache->Lookup("foo")); + ASSERT_EQ(nullptr, cache->Lookup("bar")); +} + +TEST_P(CacheTest, ErasedHandleState) { // insert a key and get two handles Insert(100, 1000); Cache::Handle* h1 = cache_->Lookup(EncodeKey(100)); @@ -348,7 +379,7 @@ TEST_F(CacheTest, ErasedHandleState) { cache_->Release(h2); } -TEST_F(CacheTest, HeavyEntries) { +TEST_P(CacheTest, HeavyEntries) { // Add a bunch of light and heavy entries and then count the combined // size of items still in the cache, which must be approximately the // same as the total capacity. 
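+  // ("Approximately" means within the 10% slack asserted at the end of the
+  // test.)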
@@ -375,7 +406,7 @@ TEST_F(CacheTest, HeavyEntries) { ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10); } -TEST_F(CacheTest, NewId) { +TEST_P(CacheTest, NewId) { uint64_t a = cache_->NewId(); uint64_t b = cache_->NewId(); ASSERT_NE(a, b); @@ -383,12 +414,10 @@ TEST_F(CacheTest, NewId) { class Value { - private: - size_t v_; public: explicit Value(size_t v) : v_(v) { } - ~Value() { std::cout << v_ << " is destructed\n"; } + size_t v_; }; namespace { @@ -397,12 +426,12 @@ void deleter(const Slice& key, void* value) { } } // namespace -TEST_F(CacheTest, SetCapacity) { +TEST_P(CacheTest, SetCapacity) { // test1: increase capacity // lets create a cache with capacity 5, // then, insert 5 elements, then increase capacity // to 10, returned capacity should be 10, usage=5 - std::shared_ptr cache = NewLRUCache(5, 0); + std::shared_ptr cache = GetNewCache()(5, 0, false); std::vector handles(10); // Insert 5 entries, but not releasing. for (size_t i = 0; i < 5; i++) { @@ -442,7 +471,7 @@ TEST_F(CacheTest, SetCapacity) { } } -TEST_F(CacheTest, SetStrictCapacityLimit) { +TEST_P(CacheTest, SetStrictCapacityLimit) { // test1: set the flag to false. Insert more keys than capacity. See if they // all go through. std::shared_ptr cache = NewLRUCache(5, 0, false); @@ -489,11 +518,11 @@ TEST_F(CacheTest, SetStrictCapacityLimit) { } } -TEST_F(CacheTest, OverCapacity) { +TEST_P(CacheTest, OverCapacity) { size_t n = 10; // a LRUCache with n entries and one shard only - std::shared_ptr cache = NewLRUCache(n, 0); + std::shared_ptr cache = GetNewCache()(n, 0, false); std::vector handles(n+1); @@ -508,7 +537,6 @@ TEST_F(CacheTest, OverCapacity) { for (size_t i = 0; i < n + 1; i++) { std::string key = ToString(i+1); auto h = cache->Lookup(key); - std::cout << key << (h?" found\n":" not found\n"); ASSERT_TRUE(h != nullptr); if (h) cache->Release(h); } @@ -518,6 +546,8 @@ TEST_F(CacheTest, OverCapacity) { for (size_t i = 0; i < n + 1; i++) { cache->Release(handles[i]); } + // Make sure eviction is triggered. + cache->SetCapacity(n); // cache is under capacity now since elements were released ASSERT_EQ(n, cache->GetUsage()); @@ -544,7 +574,7 @@ void callback(void* entry, size_t charge) { } }; -TEST_F(CacheTest, ApplyToAllCacheEntiresTest) { +TEST_P(CacheTest, ApplyToAllCacheEntiresTest) { std::vector> inserted; callback_state.clear(); @@ -559,6 +589,17 @@ TEST_F(CacheTest, ApplyToAllCacheEntiresTest) { ASSERT_TRUE(inserted == callback_state); } +shared_ptr (*newLRUCache)(size_t, int, bool) = NewLRUCache; +#ifdef SUPPORT_CLOCK_CACHE +shared_ptr (*newClockCache)(size_t, int, bool) = NewClockCache; +INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest, + testing::Values(NewCache(newLRUCache), + NewCache(newClockCache))); +#else +INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest, + testing::Values(NewCache(newLRUCache))); +#endif // SUPPORT_CLOCK_CACHE + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/util/clock_cache.cc b/util/clock_cache.cc new file mode 100644 index 000000000..b5477e80a --- /dev/null +++ b/util/clock_cache.cc @@ -0,0 +1,700 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/clock_cache.h" + +#ifndef SUPPORT_CLOCK_CACHE + +namespace rocksdb { + +std::shared_ptr NewClockCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) { + // Clock cache not supported. + return nullptr; +} + +} // namespace rocksdb + +#else + +#include +#include +#include +#include + +#include "tbb/concurrent_hash_map.h" + +#include "port/port.h" +#include "util/autovector.h" +#include "util/mutexlock.h" +#include "util/sharded_cache.h" + +namespace rocksdb { + +namespace { + +// An implementation of the Cache interface based on CLOCK algorithm, with +// better concurrent performance than LRUCache. The idea of CLOCK algorithm +// is to maintain all cache entries in a circular list, and an iterator +// (the "head") pointing to the last examined entry. Eviction starts from the +// current head. Each entry is given a second chance before eviction, if it +// has been access since last examine. In contrast to LRU, no modification +// to the internal data-structure (except for flipping the usage bit) needs +// to be done upon lookup. This gives us oppertunity to implement a cache +// with better concurrency. +// +// Each cache entry is represented by a cache handle, and all the handles +// are arranged in a circular list, as describe above. Upon erase of an entry, +// we never remove the handle. Instead, the handle is put into a recycle bin +// to be re-use. This is to avoid memory dealocation, which is hard to deal +// with in concurrent environment. +// +// The cache also maintains a concurrent hash map for lookup. Any concurrent +// hash map implementation should do the work. We currently use +// tbb::concurrent_hash_map because it supports concurrent erase. +// +// Each cache handle has the following flags and counters, which are squeeze +// in an atomic interger, to make sure the handle always be in a consistent +// state: +// +// * In-cache bit: whether the entry is reference by the cache itself. If +// an entry is in cache, its key would also be available in the hash map. +// * Usage bit: whether the entry has been access by user since last +// examine for eviction. Can be reset by eviction. +// * Reference count: reference count by user. +// +// An entry can be reference only when it's in cache. An entry can be evicted +// only when it is in cache, has no usage since last examine, and reference +// count is zero. +// +// The follow figure shows a possible layout of the cache. Boxes represents +// cache handles and numbers in each box being in-cache bit, usage bit and +// reference count respectively. +// +// hash map: +// +-------+--------+ +// | key | handle | +// +-------+--------+ +// | "foo" | 5 |-------------------------------------+ +// +-------+--------+ | +// | "bar" | 2 |--+ | +// +-------+--------+ | | +// | | +// head | | +// | | | +// circular list: | | | +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// |(0,0,0)|---|(1,1,0)|---|(0,0,0)|---|(0,1,3)|---|(1,0,0)|---| ... +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// | | +// +-------+ +-----------+ +// | | +// +---+---+ +// recycle bin: | 1 | 3 | +// +---+---+ +// +// Suppose we try to insert "baz" into the cache at this point and the cache is +// full. The cache will first look for entries to evict, starting from where +// head points to (the second entry). 
It resets usage bit of the second entry, +// skips the third and fourth entry since they are not in cache, and finally +// evict the fifth entry ("foo"). It looks at recycle bin for available handle, +// grabs handle 3, and insert the key into the handle. The following figure +// shows the resulting layout. +// +// hash map: +// +-------+--------+ +// | key | handle | +// +-------+--------+ +// | "baz" | 3 |-------------+ +// +-------+--------+ | +// | "bar" | 2 |--+ | +// +-------+--------+ | | +// | | +// | | head +// | | | +// circular list: | | | +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// |(0,0,0)|---|(1,0,0)|---|(1,0,0)|---|(0,1,3)|---|(0,0,0)|---| ... +// +-------+ +-------+ +-------+ +-------+ +-------+ +------- +// | | +// +-------+ +-----------------------------------+ +// | | +// +---+---+ +// recycle bin: | 1 | 5 | +// +---+---+ +// +// A global mutex guards the circular list, the head, and the recycle bin. +// We additionally require that modifying the hash map needs to hold the mutex. +// As such, Modifying the cache (such as Insert() and Erase()) require to +// hold the mutex. Lookup() only access the hash map and the flags associated +// with each handle, and don't require explicit locking. Release() has to +// acquire the mutex only when it releases the last reference to the entry and +// the entry has been erased from cache explicitly. A future improvement could +// be to remove the mutex completely. +// +// Benchmark: +// We run readrandom db_bench on a test DB of size 13GB, with size of each +// level: +// +// Level Files Size(MB) +// ------------------------- +// L0 1 0.01 +// L1 18 17.32 +// L2 230 182.94 +// L3 1186 1833.63 +// L4 4602 8140.30 +// +// We test with both 32 and 16 read threads, with 2GB cache size (the whole DB +// doesn't fits in) and 64GB cache size (the whole DB can fit in cache), and +// whether to put index and filter blocks in block cache. The benchmark runs +// with +// with RocksDB 4.10. We got the following result: +// +// Threads Cache Cache ClockCache LRUCache +// Size Index/Filter Throughput(MB/s) Hit Throughput(MB/s) Hit +// 32 2GB yes 466.7 85.9% 433.7 86.5% +// 32 2GB no 529.9 72.7% 532.7 73.9% +// 32 64GB yes 649.9 99.9% 507.9 99.9% +// 32 64GB no 740.4 99.9% 662.8 99.9% +// 16 2GB yes 278.4 85.9% 283.4 86.5% +// 16 2GB no 318.6 72.7% 335.8 73.9% +// 16 64GB yes 391.9 99.9% 353.3 99.9% +// 16 64GB no 433.8 99.8% 419.4 99.8% + +// Cache entry meta data. +struct CacheHandle { + Slice key; + uint32_t hash; + void* value; + size_t charge; + void (*deleter)(const Slice&, void* value); + + // Flags and counters associated with the cache handle: + // lowest bit: n-cache bit + // second lowest bit: usage bit + // the rest bits: reference count + // The handle is unused when flags equals to 0. The thread decreases the count + // to 0 is responsible to put the handle back to recycle_ and cleanup memory. + std::atomic flags; + + CacheHandle() = default; + + CacheHandle(const CacheHandle& a) { *this = a; } + + CacheHandle& operator=(const CacheHandle& a) { + // Only copy members needed for deletion. + key = a.key; + value = a.value; + deleter = a.deleter; + return *this; + } +}; + +// Key of hash map. We store hash value with the key for convenience. 
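+//
+// CacheKey also doubles as the hash-compare policy of the TBB hash map: the
+// static hash() and equal() members below are what tbb::concurrent_hash_map
+// expects from its HashCompare parameter, so the map is declared roughly as
+//
+//   tbb::concurrent_hash_map<CacheKey, CacheHandle*, CacheKey>
+//
+// (see the HashTable typedef in ClockCacheShard).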
+struct CacheKey { + Slice key; + uint32_t hash_value; + + CacheKey() = default; + + CacheKey(const Slice& k, uint32_t h) { + key = k; + hash_value = h; + } + + static bool equal(const CacheKey& a, const CacheKey& b) { + return a.hash_value == b.hash_value && a.key == b.key; + } + + static size_t hash(const CacheKey& a) { + return static_cast(a.hash_value); + } +}; + +struct CleanupContext { + // List of values to be deleted, along with the key and deleter. + autovector to_delete_value; + + // List of keys to be deleted. + autovector to_delete_key; +}; + +// A cache shard which maintains its own CLOCK cache. +class ClockCacheShard : public CacheShard { + public: + // Hash map type. + typedef tbb::concurrent_hash_map HashTable; + + ClockCacheShard(); + ~ClockCacheShard() = default; + + // Interfaces + virtual void SetCapacity(size_t capacity) override; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + virtual Status Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle) override; + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override; + virtual void Release(Cache::Handle* handle) override; + virtual void Erase(const Slice& key, uint32_t hash) override; + virtual size_t GetUsage() const override; + virtual size_t GetPinnedUsage() const override; + virtual void EraseUnRefEntries() override; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + + private: + static const uint32_t kInCacheBit = 1; + static const uint32_t kUsageBit = 2; + static const uint32_t kRefsOffset = 2; + static const uint32_t kOneRef = 1 << kRefsOffset; + + // Helper functions to extract cache handle flags and counters. + static bool InCache(uint32_t flags) { return flags & kInCacheBit; } + static bool HasUsage(uint32_t flags) { return flags & kUsageBit; } + static uint32_t CountRefs(uint32_t flags) { return flags >> kRefsOffset; } + + // If the entry in in cache, increase reference count and return true. + // Return false otherwise. + // + // Not necessary to hold mutex_ before being called. + bool Ref(CacheHandle* handle); + + // Decrease reference count of the entry. If this decreases the count to 0, + // recycle the entry. If set_usage is true, also set the usage bit. + // + // Not necessary to hold mutex_ before being called. + void Unref(CacheHandle* handle, bool set_usage, CleanupContext* context); + + // Unset in-cache bit of the entry. Recycle the handle if necessary. + // + // Has to hold mutex_ before being called. + void UnsetInCache(CacheHandle* handle, CleanupContext* context); + + // Put the handle back to recycle_ list, and put the value associated with + // it into to-be-deleted list. It doesn't cleanup the key as it might be + // reused by another handle. + // + // Has to hold mutex_ before being called. + void RecycleHandle(CacheHandle* handle, CleanupContext* context); + + // Remove the key from hash map. Put the key associated with the entry into + // to be deleted list. + // + // Has to hold mutex_ before being called. + void EraseKey(CacheHandle* handle, CleanupContext* context); + + // Delete keys and values in to-be-deleted list. Call the method without + // holding mutex, as destructors can be expensive. + void Cleanup(const CleanupContext& context); + + // Examine the handle for eviction. If the handle is in cache, usage bit is + // not set, and referece count is 0, evict it from cache. 
Otherwise unset + // the usage bit. + // + // Has to hold mutex_ before being called. + bool TryEvict(CacheHandle* value, CleanupContext* context); + + // Scan through the circular list, evict entries until we get enough capacity + // for new cache entry of specific size. Return true if success, false + // otherwise. + // + // Has to hold mutex_ before being called. + bool EvictFromCache(size_t charge, CleanupContext* context); + + CacheHandle* Insert(const Slice& key, uint32_t hash, void* value, + size_t change, + void (*deleter)(const Slice& key, void* value), + bool hold_reference, CleanupContext* context); + + // Guards list_, head_, and recycle_. In addition, updating table_ also has + // to hold the mutex, to avoid the cache being in inconsistent state. + mutable port::Mutex mutex_; + + // The circular list of cache handles. Initially the list is empty. Once a + // handle is needed by insertion, and no more handles are available in + // recycle bin, one more handle is appended to the end. + // + // We use std::deque for the circular list because we want to make sure + // pointers to handles are valid through out the life-cycle of the cache + // (in contrast to std::vector), and be able to grow the list (in contrast + // to statically allocated arrays). + std::deque list_; + + // Pointer to the next handle in the circular list to be examine for + // eviction. + size_t head_; + + // Recycle bin of cache handles. + autovector recycle_; + + // Maximum cache size. + std::atomic capacity_; + + // Current total size of the cache. + std::atomic usage_; + + // Total un-released cache size. + std::atomic pinned_usage_; + + // Whether allow insert into cache if cache is full. + std::atomic strict_capacity_limit_; + + // Hash table (tbb::concurrent_hash_map) for lookup. + HashTable table_; +}; + +ClockCacheShard::ClockCacheShard() + : head_(0), usage_(0), pinned_usage_(0), strict_capacity_limit_(false) {} + +size_t ClockCacheShard::GetUsage() const { + return usage_.load(std::memory_order_relaxed); +} + +size_t ClockCacheShard::GetPinnedUsage() const { + return pinned_usage_.load(std::memory_order_relaxed); +} + +void ClockCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + if (thread_safe) { + mutex_.Lock(); + } + for (auto& handle : list_) { + // Use relaxed semantics instead of acquire semantics since we are either + // holding mutex, or don't have thread safe requirement. + uint32_t flags = handle.flags.load(std::memory_order_relaxed); + if (InCache(flags)) { + callback(handle.value, handle.charge); + } + } + if (thread_safe) { + mutex_.Unlock(); + } +} + +void ClockCacheShard::RecycleHandle(CacheHandle* handle, + CleanupContext* context) { + mutex_.AssertHeld(); + assert(!InCache(handle->flags) && CountRefs(handle->flags) == 0); + // Only cleanup the value. The key may be reused by another handle. 
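+  // (The key buffer itself is only ever freed through to_delete_key, either
+  // by EraseKey() below or by Insert() when it replaces an existing entry.)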
+ context->to_delete_value.emplace_back(*handle); + recycle_.push_back(handle); + usage_.fetch_sub(handle->charge, std::memory_order_relaxed); +} + +void ClockCacheShard::EraseKey(CacheHandle* handle, CleanupContext* context) { + mutex_.AssertHeld(); + assert(!InCache(handle->flags)); + bool erased __attribute__((__unused__)) = + table_.erase(CacheKey(handle->key, handle->hash)); + assert(erased); + context->to_delete_key.push_back(handle->key.data()); +} + +void ClockCacheShard::Cleanup(const CleanupContext& context) { + for (const CacheHandle& handle : context.to_delete_value) { + if (handle.deleter) { + (*handle.deleter)(handle.key, handle.value); + } + } + for (const char* key : context.to_delete_key) { + delete[] key; + } +} + +bool ClockCacheShard::Ref(CacheHandle* handle) { + // CAS loop to increase reference count. + uint32_t flags = handle->flags.load(std::memory_order_relaxed); + while (InCache(flags)) { + // Use acquire semantics on success, as further operations on the cache + // entry has to be order after reference count is increased. + if (handle->flags.compare_exchange_weak(flags, flags + kOneRef, + std::memory_order_acquire, + std::memory_order_relaxed)) { + if (CountRefs(flags) == 0) { + // No reference count before the operation. + pinned_usage_.fetch_add(handle->charge, std::memory_order_relaxed); + } + return true; + } + } + return false; +} + +void ClockCacheShard::Unref(CacheHandle* handle, bool set_usage, + CleanupContext* context) { + if (set_usage) { + handle->flags.fetch_or(kUsageBit, std::memory_order_relaxed); + } + // Use acquire-release semantics as previous operations on the cache entry + // has to be order before reference count is decreased, and potential cleanup + // of the entry has to be order after. + uint32_t flags = handle->flags.fetch_sub(kOneRef, std::memory_order_acq_rel); + assert(CountRefs(flags) > 0); + if (CountRefs(flags) == 1) { + // this is the last reference. + pinned_usage_.fetch_sub(handle->charge, std::memory_order_relaxed); + // Cleanup if it is the last reference. + if (!InCache(flags)) { + MutexLock l(&mutex_); + RecycleHandle(handle, context); + } + } +} + +void ClockCacheShard::UnsetInCache(CacheHandle* handle, + CleanupContext* context) { + mutex_.AssertHeld(); + // Use acquire-release semantics as previous operations on the cache entry + // has to be order before reference count is decreased, and potential cleanup + // of the entry has to be order after. + uint32_t flags = + handle->flags.fetch_and(~kInCacheBit, std::memory_order_acq_rel); + // Cleanup if it is the last reference. 
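+  // Note that flags holds the value from *before* the fetch_and above, so
+  // InCache(flags) being true means this call is the one that cleared the
+  // in-cache bit and is thus responsible for recycling the handle if it is
+  // unreferenced.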
+ if (InCache(flags) && CountRefs(flags) == 0) { + RecycleHandle(handle, context); + } +} + +bool ClockCacheShard::TryEvict(CacheHandle* handle, CleanupContext* context) { + mutex_.AssertHeld(); + uint32_t flags = kInCacheBit; + if (handle->flags.compare_exchange_strong(flags, 0, std::memory_order_acquire, + std::memory_order_relaxed)) { + RecycleHandle(handle, context); + EraseKey(handle, context); + return true; + } + handle->flags.fetch_and(~kUsageBit, std::memory_order_relaxed); + return false; +} + +bool ClockCacheShard::EvictFromCache(size_t charge, CleanupContext* context) { + size_t usage = usage_.load(std::memory_order_relaxed); + size_t capacity = capacity_.load(std::memory_order_relaxed); + if (usage == 0) { + return charge <= capacity; + } + size_t new_head = head_; + bool second_iteration = false; + while (usage + charge > capacity) { + assert(new_head < list_.size()); + if (TryEvict(&list_[new_head], context)) { + usage = usage_.load(std::memory_order_relaxed); + } + new_head = (new_head + 1 >= list_.size()) ? 0 : new_head + 1; + if (new_head == head_) { + if (second_iteration) { + return false; + } else { + second_iteration = true; + } + } + } + head_ = new_head; + return true; +} + +void ClockCacheShard::SetCapacity(size_t capacity) { + CleanupContext context; + { + MutexLock l(&mutex_); + capacity_.store(capacity, std::memory_order_relaxed); + EvictFromCache(0, &context); + } + Cleanup(context); +} + +void ClockCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { + strict_capacity_limit_.store(strict_capacity_limit, + std::memory_order_relaxed); +} + +CacheHandle* ClockCacheShard::Insert( + const Slice& key, uint32_t hash, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), bool hold_reference, + CleanupContext* context) { + MutexLock l(&mutex_); + bool success = EvictFromCache(charge, context); + bool strict = strict_capacity_limit_.load(std::memory_order_relaxed); + if (!success && strict) { + return nullptr; + } + // Grab available handle from recycle bin. If recycle bin is empty, create + // and append new handle to end of circular list. + CacheHandle* handle = nullptr; + if (!recycle_.empty()) { + handle = recycle_.back(); + recycle_.pop_back(); + } else { + list_.emplace_back(); + handle = &list_.back(); + } + // Fill handle. + handle->key = key; + handle->hash = hash; + handle->value = value; + handle->charge = charge; + handle->deleter = deleter; + uint32_t flags = hold_reference ? kInCacheBit + kOneRef : kInCacheBit; + handle->flags.store(flags, std::memory_order_relaxed); + HashTable::accessor accessor; + if (table_.find(accessor, CacheKey(key, hash))) { + // Key exists. Replace with new handle, but keep the existing key since + // the key in hash table is back by the existing one. The new key will be + // deleted by Cleanup(). 
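+    // (The CacheKey stored in the hash map points into the existing key's
+    // buffer, which therefore must stay alive; the freshly copied key is
+    // queued on to_delete_key instead.)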
+ CacheHandle* existing_handle = accessor->second; + context->to_delete_key.push_back(handle->key.data()); + handle->key = existing_handle->key; + accessor->second = handle; + accessor.release(); + UnsetInCache(existing_handle, context); + } else { + table_.insert(HashTable::value_type(CacheKey(key, hash), handle)); + } + if (hold_reference) { + pinned_usage_.fetch_add(charge, std::memory_order_relaxed); + } + usage_.fetch_add(charge, std::memory_order_relaxed); + return handle; +} + +Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** h) { + CleanupContext context; + HashTable::accessor accessor; + char* key_data = new char[key.size()]; + memcpy(key_data, key.data(), key.size()); + Slice key_copy(key_data, key.size()); + CacheHandle* handle = + Insert(key_copy, hash, value, charge, deleter, h != nullptr, &context); + Status s; + if (h != nullptr) { + *h = reinterpret_cast(handle); + } + if (handle == nullptr) { + s = Status::Incomplete("Insert failed due to LRU cache being full."); + } + Cleanup(context); + return s; +} + +Cache::Handle* ClockCacheShard::Lookup(const Slice& key, uint32_t hash) { + HashTable::const_accessor accessor; + if (!table_.find(accessor, CacheKey(key, hash))) { + return nullptr; + } + CacheHandle* handle = accessor->second; + accessor.release(); + // Ref() could fail if another thread sneak in and evict/erase the cache + // entry before we are able to hold reference. + if (!Ref(handle)) { + return nullptr; + } + // Double check the key since the handle may now representing another key + // if other threads sneak in, evict/erase the entry and re-used the handle + // for another cache entry. + if (hash != handle->hash || key != handle->key) { + CleanupContext context; + Unref(handle, false, &context); + // It is possible Unref() delete the entry, so we need to cleanup. 
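+    // Also note that Unref() is called with set_usage = false: the reference
+    // was taken on whatever entry now occupies the recycled handle, so that
+    // entry should not be marked as recently used on our behalf.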
+ Cleanup(context); + return nullptr; + } + return reinterpret_cast(handle); +} + +void ClockCacheShard::Release(Cache::Handle* h) { + CleanupContext context; + CacheHandle* handle = reinterpret_cast(h); + Unref(handle, true, &context); + Cleanup(context); +} + +void ClockCacheShard::Erase(const Slice& key, uint32_t hash) { + CleanupContext context; + { + MutexLock l(&mutex_); + HashTable::accessor accessor; + if (table_.find(accessor, CacheKey(key, hash))) { + CacheHandle* handle = accessor->second; + table_.erase(accessor); + UnsetInCache(handle, &context); + } + } + Cleanup(context); +} + +void ClockCacheShard::EraseUnRefEntries() { + CleanupContext context; + { + MutexLock l(&mutex_); + table_.clear(); + for (auto& handle : list_) { + UnsetInCache(&handle, &context); + } + } + Cleanup(context); +} + +class ClockCache : public ShardedCache { + public: + ClockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit) + : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) { + int num_shards = 1 << num_shard_bits; + shards_ = new ClockCacheShard[num_shards]; + SetCapacity(capacity); + SetStrictCapacityLimit(strict_capacity_limit); + } + + virtual ~ClockCache() { delete[] shards_; } + + virtual const char* Name() const override { return "ClockCache"; } + + virtual CacheShard* GetShard(int shard) override { + return reinterpret_cast(&shards_[shard]); + } + + virtual const CacheShard* GetShard(int shard) const override { + return reinterpret_cast(&shards_[shard]); + } + + virtual void* Value(Handle* handle) override { + return reinterpret_cast(handle)->value; + } + + virtual size_t GetCharge(Handle* handle) const override { + return reinterpret_cast(handle)->charge; + } + + virtual uint32_t GetHash(Handle* handle) const override { + return reinterpret_cast(handle)->hash; + } + + virtual void DisownData() override { shards_ = nullptr; } + + private: + ClockCacheShard* shards_; +}; + +} // end anonymous namespace + +std::shared_ptr NewClockCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) { + return std::make_shared(capacity, num_shard_bits, + strict_capacity_limit); +} + +} // namespace rocksdb + +#endif // SUPPORT_CLOCK_CACHE diff --git a/util/clock_cache.h b/util/clock_cache.h new file mode 100644 index 000000000..2e5389d5c --- /dev/null +++ b/util/clock_cache.h @@ -0,0 +1,16 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/cache.h" + +#if defined(TBB) && !defined(ROCKSDB_LITE) +#define SUPPORT_CLOCK_CACHE +#endif
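+
+// SUPPORT_CLOCK_CACHE is defined only when the build defines TBB (i.e. the
+// code is compiled with -DTBB and linked against the TBB library) and
+// ROCKSDB_LITE is not set. Otherwise util/clock_cache.cc compiles a stub and
+// NewClockCache() returns nullptr, which callers such as db_bench and
+// db_stress report as "Clock cache not supported".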