Initial support for secondary cache in LRUCache (#8271)

Summary:
Defined the abstract interface for a secondary cache in include/rocksdb/secondary_cache.h, and updated LRUCacheOptions to take a std::shared_ptr<SecondaryCache>. An item is initially inserted into the LRU (primary) cache. When it ages out and evicted from memory, its inserted into the secondary cache. On a LRU cache miss and successful lookup in the secondary cache, the item is promoted to the LRU cache. Only support synchronous lookup currently. The secondary cache would be used to implement a persistent (flash cache) or compressed cache.

Tests:
Results from cache_bench and db_bench don't show any regression due to these changes.

cache_bench results before and after this change -
Command
```./cache_bench -ops_per_thread=10000000 -threads=1```
Before
```Complete in 40.688 s; QPS = 245774```
```Complete in 40.486 s; QPS = 246996```
```Complete in 42.019 s; QPS = 237989```
After
```Complete in 40.672 s; QPS = 245869```
```Complete in 44.622 s; QPS = 224107```
```Complete in 42.445 s; QPS = 235599```

db_bench results before this change, and with this change + https://github.com/facebook/rocksdb/issues/8213 and https://github.com/facebook/rocksdb/issues/8191 -
Commands
```./db_bench  --benchmarks="fillseq,compact" -num=30000000 -key_size=32 -value_size=256 -use_direct_io_for_flush_and_compaction=true -db=/home/anand76/nvm_cache/db -partition_index_and_filters=true```

```./db_bench -db=/home/anand76/nvm_cache/db -use_existing_db=true -benchmarks=readrandom -num=30000000 -key_size=32 -value_size=256 -use_direct_reads=true -cache_size=1073741824 -cache_numshardbits=6 -cache_index_and_filter_blocks=true -read_random_exp_range=17 -statistics -partition_index_and_filters=true -threads=16 -duration=300```
Before
```
DB path: [/home/anand76/nvm_cache/db]
readrandom   :      80.702 micros/op 198104 ops/sec;   54.4 MB/s (3708999 of 3708999 found)
```
```
DB path: [/home/anand76/nvm_cache/db]
readrandom   :      87.124 micros/op 183625 ops/sec;   50.4 MB/s (3439999 of 3439999 found)
```
After
```
DB path: [/home/anand76/nvm_cache/db]
readrandom   :      77.653 micros/op 206025 ops/sec;   56.6 MB/s (3866999 of 3866999 found)
```
```
DB path: [/home/anand76/nvm_cache/db]
readrandom   :      84.962 micros/op 188299 ops/sec;   51.7 MB/s (3535999 of 3535999 found)
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8271

Reviewed By: zhichao-cao

Differential Revision: D28357511

Pulled By: anand1976

fbshipit-source-id: d1cfa236f00e649a18c53328be10a8062a4b6da2
This commit is contained in:
anand76 2021-05-13 22:57:51 -07:00 committed by Facebook GitHub Bot
parent d15fbae449
commit feb06e83b2
12 changed files with 961 additions and 110 deletions

20
cache/clock_cache.cc vendored
View File

@ -271,7 +271,25 @@ class ClockCacheShard final : public CacheShard {
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle, Cache::Priority priority) override;
Status Insert(const Slice& key, uint32_t hash, void* value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::Handle** handle, Cache::Priority priority) override {
return Insert(key, hash, value, charge, helper->del_cb, handle, priority);
}
Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
Cache::Handle* Lookup(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* /*helper*/,
const Cache::CreateCallback& /*create_cb*/,
Cache::Priority /*priority*/, bool /*wait*/) override {
return Lookup(key, hash);
}
bool Release(Cache::Handle* handle, bool /*useful*/,
bool force_erase) override {
return Release(handle, force_erase);
}
bool IsReady(Cache::Handle* /*handle*/) override { return true; }
void Wait(Cache::Handle* /*handle*/) override {}
// If the entry in in cache, increase reference count and return true.
// Return false otherwise.
//
@ -797,6 +815,8 @@ class ClockCache final : public ShardedCache {
#endif // __clang__
}
void WaitAll(std::vector<Handle*>& /*handles*/) override {}
private:
ClockCacheShard* shards_;
};

275
cache/lru_cache.cc vendored
View File

@ -106,11 +106,11 @@ void LRUHandleTable::Resize() {
length_bits_ = new_length_bits;
}
LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit,
double high_pri_pool_ratio,
bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy,
int max_upper_hash_bits)
LRUCacheShard::LRUCacheShard(
size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio,
bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy,
int max_upper_hash_bits,
const std::shared_ptr<SecondaryCache>& secondary_cache)
: capacity_(0),
high_pri_pool_usage_(0),
strict_capacity_limit_(strict_capacity_limit),
@ -119,7 +119,8 @@ LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit,
table_(max_upper_hash_bits),
usage_(0),
lru_usage_(0),
mutex_(use_adaptive_mutex) {
mutex_(use_adaptive_mutex),
secondary_cache_(secondary_cache) {
set_metadata_charge_policy(metadata_charge_policy);
// Make empty circular linked list
lru_.next = &lru_;
@ -179,7 +180,10 @@ void LRUCacheShard::ApplyToSomeEntries(
table_.ApplyToEntriesRange(
[callback](LRUHandle* h) {
callback(h->key(), h->value, h->charge, h->deleter);
DeleterFn deleter = h->IsSecondaryCacheCompatible()
? h->info_.helper->del_cb
: h->info_.deleter;
callback(h->key(), h->value, h->charge, deleter);
},
index_begin, index_end);
}
@ -288,8 +292,14 @@ void LRUCacheShard::SetCapacity(size_t capacity) {
EvictFromLRU(0, &last_reference_list);
}
// Try to insert the evicted entries into tiered cache
// Free the entries outside of mutex for performance reasons
for (auto entry : last_reference_list) {
if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
!entry->IsPromoted()) {
secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
.PermitUncheckedError();
}
entry->Free();
}
}
@ -299,17 +309,139 @@ void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
strict_capacity_limit_ = strict_capacity_limit;
}
Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle* e = table_.Lookup(key, hash);
if (e != nullptr) {
assert(e->InCache());
if (!e->HasRefs()) {
// The entry is in LRU since it's in hash and has no external references
LRU_Remove(e);
Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
Status s = Status::OK();
autovector<LRUHandle*> last_reference_list;
size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
{
MutexLock l(&mutex_);
// Free the space following strict LRU policy until enough space
// is freed or the lru list is empty
EvictFromLRU(total_charge, &last_reference_list);
if ((usage_ + total_charge) > capacity_ &&
(strict_capacity_limit_ || handle == nullptr)) {
if (handle == nullptr) {
// Don't insert the entry but still return ok, as if the entry inserted
// into cache and get evicted immediately.
e->SetInCache(false);
last_reference_list.push_back(e);
} else {
delete[] reinterpret_cast<char*>(e);
*handle = nullptr;
s = Status::Incomplete("Insert failed due to LRU cache being full.");
}
} else {
// Insert into the cache. Note that the cache might get larger than its
// capacity if not enough space was freed up.
LRUHandle* old = table_.Insert(e);
usage_ += total_charge;
if (old != nullptr) {
s = Status::OkOverwritten();
assert(old->InCache());
old->SetInCache(false);
if (!old->HasRefs()) {
// old is on LRU because it's in cache and its reference count is 0
LRU_Remove(old);
size_t old_total_charge =
old->CalcTotalCharge(metadata_charge_policy_);
assert(usage_ >= old_total_charge);
usage_ -= old_total_charge;
last_reference_list.push_back(old);
}
}
if (handle == nullptr) {
LRU_Insert(e);
} else {
e->Ref();
*handle = reinterpret_cast<Cache::Handle*>(e);
}
}
}
// Try to insert the evicted entries into the secondary cache
// Free the entries here outside of mutex for performance reasons
for (auto entry : last_reference_list) {
if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
!entry->IsPromoted()) {
secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
.PermitUncheckedError();
}
entry->Free();
}
return s;
}
Cache::Handle* LRUCacheShard::Lookup(
const Slice& key, uint32_t hash,
const ShardedCache::CacheItemHelper* helper,
const ShardedCache::CreateCallback& create_cb, Cache::Priority priority,
bool wait) {
LRUHandle* e = nullptr;
{
MutexLock l(&mutex_);
e = table_.Lookup(key, hash);
if (e != nullptr) {
assert(e->InCache());
if (!e->HasRefs()) {
// The entry is in LRU since it's in hash and has no external references
LRU_Remove(e);
}
e->Ref();
e->SetHit();
}
}
// If handle table lookup failed, then allocate a handle outside the
// mutex if we're going to lookup in the secondary cache
// Only support synchronous for now
// TODO: Support asynchronous lookup in secondary cache
if (!e && secondary_cache_ && helper && helper->saveto_cb && wait) {
// For objects from the secondary cache, we expect the caller to provide
// a way to create/delete the primary cache object. The only case where
// a deleter would not be required is for dummy entries inserted for
// accounting purposes, which we won't demote to the secondary cache
// anyway.
assert(create_cb && helper->del_cb);
std::unique_ptr<SecondaryCacheHandle> secondary_handle =
secondary_cache_->Lookup(key, create_cb, wait);
if (secondary_handle != nullptr) {
void* value = nullptr;
e = reinterpret_cast<LRUHandle*>(
new char[sizeof(LRUHandle) - 1 + key.size()]);
e->flags = 0;
e->SetPromoted(true);
e->SetSecondaryCacheCompatible(true);
e->info_.helper = helper;
e->key_length = key.size();
e->hash = hash;
e->refs = 0;
e->next = e->prev = nullptr;
e->SetInCache(true);
e->SetPriority(priority);
memcpy(e->key_data, key.data(), key.size());
value = secondary_handle->Value();
e->value = value;
e->charge = secondary_handle->Size();
// This call could nullify e if the cache is over capacity and
// strict_capacity_limit_ is true. In such a case, the caller will try
// to insert later, which might again fail, but its ok as this should
// not be common
// Being conservative here since there could be lookups that are
// actually ok to fail rather than succeed and bloat up the memory
// usage (preloading partitioned index blocks, for example).
Status s = InsertItem(e, reinterpret_cast<Cache::Handle**>(&e));
if (!s.ok()) {
assert(e == nullptr);
(*helper->del_cb)(key, value);
}
}
e->Ref();
e->SetHit();
}
return reinterpret_cast<Cache::Handle*>(e);
}
@ -370,81 +502,32 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) {
Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key, void* value),
const Cache::CacheItemHelper* helper,
Cache::Handle** handle, Cache::Priority priority) {
// Allocate the memory here outside of the mutex
// If the cache is full, we'll have to release it
// It shouldn't happen very often though.
LRUHandle* e = reinterpret_cast<LRUHandle*>(
new char[sizeof(LRUHandle) - 1 + key.size()]);
Status s = Status::OK();
autovector<LRUHandle*> last_reference_list;
e->value = value;
e->deleter = deleter;
e->flags = 0;
if (helper) {
e->SetSecondaryCacheCompatible(true);
e->info_.helper = helper;
} else {
e->info_.deleter = deleter;
}
e->charge = charge;
e->key_length = key.size();
e->flags = 0;
e->hash = hash;
e->refs = 0;
e->next = e->prev = nullptr;
e->SetInCache(true);
e->SetPriority(priority);
memcpy(e->key_data, key.data(), key.size());
size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
{
MutexLock l(&mutex_);
// Free the space following strict LRU policy until enough space
// is freed or the lru list is empty
EvictFromLRU(total_charge, &last_reference_list);
if ((usage_ + total_charge) > capacity_ &&
(strict_capacity_limit_ || handle == nullptr)) {
if (handle == nullptr) {
// Don't insert the entry but still return ok, as if the entry inserted
// into cache and get evicted immediately.
e->SetInCache(false);
last_reference_list.push_back(e);
} else {
delete[] reinterpret_cast<char*>(e);
*handle = nullptr;
s = Status::Incomplete("Insert failed due to LRU cache being full.");
}
} else {
// Insert into the cache. Note that the cache might get larger than its
// capacity if not enough space was freed up.
LRUHandle* old = table_.Insert(e);
usage_ += total_charge;
if (old != nullptr) {
s = Status::OkOverwritten();
assert(old->InCache());
old->SetInCache(false);
if (!old->HasRefs()) {
// old is on LRU because it's in cache and its reference count is 0
LRU_Remove(old);
size_t old_total_charge =
old->CalcTotalCharge(metadata_charge_policy_);
assert(usage_ >= old_total_charge);
usage_ -= old_total_charge;
last_reference_list.push_back(old);
}
}
if (handle == nullptr) {
LRU_Insert(e);
} else {
e->Ref();
*handle = reinterpret_cast<Cache::Handle*>(e);
}
}
}
// Free the entries here outside of mutex for performance reasons
for (auto entry : last_reference_list) {
entry->Free();
}
return s;
return InsertItem(e, handle);
}
void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
@ -500,7 +583,8 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> allocator,
bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy)
CacheMetadataChargePolicy metadata_charge_policy,
const std::shared_ptr<SecondaryCache>& secondary_cache)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
std::move(allocator)) {
num_shards_ = 1 << num_shard_bits;
@ -508,10 +592,10 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_;
for (int i = 0; i < num_shards_; i++) {
new (&shards_[i])
LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio,
use_adaptive_mutex, metadata_charge_policy,
/* max_upper_hash_bits */ 32 - num_shard_bits);
new (&shards_[i]) LRUCacheShard(
per_shard, strict_capacity_limit, high_pri_pool_ratio,
use_adaptive_mutex, metadata_charge_policy,
/* max_upper_hash_bits */ 32 - num_shard_bits, secondary_cache);
}
}
@ -576,19 +660,12 @@ double LRUCache::GetHighPriPoolRatio() {
return result;
}
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit,
cache_opts.high_pri_pool_ratio,
cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
cache_opts.metadata_charge_policy);
}
std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy) {
CacheMetadataChargePolicy metadata_charge_policy,
const std::shared_ptr<SecondaryCache>& secondary_cache) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
@ -601,7 +678,25 @@ std::shared_ptr<Cache> NewLRUCache(
}
return std::make_shared<LRUCache>(
capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy);
std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy,
secondary_cache);
}
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(
cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
cache_opts.metadata_charge_policy, cache_opts.secondary_cache);
}
std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy) {
return NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
metadata_charge_policy, nullptr);
}
} // namespace ROCKSDB_NAMESPACE

93
cache/lru_cache.h vendored
View File

@ -1,4 +1,4 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
@ -14,6 +14,7 @@
#include "cache/sharded_cache.h"
#include "port/malloc.h"
#include "port/port.h"
#include "rocksdb/secondary_cache.h"
#include "util/autovector.h"
namespace ROCKSDB_NAMESPACE {
@ -49,7 +50,12 @@ namespace ROCKSDB_NAMESPACE {
struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value);
union Info {
Info() {}
~Info() {}
void (*deleter)(const Slice&, void* value);
const ShardedCache::CacheItemHelper* helper;
} info_;
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
@ -69,6 +75,12 @@ struct LRUHandle {
IN_HIGH_PRI_POOL = (1 << 2),
// Whether this entry has had any lookups (hits).
HAS_HIT = (1 << 3),
// Can this be inserted into the tiered cache
IS_TIERED_CACHE_COMPATIBLE = (1 << 4),
// Is the handle still being read from a lower tier
IS_PENDING = (1 << 5),
// Has the item been promoted from a lower tier
IS_PROMOTED = (1 << 6),
};
uint8_t flags;
@ -95,6 +107,11 @@ struct LRUHandle {
bool IsHighPri() const { return flags & IS_HIGH_PRI; }
bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; }
bool HasHit() const { return flags & HAS_HIT; }
bool IsSecondaryCacheCompatible() const {
return flags & IS_TIERED_CACHE_COMPATIBLE;
}
bool IsPending() const { return flags & IS_PENDING; }
bool IsPromoted() const { return flags & IS_PROMOTED; }
void SetInCache(bool in_cache) {
if (in_cache) {
@ -122,10 +139,36 @@ struct LRUHandle {
void SetHit() { flags |= HAS_HIT; }
void SetSecondaryCacheCompatible(bool tiered) {
if (tiered) {
flags |= IS_TIERED_CACHE_COMPATIBLE;
} else {
flags &= ~IS_TIERED_CACHE_COMPATIBLE;
}
}
void SetIncomplete(bool incomp) {
if (incomp) {
flags |= IS_PENDING;
} else {
flags &= ~IS_PENDING;
}
}
void SetPromoted(bool promoted) {
if (promoted) {
flags |= IS_PROMOTED;
} else {
flags &= ~IS_PROMOTED;
}
}
void Free() {
assert(refs == 0);
if (deleter) {
(*deleter)(key(), value);
if (!IsSecondaryCacheCompatible() && info_.deleter) {
(*info_.deleter)(key(), value);
} else if (IsSecondaryCacheCompatible()) {
(*info_.helper->del_cb)(key(), value);
}
delete[] reinterpret_cast<char*>(this);
}
@ -207,7 +250,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
LRUCacheShard(size_t capacity, bool strict_capacity_limit,
double high_pri_pool_ratio, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy,
int max_upper_hash_bits);
int max_upper_hash_bits,
const std::shared_ptr<SecondaryCache>& secondary_cache);
virtual ~LRUCacheShard() override = default;
// Separate from constructor so caller can easily make an array of LRUCache
@ -226,8 +270,32 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle,
Cache::Priority priority) override;
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
Cache::Priority priority) override {
return Insert(key, hash, value, charge, deleter, nullptr, handle, priority);
}
virtual Status Insert(const Slice& key, uint32_t hash, void* value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::Handle** handle,
Cache::Priority priority) override {
assert(helper);
return Insert(key, hash, value, charge, nullptr, helper, handle, priority);
}
// If helper_cb is null, the values of the following arguments don't
// matter
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash,
const ShardedCache::CacheItemHelper* helper,
const ShardedCache::CreateCallback& create_cb,
ShardedCache::Priority priority,
bool wait) override;
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override {
return Lookup(key, hash, nullptr, nullptr, Cache::Priority::LOW, true);
}
virtual bool Release(Cache::Handle* handle, bool /*useful*/,
bool force_erase) override {
return Release(handle, force_erase);
}
virtual bool IsReady(Cache::Handle* /*handle*/) override { return true; }
virtual void Wait(Cache::Handle* /*handle*/) override {}
virtual bool Ref(Cache::Handle* handle) override;
virtual bool Release(Cache::Handle* handle,
bool force_erase = false) override;
@ -259,6 +327,11 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
double GetHighPriPoolRatio();
private:
Status InsertItem(LRUHandle* item, Cache::Handle** handle);
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
const Cache::CacheItemHelper* helper, Cache::Handle** handle,
Cache::Priority priority);
void LRU_Remove(LRUHandle* e);
void LRU_Insert(LRUHandle* e);
@ -319,6 +392,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
// We don't count mutex_ as the cache's internal state so semantically we
// don't mind mutex_ invoking the non-const actions.
mutable port::Mutex mutex_;
std::shared_ptr<SecondaryCache> secondary_cache_;
};
class LRUCache
@ -332,7 +407,8 @@ class LRUCache
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
CacheMetadataChargePolicy metadata_charge_policy =
kDontChargeCacheMetadata);
kDontChargeCacheMetadata,
const std::shared_ptr<SecondaryCache>& secondary_cache = nullptr);
virtual ~LRUCache();
virtual const char* Name() const override { return "LRUCache"; }
virtual CacheShard* GetShard(uint32_t shard) override;
@ -341,6 +417,7 @@ class LRUCache
virtual size_t GetCharge(Handle* handle) const override;
virtual uint32_t GetHash(Handle* handle) const override;
virtual void DisownData() override;
virtual void WaitAll(std::vector<Handle*>& /*handles*/) override {}
// Retrieves number of elements in LRU, for unit test purpose only
size_t TEST_GetLRUSize();

View File

@ -7,8 +7,12 @@
#include <string>
#include <vector>
#include "port/port.h"
#include "rocksdb/cache.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
@ -30,10 +34,10 @@ class LRUCacheTest : public testing::Test {
DeleteCache();
cache_ = reinterpret_cast<LRUCacheShard*>(
port::cacheline_aligned_alloc(sizeof(LRUCacheShard)));
new (cache_)
LRUCacheShard(capacity, false /*strict_capacity_limit*/,
high_pri_pool_ratio, use_adaptive_mutex,
kDontChargeCacheMetadata, 24 /*max_upper_hash_bits*/);
new (cache_) LRUCacheShard(
capacity, false /*strict_capcity_limit*/, high_pri_pool_ratio,
use_adaptive_mutex, kDontChargeCacheMetadata,
24 /*max_upper_hash_bits*/, nullptr /*secondary_cache*/);
}
void Insert(const std::string& key,
@ -192,6 +196,357 @@ TEST_F(LRUCacheTest, EntriesWithPriority) {
ValidateLRUList({"e", "f", "g", "Z", "d"}, 2);
}
class TestSecondaryCache : public SecondaryCache {
public:
explicit TestSecondaryCache(size_t capacity)
: num_inserts_(0), num_lookups_(0) {
cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
}
~TestSecondaryCache() override { cache_.reset(); }
std::string Name() override { return "TestSecondaryCache"; }
Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) override {
size_t size;
char* buf;
Status s;
num_inserts_++;
size = (*helper->size_cb)(value);
buf = new char[size + sizeof(uint64_t)];
EncodeFixed64(buf, size);
s = (*helper->saveto_cb)(value, 0, size, buf + sizeof(uint64_t));
if (!s.ok()) {
delete[] buf;
return s;
}
return cache_->Insert(key, buf, size,
[](const Slice& /*key*/, void* val) -> void {
delete[] static_cast<char*>(val);
});
}
std::unique_ptr<SecondaryCacheHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb,
bool /*wait*/) override {
std::unique_ptr<SecondaryCacheHandle> secondary_handle;
Cache::Handle* handle = cache_->Lookup(key);
num_lookups_++;
if (handle) {
void* value;
size_t charge;
char* ptr = (char*)cache_->Value(handle);
size_t size = DecodeFixed64(ptr);
ptr += sizeof(uint64_t);
Status s = create_cb(ptr, size, &value, &charge);
if (s.ok()) {
secondary_handle.reset(
new TestSecondaryCacheHandle(cache_.get(), handle, value, charge));
} else {
cache_->Release(handle);
}
}
return secondary_handle;
}
void Erase(const Slice& /*key*/) override {}
void WaitAll(std::vector<SecondaryCacheHandle*> /*handles*/) override {}
std::string GetPrintableOptions() const override { return ""; }
uint32_t num_inserts() { return num_inserts_; }
uint32_t num_lookups() { return num_lookups_; }
private:
class TestSecondaryCacheHandle : public SecondaryCacheHandle {
public:
TestSecondaryCacheHandle(Cache* cache, Cache::Handle* handle, void* value,
size_t size)
: cache_(cache), handle_(handle), value_(value), size_(size) {}
~TestSecondaryCacheHandle() override { cache_->Release(handle_); }
bool IsReady() override { return true; }
void Wait() override {}
void* Value() override { return value_; }
size_t Size() override { return size_; }
private:
Cache* cache_;
Cache::Handle* handle_;
void* value_;
size_t size_;
};
std::shared_ptr<Cache> cache_;
uint32_t num_inserts_;
uint32_t num_lookups_;
};
class LRUSecondaryCacheTest : public LRUCacheTest {
public:
LRUSecondaryCacheTest() : fail_create_(false) {}
~LRUSecondaryCacheTest() {}
protected:
class TestItem {
public:
TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) {
memcpy(buf_.get(), buf, size);
}
~TestItem() {}
char* Buf() { return buf_.get(); }
size_t Size() { return size_; }
private:
std::unique_ptr<char[]> buf_;
size_t size_;
};
static size_t SizeCallback(void* obj) {
return reinterpret_cast<TestItem*>(obj)->Size();
}
static Status SaveToCallback(void* obj, size_t offset, size_t size,
void* out) {
TestItem* item = reinterpret_cast<TestItem*>(obj);
char* buf = item->Buf();
EXPECT_EQ(size, item->Size());
EXPECT_EQ(offset, 0);
memcpy(out, buf, size);
return Status::OK();
}
static void DeletionCallback(const Slice& /*key*/, void* obj) {
delete reinterpret_cast<TestItem*>(obj);
}
static Cache::CacheItemHelper helper_;
static Status SaveToCallbackFail(void* /*obj*/, size_t /*offset*/,
size_t /*size*/, void* /*out*/) {
return Status::NotSupported();
}
static Cache::CacheItemHelper helper_fail_;
Cache::CreateCallback test_item_creator =
[&](void* buf, size_t size, void** out_obj, size_t* charge) -> Status {
if (fail_create_) {
return Status::NotSupported();
}
*out_obj = reinterpret_cast<void*>(new TestItem((char*)buf, size));
*charge = size;
return Status::OK();
};
void SetFailCreate(bool fail) { fail_create_ = fail; }
private:
bool fail_create_;
};
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_(
LRUSecondaryCacheTest::SizeCallback, LRUSecondaryCacheTest::SaveToCallback,
LRUSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_fail_(
LRUSecondaryCacheTest::SizeCallback,
LRUSecondaryCacheTest::SaveToCallbackFail,
LRUSecondaryCacheTest::DeletionCallback);
TEST_F(LRUSecondaryCacheTest, BasicTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k2 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should promote k1 and demote k2
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
ASSERT_EQ(secondary_cache->num_lookups(), 1u);
cache.reset();
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, BasicFailTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_NOK(cache->Insert("k1", item1, nullptr, str1.length()));
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
str1.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", nullptr, test_item_creator, Cache::Priority::LOW,
true);
ASSERT_EQ(handle, nullptr);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, false);
ASSERT_EQ(handle, nullptr);
cache.reset();
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, SaveFailTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_fail_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_fail_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 demotion would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 1u);
cache.reset();
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, CreateFailTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
SetFailCreate(true);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 creation would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 1u);
cache.reset();
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, FullCapacityTest) {
LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
// This lookup should fail, since k1 promotion would have failed due to
// the block cache being at capacity
Cache::Handle* handle2;
handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle2, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
cache->Release(handle);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 1u);
cache.reset();
secondary_cache.reset();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -63,11 +63,42 @@ Status ShardedCache::Insert(const Slice& key, void* value, size_t charge,
->Insert(key, hash, value, charge, deleter, handle, priority);
}
Status ShardedCache::Insert(const Slice& key, void* value,
const CacheItemHelper* helper, size_t charge,
Handle** handle, Priority priority) {
uint32_t hash = HashSlice(key);
if (!helper) {
return Status::InvalidArgument();
}
return GetShard(Shard(hash))
->Insert(key, hash, value, helper, charge, handle, priority);
}
Cache::Handle* ShardedCache::Lookup(const Slice& key, Statistics* /*stats*/) {
uint32_t hash = HashSlice(key);
return GetShard(Shard(hash))->Lookup(key, hash);
}
Cache::Handle* ShardedCache::Lookup(const Slice& key,
const CacheItemHelper* helper,
const CreateCallback& create_cb,
Priority priority, bool wait,
Statistics* /*stats*/) {
uint32_t hash = HashSlice(key);
return GetShard(Shard(hash))
->Lookup(key, hash, helper, create_cb, priority, wait);
}
bool ShardedCache::IsReady(Handle* handle) {
uint32_t hash = GetHash(handle);
return GetShard(Shard(hash))->IsReady(handle);
}
void ShardedCache::Wait(Handle* handle) {
uint32_t hash = GetHash(handle);
GetShard(Shard(hash))->Wait(handle);
}
bool ShardedCache::Ref(Handle* handle) {
uint32_t hash = GetHash(handle);
return GetShard(Shard(hash))->Ref(handle);
@ -78,6 +109,11 @@ bool ShardedCache::Release(Handle* handle, bool force_erase) {
return GetShard(Shard(hash))->Release(handle, force_erase);
}
bool ShardedCache::Release(Handle* handle, bool useful, bool force_erase) {
uint32_t hash = GetHash(handle);
return GetShard(Shard(hash))->Release(handle, useful, force_erase);
}
void ShardedCache::Erase(const Slice& key) {
uint32_t hash = HashSlice(key);
GetShard(Shard(hash))->Erase(key, hash);

25
cache/sharded_cache.h vendored
View File

@ -27,9 +27,20 @@ class CacheShard {
virtual Status Insert(const Slice& key, uint32_t hash, void* value,
size_t charge, DeleterFn deleter,
Cache::Handle** handle, Cache::Priority priority) = 0;
virtual Status Insert(const Slice& key, uint32_t hash, void* value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::Handle** handle, Cache::Priority priority) = 0;
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0;
virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* helper,
const Cache::CreateCallback& create_cb,
Cache::Priority priority, bool wait) = 0;
virtual bool Release(Cache::Handle* handle, bool useful,
bool force_erase) = 0;
virtual bool IsReady(Cache::Handle* handle) = 0;
virtual void Wait(Cache::Handle* handle) = 0;
virtual bool Ref(Cache::Handle* handle) = 0;
virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0;
virtual bool Release(Cache::Handle* handle, bool force_erase) = 0;
virtual void Erase(const Slice& key, uint32_t hash) = 0;
virtual void SetCapacity(size_t capacity) = 0;
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
@ -67,6 +78,7 @@ class ShardedCache : public Cache {
virtual const CacheShard* GetShard(uint32_t shard) const = 0;
virtual void* Value(Handle* handle) override = 0;
virtual size_t GetCharge(Handle* handle) const override = 0;
virtual void WaitAll(std::vector<Handle*>& handles) override = 0;
virtual uint32_t GetHash(Handle* handle) const = 0;
virtual void DisownData() override = 0;
@ -77,7 +89,18 @@ class ShardedCache : public Cache {
virtual Status Insert(const Slice& key, void* value, size_t charge,
DeleterFn deleter, Handle** handle,
Priority priority) override;
virtual Status Insert(const Slice& key, void* value,
const CacheItemHelper* helper, size_t chargge,
Handle** handle = nullptr,
Priority priority = Priority::LOW) override;
virtual Handle* Lookup(const Slice& key, Statistics* stats) override;
virtual Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
const CreateCallback& create_cb, Priority priority,
bool wait, Statistics* stats = nullptr) override;
virtual bool Release(Handle* handle, bool useful,
bool force_erase = false) override;
virtual bool IsReady(Handle* handle) override;
virtual void Wait(Handle* handle) override;
virtual bool Ref(Handle* handle) override;
virtual bool Release(Handle* handle, bool force_erase = false) override;
virtual void Erase(const Slice& key) override;

View File

@ -2820,6 +2820,7 @@ class DBBasicTestMultiGet : public DBTestBase {
const char* Name() const override { return "MyBlockCache"; }
using Cache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
@ -2828,6 +2829,7 @@ class DBBasicTestMultiGet : public DBTestBase {
return target_->Insert(key, value, charge, deleter, handle, priority);
}
using Cache::Lookup;
Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
num_lookups_++;
Handle* handle = target_->Lookup(key, stats);

View File

@ -446,6 +446,7 @@ class MockCache : public LRUCache {
false /*strict_capacity_limit*/, 0.0 /*high_pri_pool_ratio*/) {
}
using ShardedCache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), Handle** handle,
Priority priority) override {
@ -533,6 +534,7 @@ class LookupLiarCache : public CacheWrapper {
explicit LookupLiarCache(std::shared_ptr<Cache> target)
: CacheWrapper(std::move(target)) {}
using Cache::Lookup;
Handle* Lookup(const Slice& key, Statistics* stats) override {
if (nth_lookup_not_found_ == 1) {
nth_lookup_not_found_ = 0;

View File

@ -826,6 +826,7 @@ class CacheWrapper : public Cache {
const char* Name() const override { return target_->Name(); }
using Cache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
@ -833,12 +834,14 @@ class CacheWrapper : public Cache {
return target_->Insert(key, value, charge, deleter, handle, priority);
}
using Cache::Lookup;
Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
return target_->Lookup(key, stats);
}
bool Ref(Handle* handle) override { return target_->Ref(handle); }
using Cache::Release;
bool Release(Handle* handle, bool force_erase = false) override {
return target_->Release(handle, force_erase);
}

View File

@ -36,6 +36,7 @@ namespace ROCKSDB_NAMESPACE {
class Cache;
struct ConfigOptions;
class SecondaryCache;
extern const bool kDefaultToAdaptiveMutex;
@ -89,6 +90,9 @@ struct LRUCacheOptions {
CacheMetadataChargePolicy metadata_charge_policy =
kDefaultCacheMetadataChargePolicy;
// A SecondaryCache instance to use a the non-volatile tier
std::shared_ptr<SecondaryCache> secondary_cache;
LRUCacheOptions() {}
LRUCacheOptions(size_t _capacity, int _num_shard_bits,
bool _strict_capacity_limit, double _high_pri_pool_ratio,
@ -143,6 +147,67 @@ class Cache {
// likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW };
// A set of callbacks to allow objects in the primary block cache to be
// be persisted in a secondary cache. The purpose of the secondary cache
// is to support other ways of caching the object, such as persistent or
// compressed data, that may require the object to be parsed and transformed
// in some way. Since the primary cache holds C++ objects and the secondary
// cache may only hold flat data that doesn't need relocation, these
// callbacks need to be provided by the user of the block
// cache to do the conversion.
// The CacheItemHelper is passed to Insert() and Lookup(). It has pointers
// to callback functions for size, saving and deletion of the
// object. The callbacks are defined in C-style in order to make them
// stateless and not add to the cache metadata size.
// Saving multiple std::function objects will take up 32 bytes per
// function, even if its not bound to an object and does no capture.
//
// All the callbacks are C-style function pointers in order to simplify
// lifecycle management. Objects in the cache can outlive the parent DB,
// so anything required for these operations should be contained in the
// object itself.
//
// The SizeCallback takes a void* pointer to the object and returns the size
// of the persistable data. It can be used by the secondary cache to allocate
// memory if needed.
using SizeCallback = size_t (*)(void* obj);
// The SaveToCallback takes a void* object pointer and saves the persistable
// data into a buffer. The secondary cache may decide to not store it in a
// contiguous buffer, in which case this callback will be called multiple
// times with increasing offset
using SaveToCallback = Status (*)(void* obj, size_t offset, size_t size,
void* out);
// A function pointer type for custom destruction of an entry's
// value. The Cache is responsible for copying and reclaiming space
// for the key, but values are managed by the caller.
using DeleterFn = void (*)(const Slice& key, void* value);
// A struct with pointers to helper functions for spilling items from the
// cache into the secondary cache. May be extended in the future. An
// instance of this struct is expected to outlive the cache.
struct CacheItemHelper {
SizeCallback size_cb;
SaveToCallback saveto_cb;
DeleterFn del_cb;
CacheItemHelper() : size_cb(nullptr), saveto_cb(nullptr), del_cb(nullptr) {}
CacheItemHelper(SizeCallback _size_cb, SaveToCallback _saveto_cb,
DeleterFn _del_cb)
: size_cb(_size_cb), saveto_cb(_saveto_cb), del_cb(_del_cb) {}
};
// The CreateCallback is passed by the block cache user to Lookup(). It
// takes in a buffer from the NVM cache and constructs an object using
// it. The callback doesn't have ownership of the buffer and should
// copy the contents into its own buffer.
// typedef std::function<Status(void* buf, size_t size, void** out_obj,
// size_t* charge)>
// CreateCallback;
using CreateCallback = std::function<Status(void* buf, size_t size,
void** out_obj, size_t* charge)>;
Cache(std::shared_ptr<MemoryAllocator> allocator = nullptr)
: memory_allocator_(std::move(allocator)) {}
// No copying allowed
@ -173,16 +238,11 @@ class Cache {
// Opaque handle to an entry stored in the cache.
struct Handle {};
// A function pointer type for custom destruction of an entry's
// value. The Cache is responsible for copying and reclaiming space
// for the key, but values are managed by the caller.
using DeleterFn = void (*)(const Slice& key, void* value);
// The type of the Cache
virtual const char* Name() const = 0;
// Insert a mapping from key->value into the cache and assign it
// the specified charge against the total cache capacity.
// Insert a mapping from key->value into the volatile cache only
// and assign it // the specified charge against the total cache capacity.
// If strict_capacity_limit is true and cache reaches its full capacity,
// return Status::Incomplete.
//
@ -321,6 +381,104 @@ class Cache {
MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); }
// EXPERIMENTAL
// The following APIs are experimental and might change in the future.
// The Insert and Lookup APIs below are intended to allow cached objects
// to be demoted/promoted between the primary block cache and a secondary
// cache. The secondary cache could be a non-volatile cache, and will
// likely store the object in a different representation more suitable
// for on disk storage. They rely on a per object CacheItemHelper to do
// the conversions.
// The secondary cache may persist across process and system restarts,
// and may even be moved between hosts. Therefore, the cache key must
// be repeatable across restarts/reboots, and globally unique if
// multiple DBs share the same cache and the set of DBs can change
// over time.
// Insert a mapping from key->value into the cache and assign it
// the specified charge against the total cache capacity.
// If strict_capacity_limit is true and cache reaches its full capacity,
// return Status::Incomplete.
//
// The helper argument is saved by the cache and will be used when the
// inserted object is evicted or promoted to the secondary cache. It,
// therefore, must outlive the cache.
//
// If handle is not nullptr, returns a handle that corresponds to the
// mapping. The caller must call this->Release(handle) when the returned
// mapping is no longer needed. In case of error caller is responsible to
// cleanup the value (i.e. calling "deleter").
//
// If handle is nullptr, it is as if Release is called immediately after
// insert. In case of error value will be cleanup.
//
// Regardless of whether the item was inserted into the cache,
// it will attempt to insert it into the secondary cache if one is
// configured, and the helper supports it.
// The cache implementation must support a secondary cache, otherwise
// the item is only inserted into the primary cache. It may
// defer the insertion to the secondary cache as it sees fit.
//
// When the inserted entry is no longer needed, the key and
// value will be passed to "deleter".
virtual Status Insert(const Slice& key, void* value,
const CacheItemHelper* helper, size_t charge,
Handle** handle = nullptr,
Priority priority = Priority::LOW) {
if (!helper) {
return Status::InvalidArgument();
}
return Insert(key, value, charge, helper->del_cb, handle, priority);
}
// Lookup the key in the primary and secondary caches (if one is configured).
// The create_cb callback function object will be used to contruct the
// cached object.
// If none of the caches have the mapping for the key, returns nullptr.
// Else, returns a handle that corresponds to the mapping.
//
// This call may promote the object from the secondary cache (if one is
// configured, and has the given key) to the primary cache.
//
// The helper argument should be provided if the caller wants the lookup
// to include the secondary cache (if one is configured) and the object,
// if it exists, to be promoted to the primary cache. The helper may be
// saved and used later when the object is evicted. Therefore, it must
// outlive the cache.
//
// The handle returned may not be ready. The caller should call IsReady()
// to check if the item value is ready, and call Wait() or WaitAll() if
// its not ready. The caller should then call Value() to check if the
// item was successfully retrieved. If unsuccessful (perhaps due to an
// IO error), Value() will return nullptr.
virtual Handle* Lookup(const Slice& key, const CacheItemHelper* /*helper_cb*/,
const CreateCallback& /*create_cb*/,
Priority /*priority*/, bool /*wait*/,
Statistics* stats = nullptr) {
return Lookup(key, stats);
}
// Release a mapping returned by a previous Lookup(). The "useful"
// parameter specifies whether the data was actually used or not,
// which may be used by the cache implementation to decide whether
// to consider it as a hit for retention purposes.
virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) {
return Release(handle, force_erase);
}
// Determines if the handle returned by Lookup() has a valid value yet.
virtual bool IsReady(Handle* /*handle*/) { return true; }
// If the handle returned by Lookup() is not ready yet, wait till it
// becomes ready.
// Note: A ready handle doesn't necessarily mean it has a valid value. The
// user should call Value() and check for nullptr.
virtual void Wait(Handle* /*handle*/) {}
// Wait for a vector of handles to become ready. As with Wait(), the user
// should check the Value() of each handle for nullptr
virtual void WaitAll(std::vector<Handle*>& /*handles*/) {}
private:
std::shared_ptr<MemoryAllocator> memory_allocator_;
};

View File

@ -0,0 +1,77 @@
// Copyright (c) 2021, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <stdint.h>
#include <memory>
#include <string>
#include "rocksdb/cache.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
// A handle for lookup result. The handle may not be immediately ready or
// have a valid value. The caller must call isReady() to determine if its
// ready, and call Wait() in order to block until it becomes ready.
// The caller must call value() after it becomes ready to determine if the
// handle successfullly read the item.
class SecondaryCacheHandle {
public:
virtual ~SecondaryCacheHandle() {}
// Returns whether the handle is ready or not
virtual bool IsReady() = 0;
// Block until handle becomes ready
virtual void Wait() = 0;
// Return the value. If nullptr, it means the lookup was unsuccessful
virtual void* Value() = 0;
// Return the size of value
virtual size_t Size() = 0;
};
// SecondaryCache
//
// Cache interface for caching blocks on a secondary tier (which can include
// non-volatile media, or alternate forms of caching such as compressed data)
class SecondaryCache {
public:
virtual ~SecondaryCache() {}
virtual std::string Name() = 0;
// Insert the given value into this cache. The value is not written
// directly. Rather, the SaveToCallback provided by helper_cb will be
// used to extract the persistable data in value, which will be written
// to this tier. The implementation may or may not write it to cache
// depending on the admission control policy, even if the return status is
// success.
virtual Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) = 0;
// Lookup the data for the given key in this cache. The create_cb
// will be used to create the object. The handle returned may not be
// ready yet, unless wait=true, in which case Lookup() will block until
// the handle is ready
virtual std::unique_ptr<SecondaryCacheHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb, bool wait) = 0;
// At the discretion of the implementation, erase the data associated
// with key
virtual void Erase(const Slice& key) = 0;
// Wait for a collection of handles to become ready
virtual void WaitAll(std::vector<SecondaryCacheHandle*> handles) = 0;
virtual std::string GetPrintableOptions() const = 0;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -167,6 +167,7 @@ class SimCacheImpl : public SimCache {
cache_->SetStrictCapacityLimit(strict_capacity_limit);
}
using Cache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), Handle** handle,
Priority priority) override {
@ -193,6 +194,7 @@ class SimCacheImpl : public SimCache {
return cache_->Insert(key, value, charge, deleter, handle, priority);
}
using Cache::Lookup;
Handle* Lookup(const Slice& key, Statistics* stats) override {
Handle* h = key_only_cache_->Lookup(key);
if (h != nullptr) {
@ -213,6 +215,7 @@ class SimCacheImpl : public SimCache {
bool Ref(Handle* handle) override { return cache_->Ref(handle); }
using Cache::Release;
bool Release(Handle* handle, bool force_erase = false) override {
return cache_->Release(handle, force_erase);
}