diff --git a/cache/clock_cache.cc b/cache/clock_cache.cc
index 7934b378b..e36be4ba7 100644
--- a/cache/clock_cache.cc
+++ b/cache/clock_cache.cc
@@ -271,7 +271,27 @@ class ClockCacheShard final : public CacheShard {
   Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
                 void (*deleter)(const Slice& key, void* value),
                 Cache::Handle** handle, Cache::Priority priority) override;
+  Status Insert(const Slice& key, uint32_t hash, void* value,
+                Cache::CacheItemHelperCallback helper_cb, size_t charge,
+                Cache::Handle** handle, Cache::Priority priority) override {
+    Cache::DeletionCallback delete_cb;
+    (*helper_cb)(nullptr, nullptr, &delete_cb);
+    return Insert(key, hash, value, charge, delete_cb, handle, priority);
+  }
   Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash,
+                        Cache::CacheItemHelperCallback /*helper_cb*/,
+                        const Cache::CreateCallback& /*create_cb*/,
+                        Cache::Priority /*priority*/, bool /*wait*/) override {
+    return Lookup(key, hash);
+  }
+  bool Release(Cache::Handle* handle, bool /*useful*/,
+               bool force_erase) override {
+    return Release(handle, force_erase);
+  }
+  bool isReady(Cache::Handle* /*handle*/) override { return true; }
+  void Wait(Cache::Handle* /*handle*/) override {}
+
   // If the entry is in cache, increase reference count and return true.
   // Return false otherwise.
   //
@@ -748,6 +768,8 @@ class ClockCache final : public ShardedCache {
 
   void DisownData() override { shards_ = nullptr; }
 
+  void WaitAll(std::vector<Handle*>& /*handles*/) override {}
+
  private:
   ClockCacheShard* shards_;
 };
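Note: ClockCacheShard does not use the NVM tier; its new overloads simply recover the deletion callback from the helper and forward to the existing code paths. The snippet below is a stand-alone sketch (not part of the patch; `MyObject` and `MyHelperCallback` are made-up names) of the probing pattern used here and again in `LRUHandle::Free()` later in the patch: a `CacheItemHelperCallback` fills in only the out-parameters that are non-null, so a caller can ask for just the pieces it needs.

```cpp
#include <cstring>
#include <string>

#include "rocksdb/cache.h"

using ROCKSDB_NAMESPACE::Cache;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::Status;

// Hypothetical cached object type, for illustration only.
struct MyObject {
  std::string payload;
};

// A caller-provided CacheItemHelperCallback: fill only the requested slots.
void MyHelperCallback(Cache::SizeCallback* size_cb,
                      Cache::SaveToCallback* saveto_cb,
                      Cache::DeletionCallback* del_cb) {
  if (size_cb) {
    *size_cb = [](void* obj) -> size_t {
      return static_cast<MyObject*>(obj)->payload.size();
    };
  }
  if (saveto_cb) {
    *saveto_cb = [](void* obj, size_t offset, size_t size,
                    void* out) -> Status {
      memcpy(out, static_cast<MyObject*>(obj)->payload.data() + offset, size);
      return Status::OK();
    };
  }
  if (del_cb) {
    *del_cb = [](const Slice& /*key*/, void* obj) {
      delete static_cast<MyObject*>(obj);
    };
  }
}

// A cache that only needs the deleter (as ClockCacheShard::Insert above does)
// passes nullptr for the slots it does not care about.
void DeleteThroughHelper(const Slice& key, MyObject* obj) {
  Cache::DeletionCallback del_cb = nullptr;
  MyHelperCallback(nullptr, nullptr, &del_cb);
  (*del_cb)(key, obj);
}
```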
diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc
index f42190121..139e18105 100644
--- a/cache/lru_cache.cc
+++ b/cache/lru_cache.cc
@@ -97,7 +97,8 @@ void LRUHandleTable::Resize() {
 
 LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit,
                              double high_pri_pool_ratio, bool use_adaptive_mutex,
-                             CacheMetadataChargePolicy metadata_charge_policy)
+                             CacheMetadataChargePolicy metadata_charge_policy,
+                             const std::shared_ptr<NvmCache>& nvm_cache)
     : capacity_(0),
       high_pri_pool_usage_(0),
       strict_capacity_limit_(strict_capacity_limit),
@@ -105,7 +106,8 @@ LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit,
       high_pri_pool_capacity_(0),
       usage_(0),
       lru_usage_(0),
-      mutex_(use_adaptive_mutex) {
+      mutex_(use_adaptive_mutex),
+      nvm_cache_(nvm_cache) {
   set_metadata_charge_policy(metadata_charge_policy);
   // Make empty circular linked list
   lru_.next = &lru_;
@@ -256,8 +258,13 @@ void LRUCacheShard::SetCapacity(size_t capacity) {
     EvictFromLRU(0, &last_reference_list);
   }
 
+  // Try to insert the evicted entries into NVM cache
   // Free the entries outside of mutex for performance reasons
   for (auto entry : last_reference_list) {
+    if (nvm_cache_ && entry->IsNvmCompatible() && !entry->IsPromoted()) {
+      nvm_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb)
+          .PermitUncheckedError();
+    }
     entry->Free();
   }
 }
@@ -267,17 +274,126 @@ void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
   strict_capacity_limit_ = strict_capacity_limit;
 }
 
-Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) {
-  MutexLock l(&mutex_);
-  LRUHandle* e = table_.Lookup(key, hash);
-  if (e != nullptr) {
-    assert(e->InCache());
-    if (!e->HasRefs()) {
-      // The entry is in LRU since it's in hash and has no external references
-      LRU_Remove(e);
+Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
+  Status s = Status::OK();
+  autovector<LRUHandle*> last_reference_list;
+  size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
+
+  {
+    MutexLock l(&mutex_);
+
+    // Free the space following strict LRU policy until enough space
+    // is freed or the lru list is empty
+    EvictFromLRU(total_charge, &last_reference_list);
+
+    if ((usage_ + total_charge) > capacity_ &&
+        (strict_capacity_limit_ || handle == nullptr)) {
+      if (handle == nullptr) {
+        // Don't insert the entry but still return ok, as if the entry inserted
+        // into cache and get evicted immediately.
+        e->SetInCache(false);
+        last_reference_list.push_back(e);
+      } else {
+        delete[] reinterpret_cast<char*>(e);
+        *handle = nullptr;
+        s = Status::Incomplete("Insert failed due to LRU cache being full.");
+      }
+    } else {
+      // Insert into the cache. Note that the cache might get larger than its
+      // capacity if not enough space was freed up.
+      LRUHandle* old = table_.Insert(e);
+      usage_ += total_charge;
+      if (old != nullptr) {
+        s = Status::OkOverwritten();
+        assert(old->InCache());
+        old->SetInCache(false);
+        if (!old->HasRefs()) {
+          // old is on LRU because it's in cache and its reference count is 0
+          LRU_Remove(old);
+          size_t old_total_charge =
+              old->CalcTotalCharge(metadata_charge_policy_);
+          assert(usage_ >= old_total_charge);
+          usage_ -= old_total_charge;
+          last_reference_list.push_back(old);
+        }
+      }
+      if (handle == nullptr) {
+        LRU_Insert(e);
+      } else {
+        e->Ref();
+        *handle = reinterpret_cast<Cache::Handle*>(e);
+      }
+    }
+  }
+
+  // Try to insert the evicted entries into NVM cache
+  // Free the entries here outside of mutex for performance reasons
+  for (auto entry : last_reference_list) {
+    if (nvm_cache_ && entry->IsNvmCompatible() && !entry->IsPromoted()) {
+      nvm_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb)
+          .PermitUncheckedError();
+    }
+    entry->Free();
+  }
+
+  return s;
+}
+
+Cache::Handle* LRUCacheShard::Lookup(
+    const Slice& key, uint32_t hash,
+    ShardedCache::CacheItemHelperCallback helper_cb,
+    const ShardedCache::CreateCallback& create_cb, Cache::Priority priority,
+    bool wait) {
+  LRUHandle* e = nullptr;
+  {
+    MutexLock l(&mutex_);
+    e = table_.Lookup(key, hash);
+    if (e != nullptr) {
+      assert(e->InCache());
+      if (!e->HasRefs()) {
+        // The entry is in LRU since it's in hash and has no external references
+        LRU_Remove(e);
+      }
+      e->Ref();
+      e->SetHit();
+    }
+  }
+
+  // If handle table lookup failed, then allocate a handle outside the
+  // mutex if we're going to lookup in the NVM cache
+  // Only support synchronous for now
+  // TODO: Support asynchronous lookup in NVM cache
+  if (!e && nvm_cache_ && helper_cb && wait) {
+    assert(create_cb);
+    std::unique_ptr<NvmCacheHandle> nvm_handle =
+        nvm_cache_->Lookup(key, create_cb, wait);
+    if (nvm_handle != nullptr) {
+      e = reinterpret_cast<LRUHandle*>(
+          new char[sizeof(LRUHandle) - 1 + key.size()]);
+
+      e->flags = 0;
+      e->SetPromoted(true);
+      e->SetNvmCompatible(true);
+      e->info_.helper_cb = helper_cb;
+      e->charge = nvm_handle->Size();
+      e->key_length = key.size();
+      e->hash = hash;
+      e->refs = 0;
+      e->next = e->prev = nullptr;
+      e->SetInCache(true);
+      e->SetPriority(priority);
+      memcpy(e->key_data, key.data(), key.size());
+
+      e->value = nvm_handle->Value();
+      e->charge = nvm_handle->Size();
+
+      // This call could nullify e if the cache is over capacity and
+      // strict_capacity_limit_ is true. In such a case, the caller will try
+      // to insert later, which might again fail, but it's OK as this should
+      // not be common
+      InsertItem(e, reinterpret_cast<Cache::Handle**>(&e))
+          .PermitUncheckedError();
     }
-    e->Ref();
-    e->SetHit();
   }
   return reinterpret_cast<Cache::Handle*>(e);
 }
@@ -338,81 +454,32 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) {
 Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
                              size_t charge,
                              void (*deleter)(const Slice& key, void* value),
+                             Cache::CacheItemHelperCallback helper_cb,
                              Cache::Handle** handle, Cache::Priority priority) {
   // Allocate the memory here outside of the mutex
   // If the cache is full, we'll have to release it
   // It shouldn't happen very often though.
   LRUHandle* e = reinterpret_cast<LRUHandle*>(
       new char[sizeof(LRUHandle) - 1 + key.size()]);
-  Status s = Status::OK();
-  autovector<LRUHandle*> last_reference_list;
 
   e->value = value;
-  e->deleter = deleter;
+  e->flags = 0;
+  if (helper_cb) {
+    e->SetNvmCompatible(true);
+    e->info_.helper_cb = helper_cb;
+  } else {
+    e->info_.deleter = deleter;
+  }
   e->charge = charge;
   e->key_length = key.size();
-  e->flags = 0;
   e->hash = hash;
   e->refs = 0;
   e->next = e->prev = nullptr;
   e->SetInCache(true);
   e->SetPriority(priority);
   memcpy(e->key_data, key.data(), key.size());
-  size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
 
-  {
-    MutexLock l(&mutex_);
-
-    // Free the space following strict LRU policy until enough space
-    // is freed or the lru list is empty
-    EvictFromLRU(total_charge, &last_reference_list);
-
-    if ((usage_ + total_charge) > capacity_ &&
-        (strict_capacity_limit_ || handle == nullptr)) {
-      if (handle == nullptr) {
-        // Don't insert the entry but still return ok, as if the entry inserted
-        // into cache and get evicted immediately.
-        e->SetInCache(false);
-        last_reference_list.push_back(e);
-      } else {
-        delete[] reinterpret_cast<char*>(e);
-        *handle = nullptr;
-        s = Status::Incomplete("Insert failed due to LRU cache being full.");
-      }
-    } else {
-      // Insert into the cache. Note that the cache might get larger than its
-      // capacity if not enough space was freed up.
-      LRUHandle* old = table_.Insert(e);
-      usage_ += total_charge;
-      if (old != nullptr) {
-        s = Status::OkOverwritten();
-        assert(old->InCache());
-        old->SetInCache(false);
-        if (!old->HasRefs()) {
-          // old is on LRU because it's in cache and its reference count is 0
-          LRU_Remove(old);
-          size_t old_total_charge =
-              old->CalcTotalCharge(metadata_charge_policy_);
-          assert(usage_ >= old_total_charge);
-          usage_ -= old_total_charge;
-          last_reference_list.push_back(old);
-        }
-      }
-      if (handle == nullptr) {
-        LRU_Insert(e);
-      } else {
-        e->Ref();
-        *handle = reinterpret_cast<Cache::Handle*>(e);
-      }
-    }
-  }
-
-  // Free the entries here outside of mutex for performance reasons
-  for (auto entry : last_reference_list) {
-    entry->Free();
-  }
-
-  return s;
+  return InsertItem(e, handle);
 }
 
 void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
@@ -468,7 +535,8 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
                    bool strict_capacity_limit, double high_pri_pool_ratio,
                    std::shared_ptr<MemoryAllocator> allocator,
                    bool use_adaptive_mutex,
-                   CacheMetadataChargePolicy metadata_charge_policy)
+                   CacheMetadataChargePolicy metadata_charge_policy,
+                   const std::shared_ptr<NvmCache>& nvm_cache)
     : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
                    std::move(allocator)) {
   num_shards_ = 1 << num_shard_bits;
@@ -478,7 +546,7 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
   for (int i = 0; i < num_shards_; i++) {
     new (&shards_[i])
         LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio,
-                      use_adaptive_mutex, metadata_charge_policy);
+                      use_adaptive_mutex, metadata_charge_policy, nvm_cache);
   }
 }
@@ -543,19 +611,12 @@ double LRUCache::GetHighPriPoolRatio() {
   return result;
 }
 
-std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
-  return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
-                     cache_opts.strict_capacity_limit,
-                     cache_opts.high_pri_pool_ratio,
-                     cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
-                     cache_opts.metadata_charge_policy);
-}
-
 std::shared_ptr<Cache> NewLRUCache(
     size_t capacity, int num_shard_bits, bool strict_capacity_limit,
     double high_pri_pool_ratio,
    std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
-    CacheMetadataChargePolicy metadata_charge_policy) {
+    CacheMetadataChargePolicy metadata_charge_policy,
+    const std::shared_ptr<NvmCache>& nvm_cache) {
   if (num_shard_bits >= 20) {
     return nullptr;  // the cache cannot be sharded into too many fine pieces
   }
@@ -568,7 +629,25 @@ std::shared_ptr<Cache> NewLRUCache(
   }
   return std::make_shared<LRUCache>(
       capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
-      std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy);
+      std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy,
+      nvm_cache);
 }
 
+std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
+  return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
+                     cache_opts.strict_capacity_limit,
+                     cache_opts.high_pri_pool_ratio,
+                     cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
+                     cache_opts.metadata_charge_policy, cache_opts.nvm_cache);
+}
+
+std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+    double high_pri_pool_ratio,
+    std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
+    CacheMetadataChargePolicy metadata_charge_policy) {
+  return NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
+                     high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
+                     metadata_charge_policy, nullptr);
+}
 }  // namespace ROCKSDB_NAMESPACE
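With the changes above, the NVM tier is attached purely at construction time: `NewLRUCache()` forwards `LRUCacheOptions::nvm_cache` down to every `LRUCacheShard`, which then demotes evicted NVM-compatible entries and promotes NVM hits through `InsertItem()`. A minimal construction sketch, assuming some concrete `NvmCache` implementation is available (for example the `TestNvmCache` defined in the unit test further down):

```cpp
#include <memory>

#include "rocksdb/cache.h"
#include "rocksdb/nvm_cache.h"

using namespace ROCKSDB_NAMESPACE;

std::shared_ptr<Cache> MakeTieredBlockCache(std::shared_ptr<NvmCache> nvm) {
  LRUCacheOptions opts(128 << 20 /*capacity*/, 6 /*num_shard_bits*/,
                       false /*strict_capacity_limit*/,
                       0.5 /*high_pri_pool_ratio*/,
                       nullptr /*memory_allocator*/, kDefaultToAdaptiveMutex,
                       kDontChargeCacheMetadata);
  opts.nvm_cache = std::move(nvm);  // new field added by this patch
  return NewLRUCache(opts);         // forwarded down to each LRUCacheShard
}
```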
diff --git a/cache/lru_cache.h b/cache/lru_cache.h
index cee3f148b..85f5ed003 100644
--- a/cache/lru_cache.h
+++ b/cache/lru_cache.h
@@ -11,9 +11,9 @@
 #include <string>
 
 #include "cache/sharded_cache.h"
-
 #include "port/malloc.h"
 #include "port/port.h"
+#include "rocksdb/nvm_cache.h"
 #include "util/autovector.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -49,7 +49,14 @@ namespace ROCKSDB_NAMESPACE {
 
 struct LRUHandle {
   void* value;
-  void (*deleter)(const Slice&, void* value);
+  union Info {
+    Info() {}
+    ~Info() {}
+    void (*deleter)(const Slice&, void* value);
+    ShardedCache::CacheItemHelperCallback helper_cb;
+    // This needs to be explicitly constructed and destructed
+    std::unique_ptr<NvmCacheHandle> nvm_handle;
+  } info_;
   LRUHandle* next_hash;
   LRUHandle* next;
   LRUHandle* prev;
@@ -69,6 +76,12 @@ struct LRUHandle {
     IN_HIGH_PRI_POOL = (1 << 2),
     // Whether this entry has had any lookups (hits).
     HAS_HIT = (1 << 3),
+    // Can this be inserted into the NVM cache
+    IS_NVM_COMPATIBLE = (1 << 4),
+    // Is the handle still being read from NVM
+    IS_INCOMPLETE = (1 << 5),
+    // Has the item been promoted from NVM
+    IS_PROMOTED = (1 << 6),
   };
 
   uint8_t flags;
@@ -95,6 +108,9 @@ struct LRUHandle {
   bool IsHighPri() const { return flags & IS_HIGH_PRI; }
   bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; }
   bool HasHit() const { return flags & HAS_HIT; }
+  bool IsNvmCompatible() const { return flags & IS_NVM_COMPATIBLE; }
+  bool IsIncomplete() const { return flags & IS_INCOMPLETE; }
+  bool IsPromoted() const { return flags & IS_PROMOTED; }
 
   void SetInCache(bool in_cache) {
     if (in_cache) {
@@ -122,10 +138,38 @@ struct LRUHandle {
 
   void SetHit() { flags |= HAS_HIT; }
 
+  void SetNvmCompatible(bool nvm) {
+    if (nvm) {
+      flags |= IS_NVM_COMPATIBLE;
+    } else {
+      flags &= ~IS_NVM_COMPATIBLE;
+    }
+  }
+
+  void SetIncomplete(bool incomp) {
+    if (incomp) {
+      flags |= IS_INCOMPLETE;
+    } else {
+      flags &= ~IS_INCOMPLETE;
+    }
+  }
+
+  void SetPromoted(bool promoted) {
+    if (promoted) {
+      flags |= IS_PROMOTED;
+    } else {
+      flags &= ~IS_PROMOTED;
+    }
+  }
+
   void Free() {
     assert(refs == 0);
-    if (deleter) {
-      (*deleter)(key(), value);
+    if (!IsNvmCompatible() && info_.deleter) {
+      (*info_.deleter)(key(), value);
+    } else if (IsNvmCompatible()) {
+      ShardedCache::DeletionCallback del_cb;
+      (*info_.helper_cb)(nullptr, nullptr, &del_cb);
+      (*del_cb)(key(), value);
     }
     delete[] reinterpret_cast<char*>(this);
   }
@@ -193,7 +237,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
  public:
   LRUCacheShard(size_t capacity, bool strict_capacity_limit,
                 double high_pri_pool_ratio, bool use_adaptive_mutex,
-                CacheMetadataChargePolicy metadata_charge_policy);
+                CacheMetadataChargePolicy metadata_charge_policy,
+                const std::shared_ptr<NvmCache>& nvm_cache);
   virtual ~LRUCacheShard() override = default;
 
   // Separate from constructor so caller can easily make an array of LRUCache
@@ -212,8 +257,32 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
                         size_t charge,
                         void (*deleter)(const Slice& key, void* value),
                         Cache::Handle** handle,
-                        Cache::Priority priority) override;
-  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
+                        Cache::Priority priority) override {
+    return Insert(key, hash, value, charge, deleter, nullptr, handle, priority);
+  }
+  virtual Status Insert(const Slice& key, uint32_t hash, void* value,
+                        Cache::CacheItemHelperCallback helper_cb, size_t charge,
+                        Cache::Handle** handle,
+                        Cache::Priority priority) override {
+    return Insert(key, hash, value, charge, nullptr, helper_cb, handle,
+                  priority);
+  }
+  // If helper_cb is null, the values of the following arguments don't
+  // matter
+  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash,
+                                ShardedCache::CacheItemHelperCallback helper_cb,
+                                const ShardedCache::CreateCallback& create_cb,
+                                ShardedCache::Priority priority,
+                                bool wait) override;
+  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override {
+    return Lookup(key, hash, nullptr, nullptr, Cache::Priority::LOW, true);
+  }
+  virtual bool Release(Cache::Handle* handle, bool /*useful*/,
+                       bool force_erase) override {
+    return Release(handle, force_erase);
+  }
+  virtual bool isReady(Cache::Handle* /*handle*/) override { return true; }
+  virtual void Wait(Cache::Handle* /*handle*/) override {}
   virtual bool Ref(Cache::Handle* handle) override;
   virtual bool Release(Cache::Handle* handle,
                        bool force_erase = false) override;
@@ -243,6 +312,11 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
   double GetHighPriPoolRatio();
 
  private:
+  Status InsertItem(LRUHandle* item, Cache::Handle** handle);
+  Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
+                void (*deleter)(const Slice& key, void* value),
+                Cache::CacheItemHelperCallback helper_cb,
+                Cache::Handle** handle, Cache::Priority priority);
   void LRU_Remove(LRUHandle* e);
   void LRU_Insert(LRUHandle* e);
@@ -303,6 +377,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
   // We don't count mutex_ as the cache's internal state so semantically we
   // don't mind mutex_ invoking the non-const actions.
   mutable port::Mutex mutex_;
+
+  std::shared_ptr<NvmCache> nvm_cache_;
 };
 
 class LRUCache
@@ -316,7 +392,8 @@ class LRUCache
            std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
            bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
            CacheMetadataChargePolicy metadata_charge_policy =
-               kDontChargeCacheMetadata);
+               kDontChargeCacheMetadata,
+           const std::shared_ptr<NvmCache>& nvm_cache = nullptr);
   virtual ~LRUCache();
   virtual const char* Name() const override { return "LRUCache"; }
   virtual CacheShard* GetShard(int shard) override;
@@ -325,6 +402,7 @@ class LRUCache
   virtual size_t GetCharge(Handle* handle) const override;
   virtual uint32_t GetHash(Handle* handle) const override;
   virtual void DisownData() override;
+  virtual void WaitAll(std::vector<Handle*>& /*handles*/) override {}
 
   // Retrieves number of elements in LRU, for unit test purpose only
   size_t TEST_GetLRUSize();
diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc
index b30227e7b..3a6ce96a4 100644
--- a/cache/lru_cache_test.cc
+++ b/cache/lru_cache_test.cc
@@ -7,8 +7,12 @@
 
 #include <string>
 #include <vector>
+
 #include "port/port.h"
+#include "rocksdb/cache.h"
 #include "test_util/testharness.h"
+#include "util/coding.h"
+#include "util/random.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -32,7 +36,7 @@ class LRUCacheTest : public testing::Test {
         port::cacheline_aligned_alloc(sizeof(LRUCacheShard)));
     new (cache_) LRUCacheShard(capacity, false /*strict_capacity_limit*/,
                                high_pri_pool_ratio, use_adaptive_mutex,
-                               kDontChargeCacheMetadata);
+                               kDontChargeCacheMetadata, nullptr /*nvm_cache*/);
   }
 
   void Insert(const std::string& key,
@@ -191,6 +195,178 @@ TEST_F(LRUCacheTest, EntriesWithPriority) {
   ValidateLRUList({"e", "f", "g", "Z", "d"}, 2);
 }
 
+class TestNvmCache : public NvmCache {
+ public:
+  TestNvmCache(size_t capacity) : num_inserts_(0), num_lookups_(0) {
+    cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr,
+                         kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
+  }
+  ~TestNvmCache() { cache_.reset(); }
+
+  std::string Name() override { return "TestNvmCache"; }
+
+  Status Insert(const Slice& key, void* value,
+                Cache::CacheItemHelperCallback helper_cb) override {
+    Cache::SizeCallback size_cb;
+    Cache::SaveToCallback save_cb;
+    size_t size;
+    char* buf;
+    Status s;
+
+    num_inserts_++;
+    (*helper_cb)(&size_cb, &save_cb, nullptr);
+    size = (*size_cb)(value);
+    buf = new char[size + sizeof(uint64_t)];
+    EncodeFixed64(buf, size);
+    s = (*save_cb)(value, 0, size, buf + sizeof(uint64_t));
+    EXPECT_OK(s);
+    return cache_->Insert(key, buf, size,
+                          [](const Slice& /*key*/, void* value) -> void {
+                            delete[] reinterpret_cast<char*>(value);
+                          });
+  }
+
+  std::unique_ptr<NvmCacheHandle> Lookup(const Slice& key,
+                                         const Cache::CreateCallback& create_cb,
+                                         bool /*wait*/) override {
+    std::unique_ptr<NvmCacheHandle> nvm_handle;
+    Cache::Handle* handle = cache_->Lookup(key);
+    num_lookups_++;
+    if (handle) {
+      void* value;
+      size_t charge;
+      char* ptr = (char*)cache_->Value(handle);
+      size_t size = DecodeFixed64(ptr);
+      ptr += sizeof(uint64_t);
+      Status s = create_cb(ptr, size, &value, &charge);
+      EXPECT_OK(s);
+      nvm_handle.reset(
+          new TestNvmCacheHandle(cache_.get(), handle, value, charge));
+    }
+    return nvm_handle;
+  }
+
+  void Erase(const Slice& /*key*/) override {}
+
+  void WaitAll(std::vector<NvmCacheHandle*> /*handles*/) override {}
+
+  std::string GetPrintableOptions() const override { return ""; }
+
+  uint32_t num_inserts() { return num_inserts_; }
+
+  uint32_t num_lookups() { return num_lookups_; }
+
+ private:
+  class TestNvmCacheHandle : public NvmCacheHandle {
+   public:
+    TestNvmCacheHandle(Cache* cache, Cache::Handle* handle, void* value,
+                       size_t size)
+        : cache_(cache), handle_(handle), value_(value), size_(size) {}
+    ~TestNvmCacheHandle() {
+      delete[] reinterpret_cast<char*>(cache_->Value(handle_));
+      cache_->Release(handle_);
+    }
+
+    bool isReady() override { return true; }
+
+    void Wait() override {}
+
+    void* Value() override { return value_; }
+
+    size_t Size() override { return size_; }
+
+   private:
+    Cache* cache_;
+    Cache::Handle* handle_;
+    void* value_;
+    size_t size_;
+  };
+
+  std::shared_ptr<Cache> cache_;
+  uint32_t num_inserts_;
+  uint32_t num_lookups_;
+};
+
+TEST_F(LRUCacheTest, TestNvmCache) {
+  LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
+                       kDontChargeCacheMetadata);
+  std::shared_ptr<TestNvmCache> nvm_cache(new TestNvmCache(2048));
+  opts.nvm_cache = nvm_cache;
+  std::shared_ptr<Cache> cache = NewLRUCache(opts);
+
+  class TestItem {
+   public:
+    TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) {
+      memcpy(buf_.get(), buf, size);
+    }
+    ~TestItem() {}
+
+    char* Buf() { return buf_.get(); }
+    size_t Size() { return size_; }
+
+   private:
+    std::unique_ptr<char[]> buf_;
+    size_t size_;
+  };
+
+  Cache::CacheItemHelperCallback helper_cb =
+      [](Cache::SizeCallback* size_cb, Cache::SaveToCallback* saveto_cb,
+         Cache::DeletionCallback* del_cb) -> void {
+    if (size_cb) {
+      *size_cb = [](void* obj) -> size_t {
+        return reinterpret_cast<TestItem*>(obj)->Size();
+      };
+    }
+    if (saveto_cb) {
+      *saveto_cb = [](void* obj, size_t offset, size_t size,
+                      void* out) -> Status {
+        TestItem* item = reinterpret_cast<TestItem*>(obj);
+        char* buf = item->Buf();
+        EXPECT_EQ(size, item->Size());
+        EXPECT_EQ(offset, 0);
+        memcpy(out, buf, size);
+        return Status::OK();
+      };
+    }
+    if (del_cb) {
+      *del_cb = [](const Slice& /*key*/, void* obj) -> void {
+        delete reinterpret_cast<TestItem*>(obj);
+      };
+    }
+  };
+
+  int create_count = 0;
+  Cache::CreateCallback test_item_creator =
+      [&create_count](void* buf, size_t size, void** out_obj,
+                      size_t* charge) -> Status {
+    create_count++;
+    *out_obj = reinterpret_cast<void*>(new TestItem((char*)buf, size));
+    *charge = size;
+    return Status::OK();
+  };
+
+  Random rnd(301);
+  std::string str1 = rnd.RandomString(1020);
+  TestItem* item1 = new TestItem(str1.data(), str1.length());
+  cache->Insert("k1", item1, helper_cb, str1.length());
+  std::string str2 = rnd.RandomString(1020);
+  TestItem* item2 = new TestItem(str2.data(), str2.length());
+  // k2 should be demoted to NVM
+  cache->Insert("k2", item2, helper_cb, str2.length());
+
+  Cache::Handle* handle;
+  handle = cache->Lookup("k2", helper_cb, test_item_creator,
+                         Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  // This lookup should promote k1 and demote k2
+  handle = cache->Lookup("k1", helper_cb, test_item_creator,
+                         Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  ASSERT_EQ(nvm_cache->num_inserts(), 2);
+  ASSERT_EQ(nvm_cache->num_lookups(), 1);
+}
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc
index 6c915df8c..80044afe2 100644
--- a/cache/sharded_cache.cc
+++ b/cache/sharded_cache.cc
@@ -51,11 +51,39 @@ Status ShardedCache::Insert(const Slice& key, void* value, size_t charge,
       ->Insert(key, hash, value, charge, deleter, handle, priority);
 }
 
+Status ShardedCache::Insert(const Slice& key, void* value,
+                            CacheItemHelperCallback helper_cb, size_t charge,
+                            Handle** handle, Priority priority) {
+  uint32_t hash = HashSlice(key);
+  return GetShard(Shard(hash))
+      ->Insert(key, hash, value, helper_cb, charge, handle, priority);
+}
+
 Cache::Handle* ShardedCache::Lookup(const Slice& key, Statistics* /*stats*/) {
   uint32_t hash = HashSlice(key);
   return GetShard(Shard(hash))->Lookup(key, hash);
 }
 
+Cache::Handle* ShardedCache::Lookup(const Slice& key,
+                                    CacheItemHelperCallback helper_cb,
+                                    const CreateCallback& create_cb,
+                                    Priority priority, bool wait,
+                                    Statistics* /*stats*/) {
+  uint32_t hash = HashSlice(key);
+  return GetShard(Shard(hash))
+      ->Lookup(key, hash, helper_cb, create_cb, priority, wait);
+}
+
+bool ShardedCache::isReady(Handle* handle) {
+  uint32_t hash = GetHash(handle);
+  return GetShard(Shard(hash))->isReady(handle);
+}
+
+void ShardedCache::Wait(Handle* handle) {
+  uint32_t hash = GetHash(handle);
+  GetShard(Shard(hash))->Wait(handle);
+}
+
 bool ShardedCache::Ref(Handle* handle) {
   uint32_t hash = GetHash(handle);
   return GetShard(Shard(hash))->Ref(handle);
@@ -66,6 +94,11 @@ bool ShardedCache::Release(Handle* handle, bool force_erase) {
   return GetShard(Shard(hash))->Release(handle, force_erase);
 }
 
+bool ShardedCache::Release(Handle* handle, bool useful, bool force_erase) {
+  uint32_t hash = GetHash(handle);
+  return GetShard(Shard(hash))->Release(handle, useful, force_erase);
+}
+
 void ShardedCache::Erase(const Slice& key) {
   uint32_t hash = HashSlice(key);
   GetShard(Shard(hash))->Erase(key, hash);
diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h
index ce9e459dc..54779d8e3 100644
--- a/cache/sharded_cache.h
+++ b/cache/sharded_cache.h
@@ -28,9 +28,20 @@ class CacheShard {
                         size_t charge,
                         void (*deleter)(const Slice& key, void* value),
                         Cache::Handle** handle, Cache::Priority priority) = 0;
+  virtual Status Insert(const Slice& key, uint32_t hash, void* value,
+                        Cache::CacheItemHelperCallback helper_cb, size_t charge,
+                        Cache::Handle** handle, Cache::Priority priority) = 0;
   virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0;
+  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash,
+                                Cache::CacheItemHelperCallback helper_cb,
+                                const Cache::CreateCallback& create_cb,
+                                Cache::Priority priority, bool wait) = 0;
+  virtual bool Release(Cache::Handle* handle, bool useful,
+                       bool force_erase) = 0;
+  virtual bool isReady(Cache::Handle* handle) = 0;
+  virtual void Wait(Cache::Handle* handle) = 0;
   virtual bool Ref(Cache::Handle* handle) = 0;
-  virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0;
+  virtual bool Release(Cache::Handle* handle, bool force_erase) = 0;
   virtual void Erase(const Slice& key, uint32_t hash) = 0;
   virtual void SetCapacity(size_t capacity) = 0;
   virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
@@ -62,6 +73,7 @@ class ShardedCache : public Cache {
   virtual const CacheShard* GetShard(int shard) const = 0;
   virtual void* Value(Handle* handle) override = 0;
   virtual size_t GetCharge(Handle* handle) const override = 0;
+  virtual void WaitAll(std::vector<Handle*>& handles) override = 0;
   virtual uint32_t GetHash(Handle* handle) const = 0;
   virtual void DisownData() override = 0;
 
@@ -72,7 +84,18 @@ class ShardedCache : public Cache {
   virtual Status Insert(const Slice& key, void* value, size_t charge,
                         void (*deleter)(const Slice& key, void* value),
                         Handle** handle, Priority priority) override;
+  virtual Status Insert(const Slice& key, void* value,
+                        CacheItemHelperCallback helper_cb, size_t charge,
+                        Handle** handle = nullptr,
+                        Priority priority = Priority::LOW) override;
   virtual Handle* Lookup(const Slice& key, Statistics* stats) override;
+  virtual Handle* Lookup(const Slice& key, CacheItemHelperCallback helper_cb,
+                         const CreateCallback& create_cb, Priority priority,
+                         bool wait, Statistics* stats = nullptr) override;
+  virtual bool Release(Handle* handle, bool useful,
+                       bool force_erase = false) override;
+  virtual bool isReady(Handle* handle) override;
+  virtual void Wait(Handle* handle) override;
   virtual bool Ref(Handle* handle) override;
   virtual bool Release(Handle* handle, bool force_erase = false) override;
   virtual void Erase(const Slice& key) override;
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index 2f06520c8..3e66bd3fc 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -23,8 +23,11 @@
 #pragma once
 
 #include <stdint.h>
+
+#include <functional>
 #include <memory>
 #include <string>
+
 #include "rocksdb/memory_allocator.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/statistics.h"
@@ -34,6 +37,7 @@ namespace ROCKSDB_NAMESPACE {
 
 class Cache;
 struct ConfigOptions;
+class NvmCache;
 
 extern const bool kDefaultToAdaptiveMutex;
 
@@ -87,6 +91,9 @@ struct LRUCacheOptions {
   CacheMetadataChargePolicy metadata_charge_policy =
       kDefaultCacheMetadataChargePolicy;
 
+  // An NvmCache instance to use as the non-volatile tier
+  std::shared_ptr<NvmCache> nvm_cache;
+
   LRUCacheOptions() {}
   LRUCacheOptions(size_t _capacity, int _num_shard_bits,
                   bool _strict_capacity_limit, double _high_pri_pool_ratio,
@@ -137,6 +144,57 @@ class Cache {
   // likely to get evicted than low priority entries.
   enum class Priority { HIGH, LOW };
 
+  // A set of callbacks to allow objects in the volatile block cache to be
+  // persisted in an NVM cache tier. Since the volatile cache holds C++
+  // objects and the NVM cache may only hold flat data that doesn't need
+  // relocation, these callbacks need to be provided by the user of the block
+  // cache to do the conversion.
+  // The CacheItemHelperCallback is passed to Insert(). When invoked, it
+  // returns the callback functions for size, saving and deletion of the
+  // object. We do it this way so that the cache implementation only needs to
+  // save one function pointer in its metadata per object: the
+  // CacheItemHelperCallback pointer, which is a C-style function pointer.
+  // Saving multiple std::function objects would take up 32 bytes per
+  // function, even if it's not bound to an object and does no capture. The
+  // other alternative is to take a pointer to another object that implements
+  // this interface, but that would create issues with managing the object
+  // lifecycle.
+  //
+  // All the callbacks are C-style function pointers in order to simplify
+  // lifecycle management. Objects in the cache can outlive the parent DB,
+  // so anything required for these operations should be contained in the
+  // object itself.
+  //
+  // The SizeCallback takes a void* pointer to the object and returns the size
+  // of the persistable data. It can be used by the NVM cache to allocate
+  // memory if needed.
+  typedef size_t (*SizeCallback)(void* obj);
+
+  // The SaveToCallback takes a void* object pointer and saves the persistable
+  // data into a buffer. The NVM cache may decide to not store it in a
+  // contiguous buffer, in which case this callback will be called multiple
+  // times with increasing offset.
+  typedef rocksdb::Status (*SaveToCallback)(void* obj, size_t offset,
+                                            size_t size, void* out);
+
+  // DeletionCallback is a function pointer that deletes the cached
+  // object. The signature matches the old deleter function.
+  typedef void (*DeletionCallback)(const Slice&, void*);
+
+  // A callback function that returns the size, save to, and deletion
+  // callbacks. Fill in any of size_cb, saveto_cb, and del_cb that are non-null
+  typedef void (*CacheItemHelperCallback)(SizeCallback* size_cb,
+                                          SaveToCallback* saveto_cb,
+                                          DeletionCallback* del_cb);
+
+  // The CreateCallback is passed by the block cache user to Lookup(). It
+  // takes in a buffer from the NVM cache and constructs an object using
+  // it. The callback doesn't have ownership of the buffer and should
+  // copy the contents into its own buffer.
+  typedef std::function<Status(void* buf, size_t size, void** out_obj,
+                               size_t* charge)>
+      CreateCallback;
+
   Cache(std::shared_ptr<MemoryAllocator> allocator = nullptr)
       : memory_allocator_(std::move(allocator)) {}
   // No copying allowed
@@ -170,8 +228,8 @@ class Cache {
   // The type of the Cache
   virtual const char* Name() const = 0;
 
-  // Insert a mapping from key->value into the cache and assign it
-  // the specified charge against the total cache capacity.
+  // Insert a mapping from key->value into the volatile cache only
+  // and assign it the specified charge against the total cache capacity.
   // If strict_capacity_limit is true and cache reaches its full capacity,
   // return Status::Incomplete.
   //
@@ -190,6 +248,38 @@ class Cache {
                         Handle** handle = nullptr,
                         Priority priority = Priority::LOW) = 0;
 
+  // Insert a mapping from key->value into the volatile cache and assign it
+  // the specified charge against the total cache capacity.
+  // If strict_capacity_limit is true and cache reaches its full capacity,
+  // return Status::Incomplete.
+  //
+  // If handle is not nullptr, returns a handle that corresponds to the
+  // mapping. The caller must call this->Release(handle) when the returned
+  // mapping is no longer needed. In case of error the caller is responsible
+  // for cleaning up the value (i.e. calling "deleter").
+  //
+  // If handle is nullptr, it is as if Release is called immediately after
+  // insert. In case of error the value will be cleaned up.
+  //
+  // Regardless of whether the item was inserted into the volatile cache,
+  // it will attempt to insert it into the NVM cache if one is configured.
+  // The block cache implementation must support the NVM tier, otherwise
+  // the item is only inserted into the volatile tier. It may
+  // defer the insertion to NVM as it sees fit. The NVM
+  // cache may or may not write it to NVM depending on its admission
+  // policy.
+  //
+  // When the inserted entry is no longer needed, the key and
+  // value will be passed to "deleter".
+  virtual Status Insert(const Slice& key, void* value,
+                        CacheItemHelperCallback helper_cb, size_t charge,
+                        Handle** handle = nullptr,
+                        Priority priority = Priority::LOW) {
+    DeletionCallback delete_cb;
+    (*helper_cb)(nullptr, nullptr, &delete_cb);
+    return Insert(key, value, charge, delete_cb, handle, priority);
+  }
+
   // If the cache has no mapping for "key", returns nullptr.
   //
   // Else return a handle that corresponds to the mapping. The caller
@@ -199,6 +289,25 @@ class Cache {
   // function.
   virtual Handle* Lookup(const Slice& key, Statistics* stats = nullptr) = 0;
 
+  // Lookup the key in the volatile and NVM tiers (if one is configured).
+  // The create_cb callback function object will be used to construct the
+  // cached object.
+  // If none of the tiers has a mapping for the key, returns nullptr.
+  // Else, returns a handle that corresponds to the mapping.
+  //
+  // The handle returned may not be ready. The caller should call isReady()
+  // to check if the item value is ready, and call Wait() or WaitAll() if
+  // it's not ready. The caller should then call Value() to check if the
+  // item was successfully retrieved. If unsuccessful (perhaps due to an
+  // IO error), Value() will return nullptr.
+  virtual Handle* Lookup(const Slice& key,
+                         CacheItemHelperCallback /*helper_cb*/,
+                         const CreateCallback& /*create_cb*/,
+                         Priority /*priority*/, bool /*wait*/,
+                         Statistics* stats = nullptr) {
+    return Lookup(key, stats);
+  }
+
   // Increments the reference count for the handle if it refers to an entry in
   // the cache. Returns true if refcount was incremented; otherwise, returns
   // false.
@@ -219,6 +328,27 @@ class Cache {
   // REQUIRES: handle must have been returned by a method on *this.
   virtual bool Release(Handle* handle, bool force_erase = false) = 0;
 
+  // Release a mapping returned by a previous Lookup(). The "useful"
+  // parameter specifies whether the data was actually used or not,
+  // which may be used by the cache implementation to decide whether
+  // to consider it as a hit for retention purposes.
+  virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) {
+    return Release(handle, force_erase);
+  }
+
+  // Determines if the handle returned by Lookup() has a valid value yet.
+  virtual bool isReady(Handle* /*handle*/) { return true; }
+
+  // If the handle returned by Lookup() is not ready yet, wait till it
+  // becomes ready.
+  // Note: A ready handle doesn't necessarily mean it has a valid value. The
+  // user should call Value() and check for nullptr.
+  virtual void Wait(Handle* /*handle*/) {}
+
+  // Wait for a vector of handles to become ready. As with Wait(), the user
+  // should check the Value() of each handle for nullptr.
+  virtual void WaitAll(std::vector<Handle*>& /*handles*/) {}
+
   // Return the value encapsulated in a handle returned by a
   // successful Lookup().
   // REQUIRES: handle must not have been released yet.
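Taken together, a client of the new API provides one helper callback at insert time and one create callback at lookup time; everything else is the familiar Insert/Lookup/Release flow. The sketch below condenses the unit test above into a client-side example; `BlockLike`, the key names, and the sizes are illustrative only, and error handling is elided.

```cpp
#include <cstring>
#include <memory>
#include <string>

#include "rocksdb/cache.h"

using namespace ROCKSDB_NAMESPACE;

namespace {

// Stand-in for a cache-resident object such as an uncompressed block.
struct BlockLike {
  std::string data;
};

// Helper passed to Insert(): reports size, serializes, and deletes.
void BlockHelperCallback(Cache::SizeCallback* size_cb,
                         Cache::SaveToCallback* saveto_cb,
                         Cache::DeletionCallback* del_cb) {
  if (size_cb) {
    *size_cb = [](void* obj) -> size_t {
      return static_cast<BlockLike*>(obj)->data.size();
    };
  }
  if (saveto_cb) {
    *saveto_cb = [](void* obj, size_t offset, size_t size,
                    void* out) -> Status {
      memcpy(out, static_cast<BlockLike*>(obj)->data.data() + offset, size);
      return Status::OK();
    };
  }
  if (del_cb) {
    *del_cb = [](const Slice& /*key*/, void* obj) {
      delete static_cast<BlockLike*>(obj);
    };
  }
}

}  // namespace

void TieredCacheDemo(const std::shared_ptr<Cache>& cache) {
  // Insert: the cache keeps the object pointer; helper_cb tells it how to
  // size/serialize/delete the object if it is later demoted to NVM or freed.
  auto* block = new BlockLike{std::string(1024, 'x')};
  cache->Insert("key1", block, BlockHelperCallback, block->data.size())
      .PermitUncheckedError();

  // Lookup: create_cb rebuilds an object from the flat buffer handed back by
  // the NVM tier; it is only invoked on an NVM hit.
  Cache::CreateCallback create_cb = [](void* buf, size_t size, void** out_obj,
                                       size_t* charge) -> Status {
    *out_obj = new BlockLike{std::string(static_cast<char*>(buf), size)};
    *charge = size;
    return Status::OK();
  };
  Cache::Handle* handle = cache->Lookup("key1", BlockHelperCallback, create_cb,
                                        Cache::Priority::LOW, true /*wait*/);
  if (handle) {
    if (!cache->isReady(handle)) {
      cache->Wait(handle);  // only matters for asynchronous NVM lookups
    }
    auto* value = static_cast<BlockLike*>(cache->Value(handle));
    (void)value;  // use the value...
    // The "useful" flag lets the cache count this as a real hit (or not).
    cache->Release(handle, true /*useful*/, false /*force_erase*/);
  }
}
```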