Introduce ClockCache
Summary: Clock-based cache implemenetation aim to have better concurreny than default LRU cache. See inline comments for implementation details. Test Plan: Update cache_test to run on both LRUCache and ClockCache. Adding some new tests to catch some of the bugs that I fixed while implementing the cache. Reviewers: kradhakrishnan, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D61647
This commit is contained in:
parent
ff17a2abf3
commit
4cc37f59e5
@ -193,6 +193,7 @@ set(SOURCES
|
|||||||
util/arena.cc
|
util/arena.cc
|
||||||
util/bloom.cc
|
util/bloom.cc
|
||||||
util/build_version.cc
|
util/build_version.cc
|
||||||
|
util/clock_cache.cc
|
||||||
util/coding.cc
|
util/coding.cc
|
||||||
util/compaction_job_stats_impl.cc
|
util/compaction_job_stats_impl.cc
|
||||||
util/comparator.cc
|
util/comparator.cc
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
### New Features
|
||||||
|
* Introduce NewClockCache, which is based on CLOCK algorithm with better concurrent performance in some cases. It can be used to replace the default LRU-based block cache and table cache. To use it, RocksDB need to be linked with TBB lib.
|
||||||
|
|
||||||
## 4.11.0 (8/1/2016)
|
## 4.11.0 (8/1/2016)
|
||||||
### Public API Change
|
### Public API Change
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
// This source code is licensed under the BSD-style license found in the
|
// This source code is licensed under the BSD-style license found in the
|
||||||
// LICENSE file in the root directory of this source tree. An additional grant
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
// of patent rights can be found in the PATENTS file in the same directory.
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
// Use of this source code is governed by a BSD-style license that can be
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
@ -19,8 +20,7 @@
|
|||||||
// they want something more sophisticated (like scan-resistance, a
|
// they want something more sophisticated (like scan-resistance, a
|
||||||
// custom eviction policy, variable cache sizing, etc.)
|
// custom eviction policy, variable cache sizing, etc.)
|
||||||
|
|
||||||
#ifndef STORAGE_ROCKSDB_INCLUDE_CACHE_H_
|
#pragma once
|
||||||
#define STORAGE_ROCKSDB_INCLUDE_CACHE_H_
|
|
||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
@ -38,6 +38,15 @@ extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
|
|||||||
int num_shard_bits = 6,
|
int num_shard_bits = 6,
|
||||||
bool strict_capacity_limit = false);
|
bool strict_capacity_limit = false);
|
||||||
|
|
||||||
|
// Similar to NewLRUCache, but create a cache based on CLOCK algorithm with
|
||||||
|
// better concurrent performance in some cases. See util/clock_cache.cc for
|
||||||
|
// more detail.
|
||||||
|
//
|
||||||
|
// Return nullptr if it is not supported.
|
||||||
|
extern std::shared_ptr<Cache> NewClockCache(size_t capacity,
|
||||||
|
int num_shard_bits = 6,
|
||||||
|
bool strict_capacity_limit = false);
|
||||||
|
|
||||||
class Cache {
|
class Cache {
|
||||||
public:
|
public:
|
||||||
Cache() {}
|
Cache() {}
|
||||||
@ -153,5 +162,3 @@ class Cache {
|
|||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
#endif // STORAGE_ROCKSDB_UTIL_CACHE_H_
|
|
||||||
|
1
src.mk
1
src.mk
@ -88,6 +88,7 @@ LIB_SOURCES = \
|
|||||||
util/arena.cc \
|
util/arena.cc \
|
||||||
util/bloom.cc \
|
util/bloom.cc \
|
||||||
util/build_version.cc \
|
util/build_version.cc \
|
||||||
|
util/clock_cache.cc \
|
||||||
util/coding.cc \
|
util/coding.cc \
|
||||||
util/comparator.cc \
|
util/comparator.cc \
|
||||||
util/compaction_job_stats_impl.cc \
|
util/compaction_job_stats_impl.cc \
|
||||||
|
@ -349,13 +349,20 @@ DEFINE_int32(universal_compression_size_percent, -1,
|
|||||||
DEFINE_bool(universal_allow_trivial_move, false,
|
DEFINE_bool(universal_allow_trivial_move, false,
|
||||||
"Allow trivial move in universal compaction.");
|
"Allow trivial move in universal compaction.");
|
||||||
|
|
||||||
DEFINE_int64(cache_size, -1,
|
DEFINE_int64(cache_size, 8 << 20, // 8MB
|
||||||
"Number of bytes to use as a cache of uncompressed"
|
"Number of bytes to use as a cache of uncompressed data");
|
||||||
" data. Negative means use default settings.");
|
|
||||||
|
DEFINE_int32(cache_numshardbits, 6,
|
||||||
|
"Number of shards for the block cache"
|
||||||
|
" is 2 ** cache_numshardbits. Negative means use default settings."
|
||||||
|
" This is applied only if FLAGS_cache_size is non-negative.");
|
||||||
|
|
||||||
|
DEFINE_bool(use_clock_cache, false,
|
||||||
|
"Replace default LRU block cache with clock cache.");
|
||||||
|
|
||||||
DEFINE_int64(simcache_size, -1,
|
DEFINE_int64(simcache_size, -1,
|
||||||
"Number of bytes to use as a simcache of "
|
"Number of bytes to use as a simcache of "
|
||||||
"uncompressed data. Negative means use default settings.");
|
"uncompressed data. Nagative value disables simcache.");
|
||||||
|
|
||||||
DEFINE_bool(cache_index_and_filter_blocks, false,
|
DEFINE_bool(cache_index_and_filter_blocks, false,
|
||||||
"Cache index/filter blocks in block cache.");
|
"Cache index/filter blocks in block cache.");
|
||||||
@ -433,9 +440,6 @@ static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
|
|||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
DEFINE_int32(cache_numshardbits, -1, "Number of shards for the block cache"
|
|
||||||
" is 2 ** cache_numshardbits. Negative means use default settings."
|
|
||||||
" This is applied only if FLAGS_cache_size is non-negative.");
|
|
||||||
|
|
||||||
DEFINE_bool(verify_checksum, false, "Verify checksum for every block read"
|
DEFINE_bool(verify_checksum, false, "Verify checksum for every block read"
|
||||||
" from storage");
|
" from storage");
|
||||||
@ -1877,20 +1881,26 @@ class Benchmark {
|
|||||||
std::shared_ptr<TimestampEmulator> timestamp_emulator_;
|
std::shared_ptr<TimestampEmulator> timestamp_emulator_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
std::shared_ptr<Cache> NewCache(int64_t capacity) {
|
||||||
|
if (capacity <= 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
if (FLAGS_use_clock_cache) {
|
||||||
|
auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits);
|
||||||
|
if (!cache) {
|
||||||
|
fprintf(stderr, "Clock cache not supported.");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
return cache;
|
||||||
|
} else {
|
||||||
|
return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Benchmark()
|
Benchmark()
|
||||||
: cache_(
|
: cache_(NewCache(FLAGS_cache_size)),
|
||||||
FLAGS_cache_size >= 0
|
compressed_cache_(NewCache(FLAGS_compressed_cache_size)),
|
||||||
? (FLAGS_cache_numshardbits >= 1
|
|
||||||
? NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits)
|
|
||||||
: NewLRUCache(FLAGS_cache_size))
|
|
||||||
: nullptr),
|
|
||||||
compressed_cache_(FLAGS_compressed_cache_size >= 0
|
|
||||||
? (FLAGS_cache_numshardbits >= 1
|
|
||||||
? NewLRUCache(FLAGS_compressed_cache_size,
|
|
||||||
FLAGS_cache_numshardbits)
|
|
||||||
: NewLRUCache(FLAGS_compressed_cache_size))
|
|
||||||
: nullptr),
|
|
||||||
filter_policy_(FLAGS_bloom_bits >= 0
|
filter_policy_(FLAGS_bloom_bits >= 0
|
||||||
? NewBloomFilterPolicy(FLAGS_bloom_bits,
|
? NewBloomFilterPolicy(FLAGS_bloom_bits,
|
||||||
FLAGS_use_block_based_filter)
|
FLAGS_use_block_based_filter)
|
||||||
|
@ -19,6 +19,7 @@ import argparse
|
|||||||
default_params = {
|
default_params = {
|
||||||
"block_size": 16384,
|
"block_size": 16384,
|
||||||
"cache_size": 1048576,
|
"cache_size": 1048576,
|
||||||
|
"use_clock_cache": lambda: random.choice(["true", "false"]),
|
||||||
"delpercent": 5,
|
"delpercent": 5,
|
||||||
"destroy_db_initially": 0,
|
"destroy_db_initially": 0,
|
||||||
"disable_data_sync": 0,
|
"disable_data_sync": 0,
|
||||||
@ -84,6 +85,7 @@ whitebox_default_params = {
|
|||||||
simple_default_params = {
|
simple_default_params = {
|
||||||
"block_size": 16384,
|
"block_size": 16384,
|
||||||
"cache_size": 1048576,
|
"cache_size": 1048576,
|
||||||
|
"use_clock_cache": lambda: random.choice(["true", "false"]),
|
||||||
"column_families": 1,
|
"column_families": 1,
|
||||||
"delpercent": 5,
|
"delpercent": 5,
|
||||||
"destroy_db_initially": 0,
|
"destroy_db_initially": 0,
|
||||||
|
@ -228,6 +228,9 @@ DEFINE_int32(set_in_place_one_in, 0,
|
|||||||
DEFINE_int64(cache_size, 2LL * KB * KB * KB,
|
DEFINE_int64(cache_size, 2LL * KB * KB * KB,
|
||||||
"Number of bytes to use as a cache of uncompressed data.");
|
"Number of bytes to use as a cache of uncompressed data.");
|
||||||
|
|
||||||
|
DEFINE_bool(use_clock_cache, false,
|
||||||
|
"Replace default LRU block cache with clock cache.");
|
||||||
|
|
||||||
DEFINE_uint64(subcompactions, 1,
|
DEFINE_uint64(subcompactions, 1,
|
||||||
"Maximum number of subcompactions to divide L0-L1 compactions "
|
"Maximum number of subcompactions to divide L0-L1 compactions "
|
||||||
"into.");
|
"into.");
|
||||||
@ -993,10 +996,8 @@ class DbStressListener : public EventListener {
|
|||||||
class StressTest {
|
class StressTest {
|
||||||
public:
|
public:
|
||||||
StressTest()
|
StressTest()
|
||||||
: cache_(NewLRUCache(FLAGS_cache_size)),
|
: cache_(NewCache(FLAGS_cache_size)),
|
||||||
compressed_cache_(FLAGS_compressed_cache_size >= 0
|
compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
|
||||||
? NewLRUCache(FLAGS_compressed_cache_size)
|
|
||||||
: nullptr),
|
|
||||||
filter_policy_(FLAGS_bloom_bits >= 0
|
filter_policy_(FLAGS_bloom_bits >= 0
|
||||||
? FLAGS_use_block_based_filter
|
? FLAGS_use_block_based_filter
|
||||||
? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
|
? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
|
||||||
@ -1025,6 +1026,22 @@ class StressTest {
|
|||||||
delete db_;
|
delete db_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<Cache> NewCache(size_t capacity) {
|
||||||
|
if (capacity <= 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
if (FLAGS_use_clock_cache) {
|
||||||
|
auto cache = NewClockCache((size_t)capacity);
|
||||||
|
if (!cache) {
|
||||||
|
fprintf(stderr, "Clock cache not supported.");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
return cache;
|
||||||
|
} else {
|
||||||
|
return NewLRUCache((size_t)capacity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool BuildOptionsTable() {
|
bool BuildOptionsTable() {
|
||||||
if (FLAGS_set_options_one_in <= 0) {
|
if (FLAGS_set_options_one_in <= 0) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -46,6 +46,8 @@ DEFINE_int32(lookup_percent, 50,
|
|||||||
DEFINE_int32(erase_percent, 10,
|
DEFINE_int32(erase_percent, 10,
|
||||||
"Ratio of erase to total workload (expressed as a percentage)");
|
"Ratio of erase to total workload (expressed as a percentage)");
|
||||||
|
|
||||||
|
DEFINE_bool(use_clock_cache, false, "");
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
class CacheBench;
|
class CacheBench;
|
||||||
@ -129,9 +131,17 @@ struct ThreadState {
|
|||||||
|
|
||||||
class CacheBench {
|
class CacheBench {
|
||||||
public:
|
public:
|
||||||
CacheBench() :
|
CacheBench() : num_threads_(FLAGS_threads) {
|
||||||
cache_(NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits)),
|
if (FLAGS_use_clock_cache) {
|
||||||
num_threads_(FLAGS_threads) {}
|
cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
|
||||||
|
if (!cache_) {
|
||||||
|
fprintf(stderr, "Clock cache not supported.\n");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
~CacheBench() {}
|
~CacheBench() {}
|
||||||
|
|
||||||
|
@ -10,9 +10,11 @@
|
|||||||
#include "rocksdb/cache.h"
|
#include "rocksdb/cache.h"
|
||||||
|
|
||||||
#include <forward_list>
|
#include <forward_list>
|
||||||
#include <vector>
|
#include <functional>
|
||||||
#include <string>
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
#include "util/clock_cache.h"
|
||||||
#include "util/coding.h"
|
#include "util/coding.h"
|
||||||
#include "util/string_util.h"
|
#include "util/string_util.h"
|
||||||
#include "util/testharness.h"
|
#include "util/testharness.h"
|
||||||
@ -34,7 +36,16 @@ static int DecodeValue(void* v) {
|
|||||||
return static_cast<int>(reinterpret_cast<uintptr_t>(v));
|
return static_cast<int>(reinterpret_cast<uintptr_t>(v));
|
||||||
}
|
}
|
||||||
|
|
||||||
class CacheTest : public testing::Test {
|
typedef std::function<std::shared_ptr<Cache>(size_t, int, bool)> NewCache;
|
||||||
|
|
||||||
|
void dumbDeleter(const Slice& key, void* value) {}
|
||||||
|
|
||||||
|
void eraseDeleter(const Slice& key, void* value) {
|
||||||
|
Cache* cache = reinterpret_cast<Cache*>(value);
|
||||||
|
cache->Erase("foo");
|
||||||
|
}
|
||||||
|
|
||||||
|
class CacheTest : public testing::TestWithParam<NewCache> {
|
||||||
public:
|
public:
|
||||||
static CacheTest* current_;
|
static CacheTest* current_;
|
||||||
|
|
||||||
@ -54,15 +65,17 @@ class CacheTest : public testing::Test {
|
|||||||
shared_ptr<Cache> cache_;
|
shared_ptr<Cache> cache_;
|
||||||
shared_ptr<Cache> cache2_;
|
shared_ptr<Cache> cache2_;
|
||||||
|
|
||||||
CacheTest() :
|
CacheTest()
|
||||||
cache_(NewLRUCache(kCacheSize, kNumShardBits)),
|
: cache_(GetNewCache()(kCacheSize, kNumShardBits, false)),
|
||||||
cache2_(NewLRUCache(kCacheSize2, kNumShardBits2)) {
|
cache2_(GetNewCache()(kCacheSize2, kNumShardBits2, false)) {
|
||||||
current_ = this;
|
current_ = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
~CacheTest() {
|
~CacheTest() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NewCache GetNewCache() { return GetParam(); }
|
||||||
|
|
||||||
int Lookup(shared_ptr<Cache> cache, int key) {
|
int Lookup(shared_ptr<Cache> cache, int key) {
|
||||||
Cache::Handle* handle = cache->Lookup(EncodeKey(key));
|
Cache::Handle* handle = cache->Lookup(EncodeKey(key));
|
||||||
const int r = (handle == nullptr) ? -1 : DecodeValue(cache->Value(handle));
|
const int r = (handle == nullptr) ? -1 : DecodeValue(cache->Value(handle));
|
||||||
@ -108,14 +121,10 @@ class CacheTest : public testing::Test {
|
|||||||
};
|
};
|
||||||
CacheTest* CacheTest::current_;
|
CacheTest* CacheTest::current_;
|
||||||
|
|
||||||
namespace {
|
TEST_P(CacheTest, UsageTest) {
|
||||||
void dumbDeleter(const Slice& key, void* value) { }
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
TEST_F(CacheTest, UsageTest) {
|
|
||||||
// cache is shared_ptr and will be automatically cleaned up.
|
// cache is shared_ptr and will be automatically cleaned up.
|
||||||
const uint64_t kCapacity = 100000;
|
const uint64_t kCapacity = 100000;
|
||||||
auto cache = NewLRUCache(kCapacity, 8);
|
auto cache = GetNewCache()(kCapacity, 8, false);
|
||||||
|
|
||||||
size_t usage = 0;
|
size_t usage = 0;
|
||||||
char value[10] = "abcdef";
|
char value[10] = "abcdef";
|
||||||
@ -140,10 +149,10 @@ TEST_F(CacheTest, UsageTest) {
|
|||||||
ASSERT_LT(kCapacity * 0.95, cache->GetUsage());
|
ASSERT_LT(kCapacity * 0.95, cache->GetUsage());
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, PinnedUsageTest) {
|
TEST_P(CacheTest, PinnedUsageTest) {
|
||||||
// cache is shared_ptr and will be automatically cleaned up.
|
// cache is shared_ptr and will be automatically cleaned up.
|
||||||
const uint64_t kCapacity = 100000;
|
const uint64_t kCapacity = 100000;
|
||||||
auto cache = NewLRUCache(kCapacity, 8);
|
auto cache = GetNewCache()(kCapacity, 8, false);
|
||||||
|
|
||||||
size_t pinned_usage = 0;
|
size_t pinned_usage = 0;
|
||||||
char value[10] = "abcdef";
|
char value[10] = "abcdef";
|
||||||
@ -192,7 +201,7 @@ TEST_F(CacheTest, PinnedUsageTest) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, HitAndMiss) {
|
TEST_P(CacheTest, HitAndMiss) {
|
||||||
ASSERT_EQ(-1, Lookup(100));
|
ASSERT_EQ(-1, Lookup(100));
|
||||||
|
|
||||||
Insert(100, 101);
|
Insert(100, 101);
|
||||||
@ -215,7 +224,13 @@ TEST_F(CacheTest, HitAndMiss) {
|
|||||||
ASSERT_EQ(101, deleted_values_[0]);
|
ASSERT_EQ(101, deleted_values_[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, Erase) {
|
TEST_P(CacheTest, InsertSameKey) {
|
||||||
|
Insert(1, 1);
|
||||||
|
Insert(1, 2);
|
||||||
|
ASSERT_EQ(2, Lookup(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(CacheTest, Erase) {
|
||||||
Erase(200);
|
Erase(200);
|
||||||
ASSERT_EQ(0U, deleted_keys_.size());
|
ASSERT_EQ(0U, deleted_keys_.size());
|
||||||
|
|
||||||
@ -234,7 +249,7 @@ TEST_F(CacheTest, Erase) {
|
|||||||
ASSERT_EQ(1U, deleted_keys_.size());
|
ASSERT_EQ(1U, deleted_keys_.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, EntriesArePinned) {
|
TEST_P(CacheTest, EntriesArePinned) {
|
||||||
Insert(100, 101);
|
Insert(100, 101);
|
||||||
Cache::Handle* h1 = cache_->Lookup(EncodeKey(100));
|
Cache::Handle* h1 = cache_->Lookup(EncodeKey(100));
|
||||||
ASSERT_EQ(101, DecodeValue(cache_->Value(h1)));
|
ASSERT_EQ(101, DecodeValue(cache_->Value(h1)));
|
||||||
@ -264,21 +279,20 @@ TEST_F(CacheTest, EntriesArePinned) {
|
|||||||
ASSERT_EQ(0U, cache_->GetUsage());
|
ASSERT_EQ(0U, cache_->GetUsage());
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, EvictionPolicy) {
|
TEST_P(CacheTest, EvictionPolicy) {
|
||||||
Insert(100, 101);
|
Insert(100, 101);
|
||||||
Insert(200, 201);
|
Insert(200, 201);
|
||||||
|
|
||||||
// Frequently used entry must be kept around
|
// Frequently used entry must be kept around
|
||||||
for (int i = 0; i < kCacheSize + 100; i++) {
|
for (int i = 0; i < kCacheSize + 100; i++) {
|
||||||
Insert(1000+i, 2000+i);
|
Insert(1000+i, 2000+i);
|
||||||
ASSERT_EQ(2000+i, Lookup(1000+i));
|
|
||||||
ASSERT_EQ(101, Lookup(100));
|
ASSERT_EQ(101, Lookup(100));
|
||||||
}
|
}
|
||||||
ASSERT_EQ(101, Lookup(100));
|
ASSERT_EQ(101, Lookup(100));
|
||||||
ASSERT_EQ(-1, Lookup(200));
|
ASSERT_EQ(-1, Lookup(200));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, EvictionPolicyRef) {
|
TEST_P(CacheTest, EvictionPolicyRef) {
|
||||||
Insert(100, 101);
|
Insert(100, 101);
|
||||||
Insert(101, 102);
|
Insert(101, 102);
|
||||||
Insert(102, 103);
|
Insert(102, 103);
|
||||||
@ -326,7 +340,24 @@ TEST_F(CacheTest, EvictionPolicyRef) {
|
|||||||
cache_->Release(h204);
|
cache_->Release(h204);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, ErasedHandleState) {
|
TEST_P(CacheTest, EvictEmptyCache) {
|
||||||
|
// Insert item large than capacity to trigger eviction on empty cache.
|
||||||
|
auto cache = GetNewCache()(1, 0, false);
|
||||||
|
ASSERT_OK(cache->Insert("foo", nullptr, 10, dumbDeleter));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(CacheTest, EraseFromDeleter) {
|
||||||
|
// Have deleter which will erase item from cache, which will re-enter
|
||||||
|
// the cache at that point.
|
||||||
|
std::shared_ptr<Cache> cache = GetNewCache()(10, 0, false);
|
||||||
|
ASSERT_OK(cache->Insert("foo", nullptr, 1, dumbDeleter));
|
||||||
|
ASSERT_OK(cache->Insert("bar", cache.get(), 1, eraseDeleter));
|
||||||
|
cache->Erase("bar");
|
||||||
|
ASSERT_EQ(nullptr, cache->Lookup("foo"));
|
||||||
|
ASSERT_EQ(nullptr, cache->Lookup("bar"));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(CacheTest, ErasedHandleState) {
|
||||||
// insert a key and get two handles
|
// insert a key and get two handles
|
||||||
Insert(100, 1000);
|
Insert(100, 1000);
|
||||||
Cache::Handle* h1 = cache_->Lookup(EncodeKey(100));
|
Cache::Handle* h1 = cache_->Lookup(EncodeKey(100));
|
||||||
@ -348,7 +379,7 @@ TEST_F(CacheTest, ErasedHandleState) {
|
|||||||
cache_->Release(h2);
|
cache_->Release(h2);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, HeavyEntries) {
|
TEST_P(CacheTest, HeavyEntries) {
|
||||||
// Add a bunch of light and heavy entries and then count the combined
|
// Add a bunch of light and heavy entries and then count the combined
|
||||||
// size of items still in the cache, which must be approximately the
|
// size of items still in the cache, which must be approximately the
|
||||||
// same as the total capacity.
|
// same as the total capacity.
|
||||||
@ -375,7 +406,7 @@ TEST_F(CacheTest, HeavyEntries) {
|
|||||||
ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10);
|
ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, NewId) {
|
TEST_P(CacheTest, NewId) {
|
||||||
uint64_t a = cache_->NewId();
|
uint64_t a = cache_->NewId();
|
||||||
uint64_t b = cache_->NewId();
|
uint64_t b = cache_->NewId();
|
||||||
ASSERT_NE(a, b);
|
ASSERT_NE(a, b);
|
||||||
@ -383,12 +414,10 @@ TEST_F(CacheTest, NewId) {
|
|||||||
|
|
||||||
|
|
||||||
class Value {
|
class Value {
|
||||||
private:
|
|
||||||
size_t v_;
|
|
||||||
public:
|
public:
|
||||||
explicit Value(size_t v) : v_(v) { }
|
explicit Value(size_t v) : v_(v) { }
|
||||||
|
|
||||||
~Value() { std::cout << v_ << " is destructed\n"; }
|
size_t v_;
|
||||||
};
|
};
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
@ -397,12 +426,12 @@ void deleter(const Slice& key, void* value) {
|
|||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
TEST_F(CacheTest, SetCapacity) {
|
TEST_P(CacheTest, SetCapacity) {
|
||||||
// test1: increase capacity
|
// test1: increase capacity
|
||||||
// lets create a cache with capacity 5,
|
// lets create a cache with capacity 5,
|
||||||
// then, insert 5 elements, then increase capacity
|
// then, insert 5 elements, then increase capacity
|
||||||
// to 10, returned capacity should be 10, usage=5
|
// to 10, returned capacity should be 10, usage=5
|
||||||
std::shared_ptr<Cache> cache = NewLRUCache(5, 0);
|
std::shared_ptr<Cache> cache = GetNewCache()(5, 0, false);
|
||||||
std::vector<Cache::Handle*> handles(10);
|
std::vector<Cache::Handle*> handles(10);
|
||||||
// Insert 5 entries, but not releasing.
|
// Insert 5 entries, but not releasing.
|
||||||
for (size_t i = 0; i < 5; i++) {
|
for (size_t i = 0; i < 5; i++) {
|
||||||
@ -442,7 +471,7 @@ TEST_F(CacheTest, SetCapacity) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, SetStrictCapacityLimit) {
|
TEST_P(CacheTest, SetStrictCapacityLimit) {
|
||||||
// test1: set the flag to false. Insert more keys than capacity. See if they
|
// test1: set the flag to false. Insert more keys than capacity. See if they
|
||||||
// all go through.
|
// all go through.
|
||||||
std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false);
|
std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false);
|
||||||
@ -489,11 +518,11 @@ TEST_F(CacheTest, SetStrictCapacityLimit) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CacheTest, OverCapacity) {
|
TEST_P(CacheTest, OverCapacity) {
|
||||||
size_t n = 10;
|
size_t n = 10;
|
||||||
|
|
||||||
// a LRUCache with n entries and one shard only
|
// a LRUCache with n entries and one shard only
|
||||||
std::shared_ptr<Cache> cache = NewLRUCache(n, 0);
|
std::shared_ptr<Cache> cache = GetNewCache()(n, 0, false);
|
||||||
|
|
||||||
std::vector<Cache::Handle*> handles(n+1);
|
std::vector<Cache::Handle*> handles(n+1);
|
||||||
|
|
||||||
@ -508,7 +537,6 @@ TEST_F(CacheTest, OverCapacity) {
|
|||||||
for (size_t i = 0; i < n + 1; i++) {
|
for (size_t i = 0; i < n + 1; i++) {
|
||||||
std::string key = ToString(i+1);
|
std::string key = ToString(i+1);
|
||||||
auto h = cache->Lookup(key);
|
auto h = cache->Lookup(key);
|
||||||
std::cout << key << (h?" found\n":" not found\n");
|
|
||||||
ASSERT_TRUE(h != nullptr);
|
ASSERT_TRUE(h != nullptr);
|
||||||
if (h) cache->Release(h);
|
if (h) cache->Release(h);
|
||||||
}
|
}
|
||||||
@ -518,6 +546,8 @@ TEST_F(CacheTest, OverCapacity) {
|
|||||||
for (size_t i = 0; i < n + 1; i++) {
|
for (size_t i = 0; i < n + 1; i++) {
|
||||||
cache->Release(handles[i]);
|
cache->Release(handles[i]);
|
||||||
}
|
}
|
||||||
|
// Make sure eviction is triggered.
|
||||||
|
cache->SetCapacity(n);
|
||||||
|
|
||||||
// cache is under capacity now since elements were released
|
// cache is under capacity now since elements were released
|
||||||
ASSERT_EQ(n, cache->GetUsage());
|
ASSERT_EQ(n, cache->GetUsage());
|
||||||
@ -544,7 +574,7 @@ void callback(void* entry, size_t charge) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_F(CacheTest, ApplyToAllCacheEntiresTest) {
|
TEST_P(CacheTest, ApplyToAllCacheEntiresTest) {
|
||||||
std::vector<std::pair<int, int>> inserted;
|
std::vector<std::pair<int, int>> inserted;
|
||||||
callback_state.clear();
|
callback_state.clear();
|
||||||
|
|
||||||
@ -559,6 +589,17 @@ TEST_F(CacheTest, ApplyToAllCacheEntiresTest) {
|
|||||||
ASSERT_TRUE(inserted == callback_state);
|
ASSERT_TRUE(inserted == callback_state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shared_ptr<Cache> (*newLRUCache)(size_t, int, bool) = NewLRUCache;
|
||||||
|
#ifdef SUPPORT_CLOCK_CACHE
|
||||||
|
shared_ptr<Cache> (*newClockCache)(size_t, int, bool) = NewClockCache;
|
||||||
|
INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest,
|
||||||
|
testing::Values(NewCache(newLRUCache),
|
||||||
|
NewCache(newClockCache)));
|
||||||
|
#else
|
||||||
|
INSTANTIATE_TEST_CASE_P(CacheTestInstance, CacheTest,
|
||||||
|
testing::Values(NewCache(newLRUCache)));
|
||||||
|
#endif // SUPPORT_CLOCK_CACHE
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
700
util/clock_cache.cc
Normal file
700
util/clock_cache.cc
Normal file
@ -0,0 +1,700 @@
|
|||||||
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#include "util/clock_cache.h"
|
||||||
|
|
||||||
|
#ifndef SUPPORT_CLOCK_CACHE
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
std::shared_ptr<Cache> NewClockCache(size_t capacity, int num_shard_bits,
|
||||||
|
bool strict_capacity_limit) {
|
||||||
|
// Clock cache not supported.
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
#include <assert.h>
|
||||||
|
#include <atomic>
|
||||||
|
#include <deque>
|
||||||
|
#include <limits>
|
||||||
|
|
||||||
|
#include "tbb/concurrent_hash_map.h"
|
||||||
|
|
||||||
|
#include "port/port.h"
|
||||||
|
#include "util/autovector.h"
|
||||||
|
#include "util/mutexlock.h"
|
||||||
|
#include "util/sharded_cache.h"
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
// An implementation of the Cache interface based on CLOCK algorithm, with
|
||||||
|
// better concurrent performance than LRUCache. The idea of CLOCK algorithm
|
||||||
|
// is to maintain all cache entries in a circular list, and an iterator
|
||||||
|
// (the "head") pointing to the last examined entry. Eviction starts from the
|
||||||
|
// current head. Each entry is given a second chance before eviction, if it
|
||||||
|
// has been access since last examine. In contrast to LRU, no modification
|
||||||
|
// to the internal data-structure (except for flipping the usage bit) needs
|
||||||
|
// to be done upon lookup. This gives us oppertunity to implement a cache
|
||||||
|
// with better concurrency.
|
||||||
|
//
|
||||||
|
// Each cache entry is represented by a cache handle, and all the handles
|
||||||
|
// are arranged in a circular list, as describe above. Upon erase of an entry,
|
||||||
|
// we never remove the handle. Instead, the handle is put into a recycle bin
|
||||||
|
// to be re-use. This is to avoid memory dealocation, which is hard to deal
|
||||||
|
// with in concurrent environment.
|
||||||
|
//
|
||||||
|
// The cache also maintains a concurrent hash map for lookup. Any concurrent
|
||||||
|
// hash map implementation should do the work. We currently use
|
||||||
|
// tbb::concurrent_hash_map because it supports concurrent erase.
|
||||||
|
//
|
||||||
|
// Each cache handle has the following flags and counters, which are squeeze
|
||||||
|
// in an atomic interger, to make sure the handle always be in a consistent
|
||||||
|
// state:
|
||||||
|
//
|
||||||
|
// * In-cache bit: whether the entry is reference by the cache itself. If
|
||||||
|
// an entry is in cache, its key would also be available in the hash map.
|
||||||
|
// * Usage bit: whether the entry has been access by user since last
|
||||||
|
// examine for eviction. Can be reset by eviction.
|
||||||
|
// * Reference count: reference count by user.
|
||||||
|
//
|
||||||
|
// An entry can be reference only when it's in cache. An entry can be evicted
|
||||||
|
// only when it is in cache, has no usage since last examine, and reference
|
||||||
|
// count is zero.
|
||||||
|
//
|
||||||
|
// The follow figure shows a possible layout of the cache. Boxes represents
|
||||||
|
// cache handles and numbers in each box being in-cache bit, usage bit and
|
||||||
|
// reference count respectively.
|
||||||
|
//
|
||||||
|
// hash map:
|
||||||
|
// +-------+--------+
|
||||||
|
// | key | handle |
|
||||||
|
// +-------+--------+
|
||||||
|
// | "foo" | 5 |-------------------------------------+
|
||||||
|
// +-------+--------+ |
|
||||||
|
// | "bar" | 2 |--+ |
|
||||||
|
// +-------+--------+ | |
|
||||||
|
// | |
|
||||||
|
// head | |
|
||||||
|
// | | |
|
||||||
|
// circular list: | | |
|
||||||
|
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
|
||||||
|
// |(0,0,0)|---|(1,1,0)|---|(0,0,0)|---|(0,1,3)|---|(1,0,0)|---| ...
|
||||||
|
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
|
||||||
|
// | |
|
||||||
|
// +-------+ +-----------+
|
||||||
|
// | |
|
||||||
|
// +---+---+
|
||||||
|
// recycle bin: | 1 | 3 |
|
||||||
|
// +---+---+
|
||||||
|
//
|
||||||
|
// Suppose we try to insert "baz" into the cache at this point and the cache is
|
||||||
|
// full. The cache will first look for entries to evict, starting from where
|
||||||
|
// head points to (the second entry). It resets usage bit of the second entry,
|
||||||
|
// skips the third and fourth entry since they are not in cache, and finally
|
||||||
|
// evict the fifth entry ("foo"). It looks at recycle bin for available handle,
|
||||||
|
// grabs handle 3, and insert the key into the handle. The following figure
|
||||||
|
// shows the resulting layout.
|
||||||
|
//
|
||||||
|
// hash map:
|
||||||
|
// +-------+--------+
|
||||||
|
// | key | handle |
|
||||||
|
// +-------+--------+
|
||||||
|
// | "baz" | 3 |-------------+
|
||||||
|
// +-------+--------+ |
|
||||||
|
// | "bar" | 2 |--+ |
|
||||||
|
// +-------+--------+ | |
|
||||||
|
// | |
|
||||||
|
// | | head
|
||||||
|
// | | |
|
||||||
|
// circular list: | | |
|
||||||
|
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
|
||||||
|
// |(0,0,0)|---|(1,0,0)|---|(1,0,0)|---|(0,1,3)|---|(0,0,0)|---| ...
|
||||||
|
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
|
||||||
|
// | |
|
||||||
|
// +-------+ +-----------------------------------+
|
||||||
|
// | |
|
||||||
|
// +---+---+
|
||||||
|
// recycle bin: | 1 | 5 |
|
||||||
|
// +---+---+
|
||||||
|
//
|
||||||
|
// A global mutex guards the circular list, the head, and the recycle bin.
|
||||||
|
// We additionally require that modifying the hash map needs to hold the mutex.
|
||||||
|
// As such, Modifying the cache (such as Insert() and Erase()) require to
|
||||||
|
// hold the mutex. Lookup() only access the hash map and the flags associated
|
||||||
|
// with each handle, and don't require explicit locking. Release() has to
|
||||||
|
// acquire the mutex only when it releases the last reference to the entry and
|
||||||
|
// the entry has been erased from cache explicitly. A future improvement could
|
||||||
|
// be to remove the mutex completely.
|
||||||
|
//
|
||||||
|
// Benchmark:
|
||||||
|
// We run readrandom db_bench on a test DB of size 13GB, with size of each
|
||||||
|
// level:
|
||||||
|
//
|
||||||
|
// Level Files Size(MB)
|
||||||
|
// -------------------------
|
||||||
|
// L0 1 0.01
|
||||||
|
// L1 18 17.32
|
||||||
|
// L2 230 182.94
|
||||||
|
// L3 1186 1833.63
|
||||||
|
// L4 4602 8140.30
|
||||||
|
//
|
||||||
|
// We test with both 32 and 16 read threads, with 2GB cache size (the whole DB
|
||||||
|
// doesn't fits in) and 64GB cache size (the whole DB can fit in cache), and
|
||||||
|
// whether to put index and filter blocks in block cache. The benchmark runs
|
||||||
|
// with
|
||||||
|
// with RocksDB 4.10. We got the following result:
|
||||||
|
//
|
||||||
|
// Threads Cache Cache ClockCache LRUCache
|
||||||
|
// Size Index/Filter Throughput(MB/s) Hit Throughput(MB/s) Hit
|
||||||
|
// 32 2GB yes 466.7 85.9% 433.7 86.5%
|
||||||
|
// 32 2GB no 529.9 72.7% 532.7 73.9%
|
||||||
|
// 32 64GB yes 649.9 99.9% 507.9 99.9%
|
||||||
|
// 32 64GB no 740.4 99.9% 662.8 99.9%
|
||||||
|
// 16 2GB yes 278.4 85.9% 283.4 86.5%
|
||||||
|
// 16 2GB no 318.6 72.7% 335.8 73.9%
|
||||||
|
// 16 64GB yes 391.9 99.9% 353.3 99.9%
|
||||||
|
// 16 64GB no 433.8 99.8% 419.4 99.8%
|
||||||
|
|
||||||
|
// Cache entry meta data.
|
||||||
|
struct CacheHandle {
|
||||||
|
Slice key;
|
||||||
|
uint32_t hash;
|
||||||
|
void* value;
|
||||||
|
size_t charge;
|
||||||
|
void (*deleter)(const Slice&, void* value);
|
||||||
|
|
||||||
|
// Flags and counters associated with the cache handle:
|
||||||
|
// lowest bit: n-cache bit
|
||||||
|
// second lowest bit: usage bit
|
||||||
|
// the rest bits: reference count
|
||||||
|
// The handle is unused when flags equals to 0. The thread decreases the count
|
||||||
|
// to 0 is responsible to put the handle back to recycle_ and cleanup memory.
|
||||||
|
std::atomic<uint32_t> flags;
|
||||||
|
|
||||||
|
CacheHandle() = default;
|
||||||
|
|
||||||
|
CacheHandle(const CacheHandle& a) { *this = a; }
|
||||||
|
|
||||||
|
CacheHandle& operator=(const CacheHandle& a) {
|
||||||
|
// Only copy members needed for deletion.
|
||||||
|
key = a.key;
|
||||||
|
value = a.value;
|
||||||
|
deleter = a.deleter;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Key of hash map. We store hash value with the key for convenience.
|
||||||
|
struct CacheKey {
|
||||||
|
Slice key;
|
||||||
|
uint32_t hash_value;
|
||||||
|
|
||||||
|
CacheKey() = default;
|
||||||
|
|
||||||
|
CacheKey(const Slice& k, uint32_t h) {
|
||||||
|
key = k;
|
||||||
|
hash_value = h;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool equal(const CacheKey& a, const CacheKey& b) {
|
||||||
|
return a.hash_value == b.hash_value && a.key == b.key;
|
||||||
|
}
|
||||||
|
|
||||||
|
static size_t hash(const CacheKey& a) {
|
||||||
|
return static_cast<size_t>(a.hash_value);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct CleanupContext {
|
||||||
|
// List of values to be deleted, along with the key and deleter.
|
||||||
|
autovector<CacheHandle> to_delete_value;
|
||||||
|
|
||||||
|
// List of keys to be deleted.
|
||||||
|
autovector<const char*> to_delete_key;
|
||||||
|
};
|
||||||
|
|
||||||
|
// A cache shard which maintains its own CLOCK cache.
|
||||||
|
class ClockCacheShard : public CacheShard {
|
||||||
|
public:
|
||||||
|
// Hash map type.
|
||||||
|
typedef tbb::concurrent_hash_map<CacheKey, CacheHandle*, CacheKey> HashTable;
|
||||||
|
|
||||||
|
ClockCacheShard();
|
||||||
|
~ClockCacheShard() = default;
|
||||||
|
|
||||||
|
// Interfaces
|
||||||
|
virtual void SetCapacity(size_t capacity) override;
|
||||||
|
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override;
|
||||||
|
virtual Status Insert(const Slice& key, uint32_t hash, void* value,
|
||||||
|
size_t charge,
|
||||||
|
void (*deleter)(const Slice& key, void* value),
|
||||||
|
Cache::Handle** handle) override;
|
||||||
|
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
|
||||||
|
virtual void Release(Cache::Handle* handle) override;
|
||||||
|
virtual void Erase(const Slice& key, uint32_t hash) override;
|
||||||
|
virtual size_t GetUsage() const override;
|
||||||
|
virtual size_t GetPinnedUsage() const override;
|
||||||
|
virtual void EraseUnRefEntries() override;
|
||||||
|
virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
|
||||||
|
bool thread_safe) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
static const uint32_t kInCacheBit = 1;
|
||||||
|
static const uint32_t kUsageBit = 2;
|
||||||
|
static const uint32_t kRefsOffset = 2;
|
||||||
|
static const uint32_t kOneRef = 1 << kRefsOffset;
|
||||||
|
|
||||||
|
// Helper functions to extract cache handle flags and counters.
|
||||||
|
static bool InCache(uint32_t flags) { return flags & kInCacheBit; }
|
||||||
|
static bool HasUsage(uint32_t flags) { return flags & kUsageBit; }
|
||||||
|
static uint32_t CountRefs(uint32_t flags) { return flags >> kRefsOffset; }
|
||||||
|
|
||||||
|
// If the entry in in cache, increase reference count and return true.
|
||||||
|
// Return false otherwise.
|
||||||
|
//
|
||||||
|
// Not necessary to hold mutex_ before being called.
|
||||||
|
bool Ref(CacheHandle* handle);
|
||||||
|
|
||||||
|
// Decrease reference count of the entry. If this decreases the count to 0,
|
||||||
|
// recycle the entry. If set_usage is true, also set the usage bit.
|
||||||
|
//
|
||||||
|
// Not necessary to hold mutex_ before being called.
|
||||||
|
void Unref(CacheHandle* handle, bool set_usage, CleanupContext* context);
|
||||||
|
|
||||||
|
// Unset in-cache bit of the entry. Recycle the handle if necessary.
|
||||||
|
//
|
||||||
|
// Has to hold mutex_ before being called.
|
||||||
|
void UnsetInCache(CacheHandle* handle, CleanupContext* context);
|
||||||
|
|
||||||
|
// Put the handle back to recycle_ list, and put the value associated with
|
||||||
|
// it into to-be-deleted list. It doesn't cleanup the key as it might be
|
||||||
|
// reused by another handle.
|
||||||
|
//
|
||||||
|
// Has to hold mutex_ before being called.
|
||||||
|
void RecycleHandle(CacheHandle* handle, CleanupContext* context);
|
||||||
|
|
||||||
|
// Remove the key from hash map. Put the key associated with the entry into
|
||||||
|
// to be deleted list.
|
||||||
|
//
|
||||||
|
// Has to hold mutex_ before being called.
|
||||||
|
void EraseKey(CacheHandle* handle, CleanupContext* context);
|
||||||
|
|
||||||
|
// Delete keys and values in to-be-deleted list. Call the method without
|
||||||
|
// holding mutex, as destructors can be expensive.
|
||||||
|
void Cleanup(const CleanupContext& context);
|
||||||
|
|
||||||
|
// Examine the handle for eviction. If the handle is in cache, usage bit is
|
||||||
|
// not set, and referece count is 0, evict it from cache. Otherwise unset
|
||||||
|
// the usage bit.
|
||||||
|
//
|
||||||
|
// Has to hold mutex_ before being called.
|
||||||
|
bool TryEvict(CacheHandle* value, CleanupContext* context);
|
||||||
|
|
||||||
|
// Scan through the circular list, evict entries until we get enough capacity
|
||||||
|
// for new cache entry of specific size. Return true if success, false
|
||||||
|
// otherwise.
|
||||||
|
//
|
||||||
|
// Has to hold mutex_ before being called.
|
||||||
|
bool EvictFromCache(size_t charge, CleanupContext* context);
|
||||||
|
|
||||||
|
CacheHandle* Insert(const Slice& key, uint32_t hash, void* value,
|
||||||
|
size_t change,
|
||||||
|
void (*deleter)(const Slice& key, void* value),
|
||||||
|
bool hold_reference, CleanupContext* context);
|
||||||
|
|
||||||
|
// Guards list_, head_, and recycle_. In addition, updating table_ also has
|
||||||
|
// to hold the mutex, to avoid the cache being in inconsistent state.
|
||||||
|
mutable port::Mutex mutex_;
|
||||||
|
|
||||||
|
// The circular list of cache handles. Initially the list is empty. Once a
|
||||||
|
// handle is needed by insertion, and no more handles are available in
|
||||||
|
// recycle bin, one more handle is appended to the end.
|
||||||
|
//
|
||||||
|
// We use std::deque for the circular list because we want to make sure
|
||||||
|
// pointers to handles are valid through out the life-cycle of the cache
|
||||||
|
// (in contrast to std::vector), and be able to grow the list (in contrast
|
||||||
|
// to statically allocated arrays).
|
||||||
|
std::deque<CacheHandle> list_;
|
||||||
|
|
||||||
|
// Pointer to the next handle in the circular list to be examine for
|
||||||
|
// eviction.
|
||||||
|
size_t head_;
|
||||||
|
|
||||||
|
// Recycle bin of cache handles.
|
||||||
|
autovector<CacheHandle*> recycle_;
|
||||||
|
|
||||||
|
// Maximum cache size.
|
||||||
|
std::atomic<size_t> capacity_;
|
||||||
|
|
||||||
|
// Current total size of the cache.
|
||||||
|
std::atomic<size_t> usage_;
|
||||||
|
|
||||||
|
// Total un-released cache size.
|
||||||
|
std::atomic<size_t> pinned_usage_;
|
||||||
|
|
||||||
|
// Whether allow insert into cache if cache is full.
|
||||||
|
std::atomic<bool> strict_capacity_limit_;
|
||||||
|
|
||||||
|
// Hash table (tbb::concurrent_hash_map) for lookup.
|
||||||
|
HashTable table_;
|
||||||
|
};
|
||||||
|
|
||||||
|
ClockCacheShard::ClockCacheShard()
|
||||||
|
: head_(0), usage_(0), pinned_usage_(0), strict_capacity_limit_(false) {}
|
||||||
|
|
||||||
|
size_t ClockCacheShard::GetUsage() const {
|
||||||
|
return usage_.load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t ClockCacheShard::GetPinnedUsage() const {
|
||||||
|
return pinned_usage_.load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
|
||||||
|
bool thread_safe) {
|
||||||
|
if (thread_safe) {
|
||||||
|
mutex_.Lock();
|
||||||
|
}
|
||||||
|
for (auto& handle : list_) {
|
||||||
|
// Use relaxed semantics instead of acquire semantics since we are either
|
||||||
|
// holding mutex, or don't have thread safe requirement.
|
||||||
|
uint32_t flags = handle.flags.load(std::memory_order_relaxed);
|
||||||
|
if (InCache(flags)) {
|
||||||
|
callback(handle.value, handle.charge);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (thread_safe) {
|
||||||
|
mutex_.Unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::RecycleHandle(CacheHandle* handle,
|
||||||
|
CleanupContext* context) {
|
||||||
|
mutex_.AssertHeld();
|
||||||
|
assert(!InCache(handle->flags) && CountRefs(handle->flags) == 0);
|
||||||
|
// Only cleanup the value. The key may be reused by another handle.
|
||||||
|
context->to_delete_value.emplace_back(*handle);
|
||||||
|
recycle_.push_back(handle);
|
||||||
|
usage_.fetch_sub(handle->charge, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::EraseKey(CacheHandle* handle, CleanupContext* context) {
|
||||||
|
mutex_.AssertHeld();
|
||||||
|
assert(!InCache(handle->flags));
|
||||||
|
bool erased __attribute__((__unused__)) =
|
||||||
|
table_.erase(CacheKey(handle->key, handle->hash));
|
||||||
|
assert(erased);
|
||||||
|
context->to_delete_key.push_back(handle->key.data());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::Cleanup(const CleanupContext& context) {
|
||||||
|
for (const CacheHandle& handle : context.to_delete_value) {
|
||||||
|
if (handle.deleter) {
|
||||||
|
(*handle.deleter)(handle.key, handle.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (const char* key : context.to_delete_key) {
|
||||||
|
delete[] key;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ClockCacheShard::Ref(CacheHandle* handle) {
|
||||||
|
// CAS loop to increase reference count.
|
||||||
|
uint32_t flags = handle->flags.load(std::memory_order_relaxed);
|
||||||
|
while (InCache(flags)) {
|
||||||
|
// Use acquire semantics on success, as further operations on the cache
|
||||||
|
// entry has to be order after reference count is increased.
|
||||||
|
if (handle->flags.compare_exchange_weak(flags, flags + kOneRef,
|
||||||
|
std::memory_order_acquire,
|
||||||
|
std::memory_order_relaxed)) {
|
||||||
|
if (CountRefs(flags) == 0) {
|
||||||
|
// No reference count before the operation.
|
||||||
|
pinned_usage_.fetch_add(handle->charge, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::Unref(CacheHandle* handle, bool set_usage,
|
||||||
|
CleanupContext* context) {
|
||||||
|
if (set_usage) {
|
||||||
|
handle->flags.fetch_or(kUsageBit, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
// Use acquire-release semantics as previous operations on the cache entry
|
||||||
|
// has to be order before reference count is decreased, and potential cleanup
|
||||||
|
// of the entry has to be order after.
|
||||||
|
uint32_t flags = handle->flags.fetch_sub(kOneRef, std::memory_order_acq_rel);
|
||||||
|
assert(CountRefs(flags) > 0);
|
||||||
|
if (CountRefs(flags) == 1) {
|
||||||
|
// this is the last reference.
|
||||||
|
pinned_usage_.fetch_sub(handle->charge, std::memory_order_relaxed);
|
||||||
|
// Cleanup if it is the last reference.
|
||||||
|
if (!InCache(flags)) {
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
RecycleHandle(handle, context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::UnsetInCache(CacheHandle* handle,
|
||||||
|
CleanupContext* context) {
|
||||||
|
mutex_.AssertHeld();
|
||||||
|
// Use acquire-release semantics as previous operations on the cache entry
|
||||||
|
// has to be order before reference count is decreased, and potential cleanup
|
||||||
|
// of the entry has to be order after.
|
||||||
|
uint32_t flags =
|
||||||
|
handle->flags.fetch_and(~kInCacheBit, std::memory_order_acq_rel);
|
||||||
|
// Cleanup if it is the last reference.
|
||||||
|
if (InCache(flags) && CountRefs(flags) == 0) {
|
||||||
|
RecycleHandle(handle, context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ClockCacheShard::TryEvict(CacheHandle* handle, CleanupContext* context) {
|
||||||
|
mutex_.AssertHeld();
|
||||||
|
uint32_t flags = kInCacheBit;
|
||||||
|
if (handle->flags.compare_exchange_strong(flags, 0, std::memory_order_acquire,
|
||||||
|
std::memory_order_relaxed)) {
|
||||||
|
RecycleHandle(handle, context);
|
||||||
|
EraseKey(handle, context);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
handle->flags.fetch_and(~kUsageBit, std::memory_order_relaxed);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ClockCacheShard::EvictFromCache(size_t charge, CleanupContext* context) {
|
||||||
|
size_t usage = usage_.load(std::memory_order_relaxed);
|
||||||
|
size_t capacity = capacity_.load(std::memory_order_relaxed);
|
||||||
|
if (usage == 0) {
|
||||||
|
return charge <= capacity;
|
||||||
|
}
|
||||||
|
size_t new_head = head_;
|
||||||
|
bool second_iteration = false;
|
||||||
|
while (usage + charge > capacity) {
|
||||||
|
assert(new_head < list_.size());
|
||||||
|
if (TryEvict(&list_[new_head], context)) {
|
||||||
|
usage = usage_.load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
new_head = (new_head + 1 >= list_.size()) ? 0 : new_head + 1;
|
||||||
|
if (new_head == head_) {
|
||||||
|
if (second_iteration) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
second_iteration = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
head_ = new_head;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::SetCapacity(size_t capacity) {
|
||||||
|
CleanupContext context;
|
||||||
|
{
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
capacity_.store(capacity, std::memory_order_relaxed);
|
||||||
|
EvictFromCache(0, &context);
|
||||||
|
}
|
||||||
|
Cleanup(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
|
||||||
|
strict_capacity_limit_.store(strict_capacity_limit,
|
||||||
|
std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
CacheHandle* ClockCacheShard::Insert(
|
||||||
|
const Slice& key, uint32_t hash, void* value, size_t charge,
|
||||||
|
void (*deleter)(const Slice& key, void* value), bool hold_reference,
|
||||||
|
CleanupContext* context) {
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
bool success = EvictFromCache(charge, context);
|
||||||
|
bool strict = strict_capacity_limit_.load(std::memory_order_relaxed);
|
||||||
|
if (!success && strict) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
// Grab available handle from recycle bin. If recycle bin is empty, create
|
||||||
|
// and append new handle to end of circular list.
|
||||||
|
CacheHandle* handle = nullptr;
|
||||||
|
if (!recycle_.empty()) {
|
||||||
|
handle = recycle_.back();
|
||||||
|
recycle_.pop_back();
|
||||||
|
} else {
|
||||||
|
list_.emplace_back();
|
||||||
|
handle = &list_.back();
|
||||||
|
}
|
||||||
|
// Fill handle.
|
||||||
|
handle->key = key;
|
||||||
|
handle->hash = hash;
|
||||||
|
handle->value = value;
|
||||||
|
handle->charge = charge;
|
||||||
|
handle->deleter = deleter;
|
||||||
|
uint32_t flags = hold_reference ? kInCacheBit + kOneRef : kInCacheBit;
|
||||||
|
handle->flags.store(flags, std::memory_order_relaxed);
|
||||||
|
HashTable::accessor accessor;
|
||||||
|
if (table_.find(accessor, CacheKey(key, hash))) {
|
||||||
|
// Key exists. Replace with new handle, but keep the existing key since
|
||||||
|
// the key in hash table is back by the existing one. The new key will be
|
||||||
|
// deleted by Cleanup().
|
||||||
|
CacheHandle* existing_handle = accessor->second;
|
||||||
|
context->to_delete_key.push_back(handle->key.data());
|
||||||
|
handle->key = existing_handle->key;
|
||||||
|
accessor->second = handle;
|
||||||
|
accessor.release();
|
||||||
|
UnsetInCache(existing_handle, context);
|
||||||
|
} else {
|
||||||
|
table_.insert(HashTable::value_type(CacheKey(key, hash), handle));
|
||||||
|
}
|
||||||
|
if (hold_reference) {
|
||||||
|
pinned_usage_.fetch_add(charge, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
usage_.fetch_add(charge, std::memory_order_relaxed);
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
|
||||||
|
size_t charge,
|
||||||
|
void (*deleter)(const Slice& key, void* value),
|
||||||
|
Cache::Handle** h) {
|
||||||
|
CleanupContext context;
|
||||||
|
HashTable::accessor accessor;
|
||||||
|
char* key_data = new char[key.size()];
|
||||||
|
memcpy(key_data, key.data(), key.size());
|
||||||
|
Slice key_copy(key_data, key.size());
|
||||||
|
CacheHandle* handle =
|
||||||
|
Insert(key_copy, hash, value, charge, deleter, h != nullptr, &context);
|
||||||
|
Status s;
|
||||||
|
if (h != nullptr) {
|
||||||
|
*h = reinterpret_cast<Cache::Handle*>(handle);
|
||||||
|
}
|
||||||
|
if (handle == nullptr) {
|
||||||
|
s = Status::Incomplete("Insert failed due to LRU cache being full.");
|
||||||
|
}
|
||||||
|
Cleanup(context);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
Cache::Handle* ClockCacheShard::Lookup(const Slice& key, uint32_t hash) {
|
||||||
|
HashTable::const_accessor accessor;
|
||||||
|
if (!table_.find(accessor, CacheKey(key, hash))) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
CacheHandle* handle = accessor->second;
|
||||||
|
accessor.release();
|
||||||
|
// Ref() could fail if another thread sneak in and evict/erase the cache
|
||||||
|
// entry before we are able to hold reference.
|
||||||
|
if (!Ref(handle)) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
// Double check the key since the handle may now representing another key
|
||||||
|
// if other threads sneak in, evict/erase the entry and re-used the handle
|
||||||
|
// for another cache entry.
|
||||||
|
if (hash != handle->hash || key != handle->key) {
|
||||||
|
CleanupContext context;
|
||||||
|
Unref(handle, false, &context);
|
||||||
|
// It is possible Unref() delete the entry, so we need to cleanup.
|
||||||
|
Cleanup(context);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
return reinterpret_cast<Cache::Handle*>(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::Release(Cache::Handle* h) {
|
||||||
|
CleanupContext context;
|
||||||
|
CacheHandle* handle = reinterpret_cast<CacheHandle*>(h);
|
||||||
|
Unref(handle, true, &context);
|
||||||
|
Cleanup(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::Erase(const Slice& key, uint32_t hash) {
|
||||||
|
CleanupContext context;
|
||||||
|
{
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
HashTable::accessor accessor;
|
||||||
|
if (table_.find(accessor, CacheKey(key, hash))) {
|
||||||
|
CacheHandle* handle = accessor->second;
|
||||||
|
table_.erase(accessor);
|
||||||
|
UnsetInCache(handle, &context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cleanup(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClockCacheShard::EraseUnRefEntries() {
|
||||||
|
CleanupContext context;
|
||||||
|
{
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
table_.clear();
|
||||||
|
for (auto& handle : list_) {
|
||||||
|
UnsetInCache(&handle, &context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cleanup(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
class ClockCache : public ShardedCache {
|
||||||
|
public:
|
||||||
|
ClockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit)
|
||||||
|
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
|
||||||
|
int num_shards = 1 << num_shard_bits;
|
||||||
|
shards_ = new ClockCacheShard[num_shards];
|
||||||
|
SetCapacity(capacity);
|
||||||
|
SetStrictCapacityLimit(strict_capacity_limit);
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual ~ClockCache() { delete[] shards_; }
|
||||||
|
|
||||||
|
virtual const char* Name() const override { return "ClockCache"; }
|
||||||
|
|
||||||
|
virtual CacheShard* GetShard(int shard) override {
|
||||||
|
return reinterpret_cast<CacheShard*>(&shards_[shard]);
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const CacheShard* GetShard(int shard) const override {
|
||||||
|
return reinterpret_cast<CacheShard*>(&shards_[shard]);
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void* Value(Handle* handle) override {
|
||||||
|
return reinterpret_cast<const CacheHandle*>(handle)->value;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual size_t GetCharge(Handle* handle) const override {
|
||||||
|
return reinterpret_cast<const CacheHandle*>(handle)->charge;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual uint32_t GetHash(Handle* handle) const override {
|
||||||
|
return reinterpret_cast<const CacheHandle*>(handle)->hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void DisownData() override { shards_ = nullptr; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
ClockCacheShard* shards_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // end anonymous namespace
|
||||||
|
|
||||||
|
std::shared_ptr<Cache> NewClockCache(size_t capacity, int num_shard_bits,
|
||||||
|
bool strict_capacity_limit) {
|
||||||
|
return std::make_shared<ClockCache>(capacity, num_shard_bits,
|
||||||
|
strict_capacity_limit);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
#endif // SUPPORT_CLOCK_CACHE
|
16
util/clock_cache.h
Normal file
16
util/clock_cache.h
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "rocksdb/cache.h"
|
||||||
|
|
||||||
|
#if defined(TBB) && !defined(ROCKSDB_LITE)
|
||||||
|
#define SUPPORT_CLOCK_CACHE
|
||||||
|
#endif
|
Loading…
x
Reference in New Issue
Block a user