From 5a165416ac6466a00a5f1db9cdcb53d1d170c279 Mon Sep 17 00:00:00 2001 From: anand76 Date: Wed, 24 Mar 2021 13:14:14 -0700 Subject: [PATCH 01/15] Update HISTORY.md --- HISTORY.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index e4ee4dbb2..083219405 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,6 @@ # Rocksdb Change Log +## Unreleased + ## 6.19.0 (03/21/2021) ### Bug Fixes * Fixed the truncation error found in APIs/tools when dumping block-based SST files in a human-readable format. After fix, the block-based table can be fully dumped as a readable file. From 5732c039a4d5dabb1afdf814c845853db8b8fefd Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Wed, 24 Mar 2021 13:27:12 -0700 Subject: [PATCH 02/15] try the push --- HISTORY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/HISTORY.md b/HISTORY.md index 083219405..86fd20d03 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,7 @@ # Rocksdb Change Log ## Unreleased + ## 6.19.0 (03/21/2021) ### Bug Fixes * Fixed the truncation error found in APIs/tools when dumping block-based SST files in a human-readable format. After fix, the block-based table can be fully dumped as a readable file. From 9cc94bfbd30892d807d3799822c5ffdbd754425b Mon Sep 17 00:00:00 2001 From: anand76 Date: Thu, 25 Mar 2021 14:19:56 -0700 Subject: [PATCH 03/15] Initial support for NVM cache in LRUCache Summary: Only support synchronous lookup currently. --- cache/clock_cache.cc | 22 ++++ cache/lru_cache.cc | 243 ++++++++++++++++++++++++++-------------- cache/lru_cache.h | 94 ++++++++++++++-- cache/lru_cache_test.cc | 178 ++++++++++++++++++++++++++++- cache/sharded_cache.cc | 33 ++++++ cache/sharded_cache.h | 25 ++++- include/rocksdb/cache.h | 134 +++++++++++++++++++++- 7 files changed, 635 insertions(+), 94 deletions(-) diff --git a/cache/clock_cache.cc b/cache/clock_cache.cc index 7934b378b..e36be4ba7 100644 --- a/cache/clock_cache.cc +++ b/cache/clock_cache.cc @@ -271,7 +271,27 @@ class ClockCacheShard final : public CacheShard { Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), Cache::Handle** handle, Cache::Priority priority) override; + Status Insert(const Slice& key, uint32_t hash, void* value, + Cache::CacheItemHelperCallback helper_cb, size_t charge, + Cache::Handle** handle, Cache::Priority priority) override { + Cache::DeletionCallback delete_cb; + (*helper_cb)(nullptr, nullptr, &delete_cb); + return Insert(key, hash, value, charge, delete_cb, handle, priority); + } Cache::Handle* Lookup(const Slice& key, uint32_t hash) override; + Cache::Handle* Lookup(const Slice& key, uint32_t hash, + Cache::CacheItemHelperCallback /*helper_cb*/, + const Cache::CreateCallback& /*create_cb*/, + Cache::Priority /*priority*/, bool /*wait*/) override { + return Lookup(key, hash); + } + bool Release(Cache::Handle* handle, bool /*useful*/, + bool force_erase) override { + return Release(handle, force_erase); + } + bool isReady(Cache::Handle* /*handle*/) override { return true; } + void Wait(Cache::Handle* /*handle*/) override {} + // If the entry in in cache, increase reference count and return true. // Return false otherwise. 
// @@ -748,6 +768,8 @@ class ClockCache final : public ShardedCache { void DisownData() override { shards_ = nullptr; } + void WaitAll(std::vector& /*handles*/) override {} + private: ClockCacheShard* shards_; }; diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index f42190121..139e18105 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -97,7 +97,8 @@ void LRUHandleTable::Resize() { LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio, bool use_adaptive_mutex, - CacheMetadataChargePolicy metadata_charge_policy) + CacheMetadataChargePolicy metadata_charge_policy, + const std::shared_ptr& nvm_cache) : capacity_(0), high_pri_pool_usage_(0), strict_capacity_limit_(strict_capacity_limit), @@ -105,7 +106,8 @@ LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit, high_pri_pool_capacity_(0), usage_(0), lru_usage_(0), - mutex_(use_adaptive_mutex) { + mutex_(use_adaptive_mutex), + nvm_cache_(nvm_cache) { set_metadata_charge_policy(metadata_charge_policy); // Make empty circular linked list lru_.next = &lru_; @@ -256,8 +258,13 @@ void LRUCacheShard::SetCapacity(size_t capacity) { EvictFromLRU(0, &last_reference_list); } + // Try to insert the evicted entries into NVM cache // Free the entries outside of mutex for performance reasons for (auto entry : last_reference_list) { + if (nvm_cache_ && entry->IsNvmCompatible() && !entry->IsPromoted()) { + nvm_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb) + .PermitUncheckedError(); + } entry->Free(); } } @@ -267,17 +274,126 @@ void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { strict_capacity_limit_ = strict_capacity_limit; } -Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) { - MutexLock l(&mutex_); - LRUHandle* e = table_.Lookup(key, hash); - if (e != nullptr) { - assert(e->InCache()); - if (!e->HasRefs()) { - // The entry is in LRU since it's in hash and has no external references - LRU_Remove(e); +Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) { + Status s = Status::OK(); + autovector last_reference_list; + size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_); + + { + MutexLock l(&mutex_); + + // Free the space following strict LRU policy until enough space + // is freed or the lru list is empty + EvictFromLRU(total_charge, &last_reference_list); + + if ((usage_ + total_charge) > capacity_ && + (strict_capacity_limit_ || handle == nullptr)) { + if (handle == nullptr) { + // Don't insert the entry but still return ok, as if the entry inserted + // into cache and get evicted immediately. + e->SetInCache(false); + last_reference_list.push_back(e); + } else { + delete[] reinterpret_cast(e); + *handle = nullptr; + s = Status::Incomplete("Insert failed due to LRU cache being full."); + } + } else { + // Insert into the cache. Note that the cache might get larger than its + // capacity if not enough space was freed up. 
+ LRUHandle* old = table_.Insert(e); + usage_ += total_charge; + if (old != nullptr) { + s = Status::OkOverwritten(); + assert(old->InCache()); + old->SetInCache(false); + if (!old->HasRefs()) { + // old is on LRU because it's in cache and its reference count is 0 + LRU_Remove(old); + size_t old_total_charge = + old->CalcTotalCharge(metadata_charge_policy_); + assert(usage_ >= old_total_charge); + usage_ -= old_total_charge; + last_reference_list.push_back(old); + } + } + if (handle == nullptr) { + LRU_Insert(e); + } else { + e->Ref(); + *handle = reinterpret_cast(e); + } + } + } + + // Try to insert the evicted entries into NVM cache + // Free the entries here outside of mutex for performance reasons + for (auto entry : last_reference_list) { + if (nvm_cache_ && entry->IsNvmCompatible() && !entry->IsPromoted()) { + nvm_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb) + .PermitUncheckedError(); + } + entry->Free(); + } + + return s; +} + +Cache::Handle* LRUCacheShard::Lookup( + const Slice& key, uint32_t hash, + ShardedCache::CacheItemHelperCallback helper_cb, + const ShardedCache::CreateCallback& create_cb, Cache::Priority priority, + bool wait) { + LRUHandle* e = nullptr; + { + MutexLock l(&mutex_); + e = table_.Lookup(key, hash); + if (e != nullptr) { + assert(e->InCache()); + if (!e->HasRefs()) { + // The entry is in LRU since it's in hash and has no external references + LRU_Remove(e); + } + e->Ref(); + e->SetHit(); + } + } + + // If handle table lookup failed, then allocate a handle outside the + // mutex if we're going to lookup in the NVM cache + // Only support synchronous for now + // TODO: Support asynchronous lookup in NVM cache + if (!e && nvm_cache_ && helper_cb && wait) { + assert(create_cb); + std::unique_ptr nvm_handle = + nvm_cache_->Lookup(key, create_cb, wait); + if (nvm_handle != nullptr) { + e = reinterpret_cast( + new char[sizeof(LRUHandle) - 1 + key.size()]); + + e->flags = 0; + e->SetPromoted(true); + e->SetNvmCompatible(true); + e->info_.helper_cb = helper_cb; + e->charge = nvm_handle->Size(); + e->key_length = key.size(); + e->hash = hash; + e->refs = 0; + e->next = e->prev = nullptr; + e->SetInCache(true); + e->SetPriority(priority); + memcpy(e->key_data, key.data(), key.size()); + + e->value = nvm_handle->Value(); + e->charge = nvm_handle->Size(); + + // This call could nullify e if the cache is over capacity and + // strict_capacity_limit_ is true. In such a case, the caller will try + // to insert later, which might again fail, but its ok as this should + // not be common + InsertItem(e, reinterpret_cast(&e)) + .PermitUncheckedError(); } - e->Ref(); - e->SetHit(); } return reinterpret_cast(e); } @@ -338,81 +454,32 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) { Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), + Cache::CacheItemHelperCallback helper_cb, Cache::Handle** handle, Cache::Priority priority) { // Allocate the memory here outside of the mutex // If the cache is full, we'll have to release it // It shouldn't happen very often though. 
LRUHandle* e = reinterpret_cast( new char[sizeof(LRUHandle) - 1 + key.size()]); - Status s = Status::OK(); - autovector last_reference_list; e->value = value; - e->deleter = deleter; + e->flags = 0; + if (helper_cb) { + e->SetNvmCompatible(true); + e->info_.helper_cb = helper_cb; + } else { + e->info_.deleter = deleter; + } e->charge = charge; e->key_length = key.size(); - e->flags = 0; e->hash = hash; e->refs = 0; e->next = e->prev = nullptr; e->SetInCache(true); e->SetPriority(priority); memcpy(e->key_data, key.data(), key.size()); - size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_); - { - MutexLock l(&mutex_); - - // Free the space following strict LRU policy until enough space - // is freed or the lru list is empty - EvictFromLRU(total_charge, &last_reference_list); - - if ((usage_ + total_charge) > capacity_ && - (strict_capacity_limit_ || handle == nullptr)) { - if (handle == nullptr) { - // Don't insert the entry but still return ok, as if the entry inserted - // into cache and get evicted immediately. - e->SetInCache(false); - last_reference_list.push_back(e); - } else { - delete[] reinterpret_cast(e); - *handle = nullptr; - s = Status::Incomplete("Insert failed due to LRU cache being full."); - } - } else { - // Insert into the cache. Note that the cache might get larger than its - // capacity if not enough space was freed up. - LRUHandle* old = table_.Insert(e); - usage_ += total_charge; - if (old != nullptr) { - s = Status::OkOverwritten(); - assert(old->InCache()); - old->SetInCache(false); - if (!old->HasRefs()) { - // old is on LRU because it's in cache and its reference count is 0 - LRU_Remove(old); - size_t old_total_charge = - old->CalcTotalCharge(metadata_charge_policy_); - assert(usage_ >= old_total_charge); - usage_ -= old_total_charge; - last_reference_list.push_back(old); - } - } - if (handle == nullptr) { - LRU_Insert(e); - } else { - e->Ref(); - *handle = reinterpret_cast(e); - } - } - } - - // Free the entries here outside of mutex for performance reasons - for (auto entry : last_reference_list) { - entry->Free(); - } - - return s; + return InsertItem(e, handle); } void LRUCacheShard::Erase(const Slice& key, uint32_t hash) { @@ -468,7 +535,8 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, std::shared_ptr allocator, bool use_adaptive_mutex, - CacheMetadataChargePolicy metadata_charge_policy) + CacheMetadataChargePolicy metadata_charge_policy, + const std::shared_ptr& nvm_cache) : ShardedCache(capacity, num_shard_bits, strict_capacity_limit, std::move(allocator)) { num_shards_ = 1 << num_shard_bits; @@ -478,7 +546,7 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits, for (int i = 0; i < num_shards_; i++) { new (&shards_[i]) LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio, - use_adaptive_mutex, metadata_charge_policy); + use_adaptive_mutex, metadata_charge_policy, nvm_cache); } } @@ -543,19 +611,12 @@ double LRUCache::GetHighPriPoolRatio() { return result; } -std::shared_ptr NewLRUCache(const LRUCacheOptions& cache_opts) { - return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits, - cache_opts.strict_capacity_limit, - cache_opts.high_pri_pool_ratio, - cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, - cache_opts.metadata_charge_policy); -} - std::shared_ptr NewLRUCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, - 
CacheMetadataChargePolicy metadata_charge_policy) { + CacheMetadataChargePolicy metadata_charge_policy, + const std::shared_ptr& nvm_cache) { if (num_shard_bits >= 20) { return nullptr; // the cache cannot be sharded into too many fine pieces } @@ -568,7 +629,25 @@ std::shared_ptr NewLRUCache( } return std::make_shared( capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, - std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy); + std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy, + nvm_cache); } +std::shared_ptr NewLRUCache(const LRUCacheOptions& cache_opts) { + return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits, + cache_opts.strict_capacity_limit, + cache_opts.high_pri_pool_ratio, + cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, + cache_opts.metadata_charge_policy, cache_opts.nvm_cache); +} + +std::shared_ptr NewLRUCache( + size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, + std::shared_ptr memory_allocator, bool use_adaptive_mutex, + CacheMetadataChargePolicy metadata_charge_policy) { + return NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, + high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, + metadata_charge_policy, nullptr); +} } // namespace ROCKSDB_NAMESPACE diff --git a/cache/lru_cache.h b/cache/lru_cache.h index 32edd9ec9..38b6a4a5c 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -11,9 +11,9 @@ #include #include "cache/sharded_cache.h" - #include "port/malloc.h" #include "port/port.h" +#include "rocksdb/nvm_cache.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -49,7 +49,14 @@ namespace ROCKSDB_NAMESPACE { struct LRUHandle { void* value; - void (*deleter)(const Slice&, void* value); + union Info { + Info() {} + ~Info() {} + void (*deleter)(const Slice&, void* value); + ShardedCache::CacheItemHelperCallback helper_cb; + // This needs to be explicitly constructed and destructed + std::unique_ptr nvm_handle; + } info_; LRUHandle* next_hash; LRUHandle* next; LRUHandle* prev; @@ -69,6 +76,12 @@ struct LRUHandle { IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry has had any lookups (hits). 
HAS_HIT = (1 << 3), + // Can this be inserted into the NVM cache + IS_NVM_COMPATIBLE = (1 << 4), + // Is the handle still being read from NVM + IS_INCOMPLETE = (1 << 5), + // Has the item been promoted from NVM + IS_PROMOTED = (1 << 6), }; uint8_t flags; @@ -95,6 +108,9 @@ struct LRUHandle { bool IsHighPri() const { return flags & IS_HIGH_PRI; } bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; } bool HasHit() const { return flags & HAS_HIT; } + bool IsNvmCompatible() const { return flags & IS_NVM_COMPATIBLE; } + bool IsIncomplete() const { return flags & IS_INCOMPLETE; } + bool IsPromoted() const { return flags & IS_PROMOTED; } void SetInCache(bool in_cache) { if (in_cache) { @@ -122,10 +138,38 @@ struct LRUHandle { void SetHit() { flags |= HAS_HIT; } + void SetNvmCompatible(bool nvm) { + if (nvm) { + flags |= IS_NVM_COMPATIBLE; + } else { + flags &= ~IS_NVM_COMPATIBLE; + } + } + + void SetIncomplete(bool incomp) { + if (incomp) { + flags |= IS_INCOMPLETE; + } else { + flags &= ~IS_INCOMPLETE; + } + } + + void SetPromoted(bool promoted) { + if (promoted) { + flags |= IS_PROMOTED; + } else { + flags &= ~IS_PROMOTED; + } + } + void Free() { assert(refs == 0); - if (deleter) { - (*deleter)(key(), value); + if (!IsNvmCompatible() && info_.deleter) { + (*info_.deleter)(key(), value); + } else if (IsNvmCompatible()) { + ShardedCache::DeletionCallback del_cb; + (*info_.helper_cb)(nullptr, nullptr, &del_cb); + (*del_cb)(key(), value); } delete[] reinterpret_cast(this); } @@ -193,7 +237,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { public: LRUCacheShard(size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio, bool use_adaptive_mutex, - CacheMetadataChargePolicy metadata_charge_policy); + CacheMetadataChargePolicy metadata_charge_policy, + const std::shared_ptr& nvm_cache); virtual ~LRUCacheShard() override = default; // Separate from constructor so caller can easily make an array of LRUCache @@ -212,8 +257,32 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { size_t charge, void (*deleter)(const Slice& key, void* value), Cache::Handle** handle, - Cache::Priority priority) override; - virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override; + Cache::Priority priority) override { + return Insert(key, hash, value, charge, deleter, nullptr, handle, priority); + } + virtual Status Insert(const Slice& key, uint32_t hash, void* value, + Cache::CacheItemHelperCallback helper_cb, size_t charge, + Cache::Handle** handle, + Cache::Priority priority) override { + return Insert(key, hash, value, charge, nullptr, helper_cb, handle, + priority); + } + // If helper_cb is null, the values of the following arguments don't + // matter + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash, + ShardedCache::CacheItemHelperCallback helper_cb, + const ShardedCache::CreateCallback& create_cb, + ShardedCache::Priority priority, + bool wait) override; + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override { + return Lookup(key, hash, nullptr, nullptr, Cache::Priority::LOW, true); + } + virtual bool Release(Cache::Handle* handle, bool /*useful*/, + bool force_erase) override { + return Release(handle, force_erase); + } + virtual bool isReady(Cache::Handle* /*handle*/) override { return true; } + virtual void Wait(Cache::Handle* /*handle*/) override {} virtual bool Ref(Cache::Handle* handle) override; virtual bool Release(Cache::Handle* handle, bool force_erase = false) override; @@ -243,6 
+312,11 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { double GetHighPriPoolRatio(); private: + Status InsertItem(LRUHandle* item, Cache::Handle** handle); + Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::CacheItemHelperCallback helper_cb, + Cache::Handle** handle, Cache::Priority priority); void LRU_Remove(LRUHandle* e); void LRU_Insert(LRUHandle* e); @@ -303,6 +377,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { // We don't count mutex_ as the cache's internal state so semantically we // don't mind mutex_ invoking the non-const actions. mutable port::Mutex mutex_; + + std::shared_ptr nvm_cache_; }; class LRUCache @@ -316,7 +392,8 @@ class LRUCache std::shared_ptr memory_allocator = nullptr, bool use_adaptive_mutex = kDefaultToAdaptiveMutex, CacheMetadataChargePolicy metadata_charge_policy = - kDontChargeCacheMetadata); + kDontChargeCacheMetadata, + const std::shared_ptr& nvm_cache = nullptr); virtual ~LRUCache(); virtual const char* Name() const override { return "LRUCache"; } virtual CacheShard* GetShard(int shard) override; @@ -325,6 +402,7 @@ class LRUCache virtual size_t GetCharge(Handle* handle) const override; virtual uint32_t GetHash(Handle* handle) const override; virtual void DisownData() override; + virtual void WaitAll(std::vector& /*handles*/) override {} // Retrieves number of elements in LRU, for unit test purpose only size_t TEST_GetLRUSize(); diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 08c05024a..25be4a5f8 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -7,8 +7,12 @@ #include #include + #include "port/port.h" +#include "rocksdb/cache.h" #include "test_util/testharness.h" +#include "util/coding.h" +#include "util/random.h" namespace ROCKSDB_NAMESPACE { @@ -32,7 +36,7 @@ class LRUCacheTest : public testing::Test { port::cacheline_aligned_alloc(sizeof(LRUCacheShard))); new (cache_) LRUCacheShard(capacity, false /*strict_capcity_limit*/, high_pri_pool_ratio, use_adaptive_mutex, - kDontChargeCacheMetadata); + kDontChargeCacheMetadata, nullptr /*nvm_cache*/); } void Insert(const std::string& key, @@ -191,6 +195,178 @@ TEST_F(LRUCacheTest, EntriesWithPriority) { ValidateLRUList({"e", "f", "g", "Z", "d"}, 2); } +class TestNvmCache : public NvmCache { + public: + TestNvmCache(size_t capacity) : num_inserts_(0), num_lookups_(0) { + cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr, + kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); + } + ~TestNvmCache() { cache_.reset(); } + + std::string Name() override { return "TestNvmCache"; } + + Status Insert(const Slice& key, void* value, + Cache::CacheItemHelperCallback helper_cb) override { + Cache::SizeCallback size_cb; + Cache::SaveToCallback save_cb; + size_t size; + char* buf; + Status s; + + num_inserts_++; + (*helper_cb)(&size_cb, &save_cb, nullptr); + size = (*size_cb)(value); + buf = new char[size + sizeof(uint64_t)]; + EncodeFixed64(buf, size); + s = (*save_cb)(value, 0, size, buf + sizeof(uint64_t)); + EXPECT_OK(s); + return cache_->Insert(key, buf, size, + [](const Slice& /*key*/, void* value) -> void { + delete[] reinterpret_cast(value); + }); + } + + std::unique_ptr Lookup(const Slice& key, + const Cache::CreateCallback& create_cb, + bool /*wait*/) override { + std::unique_ptr nvm_handle; + Cache::Handle* handle = cache_->Lookup(key); + num_lookups_++; + if (handle) { + void* value; + size_t charge; + char* ptr = 
(char*)cache_->Value(handle); + size_t size = DecodeFixed64(ptr); + ptr += sizeof(uint64_t); + Status s = create_cb(ptr, size, &value, &charge); + EXPECT_OK(s); + nvm_handle.reset( + new TestNvmCacheHandle(cache_.get(), handle, value, charge)); + } + return nvm_handle; + } + + void Erase(const Slice& /*key*/) override {} + + void WaitAll(std::vector /*handles*/) override {} + + std::string GetPrintableOptions() const override { return ""; } + + uint32_t num_inserts() { return num_inserts_; } + + uint32_t num_lookups() { return num_lookups_; } + + private: + class TestNvmCacheHandle : public NvmCacheHandle { + public: + TestNvmCacheHandle(Cache* cache, Cache::Handle* handle, void* value, + size_t size) + : cache_(cache), handle_(handle), value_(value), size_(size) {} + ~TestNvmCacheHandle() { + delete[] reinterpret_cast(cache_->Value(handle_)); + cache_->Release(handle_); + } + + bool isReady() override { return true; } + + void Wait() override {} + + void* Value() override { return value_; } + + size_t Size() override { return size_; } + + private: + Cache* cache_; + Cache::Handle* handle_; + void* value_; + size_t size_; + }; + + std::shared_ptr cache_; + uint32_t num_inserts_; + uint32_t num_lookups_; +}; + +TEST_F(LRUCacheTest, TestNvmCache) { + LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + std::shared_ptr nvm_cache(new TestNvmCache(2048)); + opts.nvm_cache = nvm_cache; + std::shared_ptr cache = NewLRUCache(opts); + + class TestItem { + public: + TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) { + memcpy(buf_.get(), buf, size); + } + ~TestItem() {} + + char* Buf() { return buf_.get(); } + size_t Size() { return size_; } + + private: + std::unique_ptr buf_; + size_t size_; + }; + + Cache::CacheItemHelperCallback helper_cb = + [](Cache::SizeCallback* size_cb, Cache::SaveToCallback* saveto_cb, + Cache::DeletionCallback* del_cb) -> void { + if (size_cb) { + *size_cb = [](void* obj) -> size_t { + return reinterpret_cast(obj)->Size(); + }; + } + if (saveto_cb) { + *saveto_cb = [](void* obj, size_t offset, size_t size, + void* out) -> Status { + TestItem* item = reinterpret_cast(obj); + char* buf = item->Buf(); + EXPECT_EQ(size, item->Size()); + EXPECT_EQ(offset, 0); + memcpy(out, buf, size); + return Status::OK(); + }; + } + if (del_cb) { + *del_cb = [](const Slice& /*key*/, void* obj) -> void { + delete reinterpret_cast(obj); + }; + } + }; + + int create_count = 0; + Cache::CreateCallback test_item_creator = + [&create_count](void* buf, size_t size, void** out_obj, + size_t* charge) -> Status { + create_count++; + *out_obj = reinterpret_cast(new TestItem((char*)buf, size)); + *charge = size; + return Status::OK(); + }; + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + TestItem* item1 = new TestItem(str1.data(), str1.length()); + cache->Insert("k1", item1, helper_cb, str1.length()); + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // k2 should be demoted to NVM + cache->Insert("k2", item2, helper_cb, str2.length()); + + Cache::Handle* handle; + handle = cache->Lookup("k2", helper_cb, test_item_creator, + Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + // This lookup should promote k1 and demote k2 + handle = cache->Lookup("k1", helper_cb, test_item_creator, + Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + ASSERT_EQ(nvm_cache->num_inserts(), 2); + 
ASSERT_EQ(nvm_cache->num_lookups(), 1); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc index 6c915df8c..80044afe2 100644 --- a/cache/sharded_cache.cc +++ b/cache/sharded_cache.cc @@ -51,11 +51,39 @@ Status ShardedCache::Insert(const Slice& key, void* value, size_t charge, ->Insert(key, hash, value, charge, deleter, handle, priority); } +Status ShardedCache::Insert(const Slice& key, void* value, + CacheItemHelperCallback helper_cb, size_t charge, + Handle** handle, Priority priority) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash)) + ->Insert(key, hash, value, helper_cb, charge, handle, priority); +} + Cache::Handle* ShardedCache::Lookup(const Slice& key, Statistics* /*stats*/) { uint32_t hash = HashSlice(key); return GetShard(Shard(hash))->Lookup(key, hash); } +Cache::Handle* ShardedCache::Lookup(const Slice& key, + CacheItemHelperCallback helper_cb, + const CreateCallback& create_cb, + Priority priority, bool wait, + Statistics* /*stats*/) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash)) + ->Lookup(key, hash, helper_cb, create_cb, priority, wait); +} + +bool ShardedCache::isReady(Handle* handle) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->isReady(handle); +} + +void ShardedCache::Wait(Handle* handle) { + uint32_t hash = GetHash(handle); + GetShard(Shard(hash))->Wait(handle); +} + bool ShardedCache::Ref(Handle* handle) { uint32_t hash = GetHash(handle); return GetShard(Shard(hash))->Ref(handle); @@ -66,6 +94,11 @@ bool ShardedCache::Release(Handle* handle, bool force_erase) { return GetShard(Shard(hash))->Release(handle, force_erase); } +bool ShardedCache::Release(Handle* handle, bool useful, bool force_erase) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Release(handle, useful, force_erase); +} + void ShardedCache::Erase(const Slice& key) { uint32_t hash = HashSlice(key); GetShard(Shard(hash))->Erase(key, hash); diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h index ce9e459dc..54779d8e3 100644 --- a/cache/sharded_cache.h +++ b/cache/sharded_cache.h @@ -28,9 +28,20 @@ class CacheShard { size_t charge, void (*deleter)(const Slice& key, void* value), Cache::Handle** handle, Cache::Priority priority) = 0; + virtual Status Insert(const Slice& key, uint32_t hash, void* value, + Cache::CacheItemHelperCallback helper_cb, size_t charge, + Cache::Handle** handle, Cache::Priority priority) = 0; virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0; + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash, + Cache::CacheItemHelperCallback helper_cb, + const Cache::CreateCallback& create_cb, + Cache::Priority priority, bool wait) = 0; + virtual bool Release(Cache::Handle* handle, bool useful, + bool force_erase) = 0; + virtual bool isReady(Cache::Handle* handle) = 0; + virtual void Wait(Cache::Handle* handle) = 0; virtual bool Ref(Cache::Handle* handle) = 0; - virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0; + virtual bool Release(Cache::Handle* handle, bool force_erase) = 0; virtual void Erase(const Slice& key, uint32_t hash) = 0; virtual void SetCapacity(size_t capacity) = 0; virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; @@ -62,6 +73,7 @@ class ShardedCache : public Cache { virtual const CacheShard* GetShard(int shard) const = 0; virtual void* Value(Handle* handle) override = 0; virtual size_t GetCharge(Handle* handle) const override = 0; + virtual void 
WaitAll(std::vector& handles) override = 0; virtual uint32_t GetHash(Handle* handle) const = 0; virtual void DisownData() override = 0; @@ -72,7 +84,18 @@ class ShardedCache : public Cache { virtual Status Insert(const Slice& key, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), Handle** handle, Priority priority) override; + virtual Status Insert(const Slice& key, void* value, + CacheItemHelperCallback helper_cb, size_t charge, + Handle** handle = nullptr, + Priority priority = Priority::LOW) override; virtual Handle* Lookup(const Slice& key, Statistics* stats) override; + virtual Handle* Lookup(const Slice& key, CacheItemHelperCallback helper_cb, + const CreateCallback& create_cb, Priority priority, + bool wait, Statistics* stats = nullptr) override; + virtual bool Release(Handle* handle, bool useful, + bool force_erase = false) override; + virtual bool isReady(Handle* handle) override; + virtual void Wait(Handle* handle) override; virtual bool Ref(Handle* handle) override; virtual bool Release(Handle* handle, bool force_erase = false) override; virtual void Erase(const Slice& key) override; diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 1ce203d38..933942839 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -23,8 +23,11 @@ #pragma once #include + +#include #include #include + #include "rocksdb/memory_allocator.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" @@ -34,6 +37,7 @@ namespace ROCKSDB_NAMESPACE { class Cache; struct ConfigOptions; +class NvmCache; extern const bool kDefaultToAdaptiveMutex; @@ -87,6 +91,9 @@ struct LRUCacheOptions { CacheMetadataChargePolicy metadata_charge_policy = kDefaultCacheMetadataChargePolicy; + // An NvmCache instance to use as the non-volatile tier + std::shared_ptr nvm_cache; + LRUCacheOptions() {} LRUCacheOptions(size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit, double _high_pri_pool_ratio, @@ -137,6 +144,57 @@ class Cache { // likely to get evicted than low priority entries. enum class Priority { HIGH, LOW }; + // A set of callbacks to allow objects in the volatile block cache to + // be persisted in an NVM cache tier. Since the volatile cache holds C++ + // objects and the NVM cache may only hold flat data that doesn't need + // relocation, these callbacks need to be provided by the user of the block + // cache to do the conversion. + // The CacheItemHelperCallback is passed to Insert(). When invoked, it + // returns the callback functions for size, saving and deletion of the + // object. We do it this way so that the cache implementation only needs to + // save one function pointer in its metadata per object, the + // CacheItemHelperCallback pointer which is a C-style function pointer. + // Saving multiple std::function objects will take up 32 bytes per + // function, even if it's not bound to an object and does no capture. The + // other alternative is to take a pointer to another object that implements + // this interface, but that would create issues with managing the object + // lifecycle. + // + // All the callbacks are C-style function pointers in order to simplify + // lifecycle management. Objects in the cache can outlive the parent DB, + // so anything required for these operations should be contained in the + // object itself. + // + // The SizeCallback takes a void* pointer to the object and returns the size + // of the persistable data. It can be used by the NVM cache to allocate + // memory if needed.
+ typedef size_t (*SizeCallback)(void* obj); + + // The SaveToCallback takes a void* object pointer and saves the persistable + // data into a buffer. The NVM cache may decide to not store it in a + // contiguous buffer, in which case this callback will be called multiple + // times with increasing offset + typedef rocksdb::Status (*SaveToCallback)(void* obj, size_t offset, + size_t size, void* out); + + // DeletionCallback is a function pointer that deletes the cached + // object. The signature matches the old deleter function. + typedef void (*DeletionCallback)(const Slice&, void*); + + // A callback function that returns the size, save to, and deletion + // callbacks. Fill any of size_cb, saveto_cb, del_cb that is non-null + typedef void (*CacheItemHelperCallback)(SizeCallback* size_cb, + SaveToCallback* saveto_cb, + DeletionCallback* del_cb); + + // The CreateCallback is passed by the block cache user to Lookup(). It + // takes in a buffer from the NVM cache and constructs an object using + // it. The callback doesn't have ownership of the buffer and should + // copy the contents into its own buffer. + typedef std::function + CreateCallback; + Cache(std::shared_ptr allocator = nullptr) : memory_allocator_(std::move(allocator)) {} // No copying allowed @@ -170,8 +228,8 @@ class Cache { // The type of the Cache virtual const char* Name() const = 0; - // Insert a mapping from key->value into the cache and assign it - // the specified charge against the total cache capacity. + // Insert a mapping from key->value into the volatile cache only + // and assign it // the specified charge against the total cache capacity. // If strict_capacity_limit is true and cache reaches its full capacity, // return Status::Incomplete. // @@ -190,6 +248,38 @@ class Cache { Handle** handle = nullptr, Priority priority = Priority::LOW) = 0; + // Insert a mapping from key->value into the volatile cache and assign it + // the specified charge against the total cache capacity. + // If strict_capacity_limit is true and cache reaches its full capacity, + // return Status::Incomplete. + // + // If handle is not nullptr, returns a handle that corresponds to the + // mapping. The caller must call this->Release(handle) when the returned + // mapping is no longer needed. In case of error caller is responsible to + // cleanup the value (i.e. calling "deleter"). + // + // If handle is nullptr, it is as if Release is called immediately after + // insert. In case of error value will be cleanup. + // + // Regardless of whether the item was inserted into the volatile cache, + // it will attempt to insert it into the NVM cache if one is configured. + // The block cache implementation must support the NVM tier, otherwise + // the item is only inserted into the volatile tier. It may + // defer the insertion to NVM as it sees fit. The NVM + // cache may or may not write it to NVM depending on its admission + // policy. + // + // When the inserted entry is no longer needed, the key and + // value will be passed to "deleter". + virtual Status Insert(const Slice& key, void* value, + CacheItemHelperCallback helper_cb, size_t charge, + Handle** handle = nullptr, + Priority priority = Priority::LOW) { + DeletionCallback delete_cb; + (*helper_cb)(nullptr, nullptr, &delete_cb); + return Insert(key, value, charge, delete_cb, handle, priority); + } + // If the cache has no mapping for "key", returns nullptr. // // Else return a handle that corresponds to the mapping. The caller @@ -199,6 +289,25 @@ class Cache { // function. 
virtual Handle* Lookup(const Slice& key, Statistics* stats = nullptr) = 0; + // Lookup the key in the volatile and NVM tiers (if one is configured). + // The create_cb callback function object will be used to construct the + // cached object. + // If none of the tiers have the mapping for the key, returns nullptr. + // Else, returns a handle that corresponds to the mapping. + // + // The handle returned may not be ready. The caller should call isReady() + // to check if the item value is ready, and call Wait() or WaitAll() if + // it's not ready. The caller should then call Value() to check if the + // item was successfully retrieved. If unsuccessful (perhaps due to an + // IO error), Value() will return nullptr. + virtual Handle* Lookup(const Slice& key, + CacheItemHelperCallback /*helper_cb*/, + const CreateCallback& /*create_cb*/, + Priority /*priority*/, bool /*wait*/, + Statistics* stats = nullptr) { + return Lookup(key, stats); + } + // Increments the reference count for the handle if it refers to an entry in // the cache. Returns true if refcount was incremented; otherwise, returns // false. @@ -219,6 +328,27 @@ // REQUIRES: handle must have been returned by a method on *this. virtual bool Release(Handle* handle, bool force_erase = false) = 0; + // Release a mapping returned by a previous Lookup(). The "useful" + // parameter specifies whether the data was actually used or not, + // which may be used by the cache implementation to decide whether + // to consider it as a hit for retention purposes. + virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) { + return Release(handle, force_erase); + } + + // Determines if the handle returned by Lookup() has a valid value yet. + virtual bool isReady(Handle* /*handle*/) { return true; } + + // If the handle returned by Lookup() is not ready yet, wait till it + // becomes ready. + // Note: A ready handle doesn't necessarily mean it has a valid value. The + // user should call Value() and check for nullptr. + virtual void Wait(Handle* /*handle*/) {} + + // Wait for a vector of handles to become ready. As with Wait(), the user + // should check the Value() of each handle for nullptr + virtual void WaitAll(std::vector& /*handles*/) {} + // Return the value encapsulated in a handle returned by a // successful Lookup(). // REQUIRES: handle must not have been released yet. From c1ae1f143ef4775af6020964c0231632334f4dd6 Mon Sep 17 00:00:00 2001 From: anand76 Date: Mon, 29 Mar 2021 14:28:04 -0700 Subject: [PATCH 04/15] Add missing nvm_cache.h header file --- include/rocksdb/nvm_cache.h | 75 +++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 include/rocksdb/nvm_cache.h diff --git a/include/rocksdb/nvm_cache.h b/include/rocksdb/nvm_cache.h new file mode 100644 index 000000000..a2f585291 --- /dev/null +++ b/include/rocksdb/nvm_cache.h @@ -0,0 +1,75 @@ +// Copyright (c) 2021, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include + +#include +#include + +#include "rocksdb/cache.h" +#include "rocksdb/slice.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" + +namespace ROCKSDB_NAMESPACE { + +// A handle for lookup result. The handle may not be immediately ready or +// have a valid value.
The caller must call isReady() to determine if it's +// ready, and call Wait() in order to block until it becomes ready. +// The caller must call Value() after it becomes ready to determine if the +// handle successfully read the item. +class NvmCacheHandle { + public: + virtual ~NvmCacheHandle() {} + + // Returns whether the handle is ready or not + virtual bool isReady() = 0; + + // Block until handle becomes ready + virtual void Wait() = 0; + + // Return the value. If nullptr, it means the lookup was unsuccessful + virtual void* Value() = 0; + + // Return the size of value + virtual size_t Size() = 0; +}; + +// NvmCache +// +// NVM cache interface for caching blocks on a persistent medium. +class NvmCache { + public: + virtual ~NvmCache() {} + + virtual std::string Name() = 0; + + // Insert the given value into the NVM cache. The value is not written + // directly. Rather, the SaveToCallback provided by helper_cb will be + // used to extract the persistable data in value, which will be written + // to NVM. The implementation may or may not write it to NVM depending + // on the admission control policy, even if the return status is success. + virtual Status Insert(const Slice& key, void* value, + Cache::CacheItemHelperCallback helper_cb) = 0; + + // Lookup the data for the given key in the NVM cache. The create_cb + // will be used to create the object. The handle returned may not be + // ready yet, unless wait=true, in which case Lookup() will block until + // the handle is ready + virtual std::unique_ptr Lookup( + const Slice& key, const Cache::CreateCallback& create_cb, bool wait) = 0; + + // At the discretion of the implementation, erase the data associated + // with key + virtual void Erase(const Slice& key) = 0; + + // Wait for a collection of handles to become ready + virtual void WaitAll(std::vector handles) = 0; + + virtual std::string GetPrintableOptions() const = 0; +}; + +} // namespace ROCKSDB_NAMESPACE From 1b0a1abafa8453a82728fdd4cc76c8d563de8a08 Mon Sep 17 00:00:00 2001 From: anand76 Date: Mon, 29 Mar 2021 14:28:04 -0700 Subject: [PATCH 05/15] Fix some build and test failures --- cache/lru_cache_test.cc | 5 ++--- db/db_test_util.h | 3 +++ utilities/simulator_cache/sim_cache.cc | 3 +++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 25be4a5f8..6ff2151aa 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -221,8 +221,8 @@ class TestNvmCache : public NvmCache { s = (*save_cb)(value, 0, size, buf + sizeof(uint64_t)); EXPECT_OK(s); return cache_->Insert(key, buf, size, - [](const Slice& /*key*/, void* value) -> void { - delete[] reinterpret_cast(value); + [](const Slice& /*key*/, void* val) -> void { + delete[] reinterpret_cast(val); }); } diff --git a/db/db_test_util.h b/db/db_test_util.h index 481343de4..da2ccd6a2 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -794,6 +794,7 @@ class CacheWrapper : public Cache { const char* Name() const override { return target_->Name(); } + using Cache::Insert; Status Insert(const Slice& key, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), Handle** handle = nullptr, Priority priority = Priority::LOW) override { return target_->Insert(key, value, charge, deleter,
handle, priority); } + using Cache::Lookup; Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override { return target_->Lookup(key, stats); } bool Ref(Handle* handle) override { return target_->Ref(handle); } + using Cache::Release; bool Release(Handle* handle, bool force_erase = false) override { return target_->Release(handle, force_erase); } diff --git a/utilities/simulator_cache/sim_cache.cc b/utilities/simulator_cache/sim_cache.cc index 0da71ab16..f957a70fc 100644 --- a/utilities/simulator_cache/sim_cache.cc +++ b/utilities/simulator_cache/sim_cache.cc @@ -167,6 +167,7 @@ class SimCacheImpl : public SimCache { cache_->SetStrictCapacityLimit(strict_capacity_limit); } + using Cache::Insert; Status Insert(const Slice& key, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), Handle** handle, Priority priority) override { @@ -193,6 +194,7 @@ class SimCacheImpl : public SimCache { return cache_->Insert(key, value, charge, deleter, handle, priority); } + using Cache::Lookup; Handle* Lookup(const Slice& key, Statistics* stats) override { Handle* h = key_only_cache_->Lookup(key); if (h != nullptr) { @@ -213,6 +215,7 @@ class SimCacheImpl : public SimCache { bool Ref(Handle* handle) override { return cache_->Ref(handle); } + using Cache::Release; bool Release(Handle* handle, bool force_erase = false) override { return cache_->Release(handle, force_erase); } From 2db4e48211b833b8b70ef3be04780315efb87c53 Mon Sep 17 00:00:00 2001 From: anand76 Date: Tue, 30 Mar 2021 09:21:15 -0700 Subject: [PATCH 06/15] Fix some CircleCI failures --- cache/lru_cache_test.cc | 6 +++--- db/db_basic_test.cc | 2 ++ db/db_block_cache_test.cc | 2 ++ include/rocksdb/cache.h | 8 ++++---- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 6ff2151aa..1194d1b89 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -229,7 +229,7 @@ class TestNvmCache : public NvmCache { std::unique_ptr Lookup(const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) override { - std::unique_ptr nvm_handle; + std::unique_ptr nvm_handle; Cache::Handle* handle = cache_->Lookup(key); num_lookups_++; if (handle) { @@ -363,8 +363,8 @@ TEST_F(LRUCacheTest, TestNvmCache) { Cache::Priority::LOW, true); ASSERT_NE(handle, nullptr); cache->Release(handle); - ASSERT_EQ(nvm_cache->num_inserts(), 2); - ASSERT_EQ(nvm_cache->num_lookups(), 1); + ASSERT_EQ(nvm_cache->num_inserts(), 2u); + ASSERT_EQ(nvm_cache->num_lookups(), 1u); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 63a225252..34f27b809 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2820,6 +2820,7 @@ class DBBasicTestMultiGet : public DBTestBase { const char* Name() const override { return "MyBlockCache"; } + using Cache::Insert; Status Insert(const Slice& key, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), Handle** handle = nullptr, @@ -2828,6 +2829,7 @@ class DBBasicTestMultiGet : public DBTestBase { return target_->Insert(key, value, charge, deleter, handle, priority); } + using Cache::Lookup; Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override { num_lookups_++; Handle* handle = target_->Lookup(key, stats); diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index cf6f6dfd9..a2a08abc8 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -446,6 +446,7 @@ class MockCache : public LRUCache { false 
/*strict_capacity_limit*/, 0.0 /*high_pri_pool_ratio*/) { } + using ShardedCache::Insert; Status Insert(const Slice& key, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), Handle** handle, Priority priority) override { @@ -533,6 +534,7 @@ class LookupLiarCache : public CacheWrapper { explicit LookupLiarCache(std::shared_ptr target) : CacheWrapper(std::move(target)) {} + using Cache::Lookup; Handle* Lookup(const Slice& key, Statistics* stats) override { if (nth_lookup_not_found_ == 1) { nth_lookup_not_found_ = 0; diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 933942839..b509a1a31 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -174,7 +174,7 @@ class Cache { // data into a buffer. The NVM cache may decide to not store it in a // contiguous buffer, in which case this callback will be called multiple // times with increasing offset - typedef rocksdb::Status (*SaveToCallback)(void* obj, size_t offset, + typedef ROCKSDB_NAMESPACE::Status (*SaveToCallback)(void* obj, size_t offset, size_t size, void* out); // DeletionCallback is a function pointer that deletes the cached @@ -191,8 +191,8 @@ class Cache { // takes in a buffer from the NVM cache and constructs an object using // it. The callback doesn't have ownership of the buffer and should // copy the contents into its own buffer. - typedef std::function + typedef std::function CreateCallback; Cache(std::shared_ptr allocator = nullptr) @@ -275,7 +275,7 @@ class Cache { CacheItemHelperCallback helper_cb, size_t charge, Handle** handle = nullptr, Priority priority = Priority::LOW) { - DeletionCallback delete_cb; + DeletionCallback delete_cb = nullptr; (*helper_cb)(nullptr, nullptr, &delete_cb); return Insert(key, value, charge, delete_cb, handle, priority); } From b2c302597dc0fda7a882cb554b5da8eaf0a0a315 Mon Sep 17 00:00:00 2001 From: anand76 Date: Tue, 30 Mar 2021 12:34:19 -0700 Subject: [PATCH 07/15] Fix formatting and a ASSERT_STATUS_CHECKED test failure --- cache/lru_cache_test.cc | 6 ++---- include/rocksdb/cache.h | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 1194d1b89..ccc6e0b57 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -262,9 +262,7 @@ class TestNvmCache : public NvmCache { TestNvmCacheHandle(Cache* cache, Cache::Handle* handle, void* value, size_t size) : cache_(cache), handle_(handle), value_(value), size_(size) {} - ~TestNvmCacheHandle() { - cache_->Release(handle_); - } + ~TestNvmCacheHandle() { cache_->Release(handle_); } bool isReady() override { return true; } @@ -351,7 +349,7 @@ TEST_F(LRUCacheTest, TestNvmCache) { std::string str2 = rnd.RandomString(1020); TestItem* item2 = new TestItem(str2.data(), str2.length()); // k2 should be demoted to NVM - cache->Insert("k2", item2, helper_cb, str2.length()); + ASSERT_OK(cache->Insert("k2", item2, helper_cb, str2.length())); Cache::Handle* handle; handle = cache->Lookup("k2", helper_cb, test_item_creator, diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index b509a1a31..ad7516fbb 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -175,7 +175,7 @@ class Cache { // contiguous buffer, in which case this callback will be called multiple // times with increasing offset typedef ROCKSDB_NAMESPACE::Status (*SaveToCallback)(void* obj, size_t offset, - size_t size, void* out); + size_t size, void* out); // DeletionCallback is a function pointer that deletes the cached // object. 
The signature matches the old deleter function. @@ -191,8 +191,8 @@ class Cache { // takes in a buffer from the NVM cache and constructs an object using // it. The callback doesn't have ownership of the buffer and should // copy the contents into its own buffer. - typedef std::function + typedef std::function CreateCallback; Cache(std::shared_ptr allocator = nullptr) From 8015fc98718734fa9048c2fa0bfc16a4e5234629 Mon Sep 17 00:00:00 2001 From: anand76 Date: Wed, 31 Mar 2021 12:13:52 -0700 Subject: [PATCH 08/15] Address comments --- cache/lru_cache.cc | 50 ++++++++++--------- cache/lru_cache.h | 42 ++++++++-------- cache/lru_cache_test.cc | 48 +++++++++--------- include/rocksdb/cache.h | 6 +-- .../rocksdb/{nvm_cache.h => tiered_cache.h} | 19 +++---- 5 files changed, 85 insertions(+), 80 deletions(-) rename include/rocksdb/{nvm_cache.h => tiered_cache.h} (83%) diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index 139e18105..65916a8c9 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -98,7 +98,7 @@ LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, - const std::shared_ptr& nvm_cache) + const std::shared_ptr& tiered_cache) : capacity_(0), high_pri_pool_usage_(0), strict_capacity_limit_(strict_capacity_limit), @@ -107,7 +107,7 @@ LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit, usage_(0), lru_usage_(0), mutex_(use_adaptive_mutex), - nvm_cache_(nvm_cache) { + tiered_cache_(tiered_cache) { set_metadata_charge_policy(metadata_charge_policy); // Make empty circular linked list lru_.next = &lru_; @@ -261,8 +261,9 @@ void LRUCacheShard::SetCapacity(size_t capacity) { // Try to insert the evicted entries into NVM cache // Free the entries outside of mutex for performance reasons for (auto entry : last_reference_list) { - if (nvm_cache_ && entry->IsNvmCompatible() && !entry->IsPromoted()) { - nvm_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb) + if (tiered_cache_ && entry->IsTieredCacheCompatible() && + !entry->IsPromoted()) { + tiered_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb) .PermitUncheckedError(); } entry->Free(); @@ -329,8 +330,9 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) { // Try to insert the evicted entries into NVM cache // Free the entries here outside of mutex for performance reasons for (auto entry : last_reference_list) { - if (nvm_cache_ && entry->IsNvmCompatible() && !entry->IsPromoted()) { - nvm_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb) + if (tiered_cache_ && entry->IsTieredCacheCompatible() && + !entry->IsPromoted()) { + tiered_cache_->Insert(entry->key(), entry->value, entry->info_.helper_cb) .PermitUncheckedError(); } entry->Free(); @@ -363,19 +365,19 @@ Cache::Handle* LRUCacheShard::Lookup( // mutex if we're going to lookup in the NVM cache // Only support synchronous for now // TODO: Support asynchronous lookup in NVM cache - if (!e && nvm_cache_ && helper_cb && wait) { + if (!e && tiered_cache_ && helper_cb && wait) { assert(create_cb); - std::unique_ptr nvm_handle = - nvm_cache_->Lookup(key, create_cb, wait); - if (nvm_handle != nullptr) { + std::unique_ptr tiered_handle = + tiered_cache_->Lookup(key, create_cb, wait); + if (tiered_handle != nullptr) { e = reinterpret_cast( new char[sizeof(LRUHandle) - 1 + key.size()]); e->flags = 0; e->SetPromoted(true); - e->SetNvmCompatible(true); + e->SetTieredCacheCompatible(true); 
e->info_.helper_cb = helper_cb; - e->charge = nvm_handle->Size(); + e->charge = tiered_handle->Size(); e->key_length = key.size(); e->hash = hash; e->refs = 0; @@ -384,8 +386,8 @@ Cache::Handle* LRUCacheShard::Lookup( e->SetPriority(priority); memcpy(e->key_data, key.data(), key.size()); - e->value = nvm_handle->Value(); - e->charge = nvm_handle->Size(); + e->value = tiered_handle->Value(); + e->charge = tiered_handle->Size(); // This call could nullify e if the cache is over capacity and // strict_capacity_limit_ is true. In such a case, the caller will try @@ -465,7 +467,7 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value, e->value = value; e->flags = 0; if (helper_cb) { - e->SetNvmCompatible(true); + e->SetTieredCacheCompatible(true); e->info_.helper_cb = helper_cb; } else { e->info_.deleter = deleter; @@ -536,7 +538,7 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits, std::shared_ptr allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, - const std::shared_ptr& nvm_cache) + const std::shared_ptr& tiered_cache) : ShardedCache(capacity, num_shard_bits, strict_capacity_limit, std::move(allocator)) { num_shards_ = 1 << num_shard_bits; @@ -546,7 +548,7 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits, for (int i = 0; i < num_shards_; i++) { new (&shards_[i]) LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio, - use_adaptive_mutex, metadata_charge_policy, nvm_cache); + use_adaptive_mutex, metadata_charge_policy, tiered_cache); } } @@ -616,7 +618,7 @@ std::shared_ptr NewLRUCache( double high_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, - const std::shared_ptr& nvm_cache) { + const std::shared_ptr& tiered_cache) { if (num_shard_bits >= 20) { return nullptr; // the cache cannot be sharded into too many fine pieces } @@ -630,15 +632,15 @@ std::shared_ptr NewLRUCache( return std::make_shared( capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy, - nvm_cache); + tiered_cache); } std::shared_ptr NewLRUCache(const LRUCacheOptions& cache_opts) { - return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits, - cache_opts.strict_capacity_limit, - cache_opts.high_pri_pool_ratio, - cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, - cache_opts.metadata_charge_policy, cache_opts.nvm_cache); + return NewLRUCache( + cache_opts.capacity, cache_opts.num_shard_bits, + cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio, + cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, + cache_opts.metadata_charge_policy, cache_opts.tiered_cache); } std::shared_ptr NewLRUCache( diff --git a/cache/lru_cache.h b/cache/lru_cache.h index 38b6a4a5c..bcb23d5e3 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -13,7 +13,7 @@ #include "cache/sharded_cache.h" #include "port/malloc.h" #include "port/port.h" -#include "rocksdb/nvm_cache.h" +#include "rocksdb/tiered_cache.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -55,7 +55,7 @@ struct LRUHandle { void (*deleter)(const Slice&, void* value); ShardedCache::CacheItemHelperCallback helper_cb; // This needs to be explicitly constructed and destructed - std::unique_ptr nvm_handle; + std::unique_ptr tiered_handle; } info_; LRUHandle* next_hash; LRUHandle* next; @@ -76,11 +76,11 @@ struct LRUHandle { IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry has had any lookups 
(hits). HAS_HIT = (1 << 3), - // Can this be inserted into the NVM cache - IS_NVM_COMPATIBLE = (1 << 4), - // Is the handle still being read from NVM - IS_INCOMPLETE = (1 << 5), - // Has the item been promoted from NVM + // Can this be inserted into the tiered cache + IS_TIERED_CACHE_COMPATIBLE = (1 << 4), + // Is the handle still being read from a lower tier + IS_PENDING = (1 << 5), + // Has the item been promoted from a lower tier IS_PROMOTED = (1 << 6), }; @@ -108,8 +108,10 @@ struct LRUHandle { bool IsHighPri() const { return flags & IS_HIGH_PRI; } bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; } bool HasHit() const { return flags & HAS_HIT; } - bool IsNvmCompatible() const { return flags & IS_NVM_COMPATIBLE; } - bool IsIncomplete() const { return flags & IS_INCOMPLETE; } + bool IsTieredCacheCompatible() const { + return flags & IS_TIERED_CACHE_COMPATIBLE; + } + bool IsPending() const { return flags & IS_PENDING; } bool IsPromoted() const { return flags & IS_PROMOTED; } void SetInCache(bool in_cache) { @@ -138,19 +140,19 @@ struct LRUHandle { void SetHit() { flags |= HAS_HIT; } - void SetNvmCompatible(bool nvm) { - if (nvm) { - flags |= IS_NVM_COMPATIBLE; + void SetTieredCacheCompatible(bool tiered) { + if (tiered) { + flags |= IS_TIERED_CACHE_COMPATIBLE; } else { - flags &= ~IS_NVM_COMPATIBLE; + flags &= ~IS_TIERED_CACHE_COMPATIBLE; } } void SetIncomplete(bool incomp) { if (incomp) { - flags |= IS_INCOMPLETE; + flags |= IS_PENDING; } else { - flags &= ~IS_INCOMPLETE; + flags &= ~IS_PENDING; } } @@ -164,9 +166,9 @@ struct LRUHandle { void Free() { assert(refs == 0); - if (!IsNvmCompatible() && info_.deleter) { + if (!IsTieredCacheCompatible() && info_.deleter) { (*info_.deleter)(key(), value); - } else if (IsNvmCompatible()) { + } else if (IsTieredCacheCompatible()) { ShardedCache::DeletionCallback del_cb; (*info_.helper_cb)(nullptr, nullptr, &del_cb); (*del_cb)(key(), value); @@ -238,7 +240,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { LRUCacheShard(size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, - const std::shared_ptr& nvm_cache); + const std::shared_ptr& tiered_cache); virtual ~LRUCacheShard() override = default; // Separate from constructor so caller can easily make an array of LRUCache @@ -378,7 +380,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { // don't mind mutex_ invoking the non-const actions. 
mutable port::Mutex mutex_; - std::shared_ptr nvm_cache_; + std::shared_ptr tiered_cache_; }; class LRUCache @@ -393,7 +395,7 @@ class LRUCache bool use_adaptive_mutex = kDefaultToAdaptiveMutex, CacheMetadataChargePolicy metadata_charge_policy = kDontChargeCacheMetadata, - const std::shared_ptr& nvm_cache = nullptr); + const std::shared_ptr& tiered_cache = nullptr); virtual ~LRUCache(); virtual const char* Name() const override { return "LRUCache"; } virtual CacheShard* GetShard(int shard) override; diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index ccc6e0b57..45eb1fbe4 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -34,9 +34,9 @@ class LRUCacheTest : public testing::Test { DeleteCache(); cache_ = reinterpret_cast( port::cacheline_aligned_alloc(sizeof(LRUCacheShard))); - new (cache_) LRUCacheShard(capacity, false /*strict_capcity_limit*/, - high_pri_pool_ratio, use_adaptive_mutex, - kDontChargeCacheMetadata, nullptr /*nvm_cache*/); + new (cache_) LRUCacheShard( + capacity, false /*strict_capcity_limit*/, high_pri_pool_ratio, + use_adaptive_mutex, kDontChargeCacheMetadata, nullptr /*tiered_cache*/); } void Insert(const std::string& key, @@ -195,15 +195,15 @@ TEST_F(LRUCacheTest, EntriesWithPriority) { ValidateLRUList({"e", "f", "g", "Z", "d"}, 2); } -class TestNvmCache : public NvmCache { +class TestTieredCache : public TieredCache { public: - TestNvmCache(size_t capacity) : num_inserts_(0), num_lookups_(0) { + TestTieredCache(size_t capacity) : num_inserts_(0), num_lookups_(0) { cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); } - ~TestNvmCache() { cache_.reset(); } + ~TestTieredCache() { cache_.reset(); } - std::string Name() override { return "TestNvmCache"; } + std::string Name() override { return "TestTieredCache"; } Status Insert(const Slice& key, void* value, Cache::CacheItemHelperCallback helper_cb) override { @@ -226,10 +226,10 @@ class TestNvmCache : public NvmCache { }); } - std::unique_ptr Lookup(const Slice& key, - const Cache::CreateCallback& create_cb, - bool /*wait*/) override { - std::unique_ptr nvm_handle; + std::unique_ptr Lookup( + const Slice& key, const Cache::CreateCallback& create_cb, + bool /*wait*/) override { + std::unique_ptr tiered_handle; Cache::Handle* handle = cache_->Lookup(key); num_lookups_++; if (handle) { @@ -240,15 +240,15 @@ class TestNvmCache : public NvmCache { ptr += sizeof(uint64_t); Status s = create_cb(ptr, size, &value, &charge); EXPECT_OK(s); - nvm_handle.reset( - new TestNvmCacheHandle(cache_.get(), handle, value, charge)); + tiered_handle.reset( + new TestTieredCacheHandle(cache_.get(), handle, value, charge)); } - return nvm_handle; + return tiered_handle; } void Erase(const Slice& /*key*/) override {} - void WaitAll(std::vector /*handles*/) override {} + void WaitAll(std::vector /*handles*/) override {} std::string GetPrintableOptions() const override { return ""; } @@ -257,12 +257,12 @@ class TestNvmCache : public NvmCache { uint32_t num_lookups() { return num_lookups_; } private: - class TestNvmCacheHandle : public NvmCacheHandle { + class TestTieredCacheHandle : public TieredCacheHandle { public: - TestNvmCacheHandle(Cache* cache, Cache::Handle* handle, void* value, - size_t size) + TestTieredCacheHandle(Cache* cache, Cache::Handle* handle, void* value, + size_t size) : cache_(cache), handle_(handle), value_(value), size_(size) {} - ~TestNvmCacheHandle() { cache_->Release(handle_); } + ~TestTieredCacheHandle() { cache_->Release(handle_); } 
bool isReady() override { return true; } @@ -284,11 +284,11 @@ class TestNvmCache : public NvmCache { uint32_t num_lookups_; }; -TEST_F(LRUCacheTest, TestNvmCache) { +TEST_F(LRUCacheTest, TestTieredCache) { LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); - std::shared_ptr nvm_cache(new TestNvmCache(2048)); - opts.nvm_cache = nvm_cache; + std::shared_ptr tiered_cache(new TestTieredCache(2048)); + opts.tiered_cache = tiered_cache; std::shared_ptr cache = NewLRUCache(opts); class TestItem { @@ -361,8 +361,8 @@ TEST_F(LRUCacheTest, TestNvmCache) { Cache::Priority::LOW, true); ASSERT_NE(handle, nullptr); cache->Release(handle); - ASSERT_EQ(nvm_cache->num_inserts(), 2u); - ASSERT_EQ(nvm_cache->num_lookups(), 1u); + ASSERT_EQ(tiered_cache->num_inserts(), 2u); + ASSERT_EQ(tiered_cache->num_lookups(), 1u); } } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index ad7516fbb..bae4f0d91 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -37,7 +37,7 @@ namespace ROCKSDB_NAMESPACE { class Cache; struct ConfigOptions; -class NvmCache; +class TieredCache; extern const bool kDefaultToAdaptiveMutex; @@ -91,8 +91,8 @@ struct LRUCacheOptions { CacheMetadataChargePolicy metadata_charge_policy = kDefaultCacheMetadataChargePolicy; - // An NvmCache instance to use a the non-volatile tier - std::shared_ptr nvm_cache; + // A TieredCache instance to use as the non-volatile tier + std::shared_ptr tiered_cache; LRUCacheOptions() {} LRUCacheOptions(size_t _capacity, int _num_shard_bits, diff --git a/include/rocksdb/nvm_cache.h b/include/rocksdb/tiered_cache.h similarity index 83% rename from include/rocksdb/nvm_cache.h rename to include/rocksdb/tiered_cache.h index a2f585291..9af168b36 100644 --- a/include/rocksdb/nvm_cache.h +++ b/include/rocksdb/tiered_cache.h @@ -21,9 +21,9 @@ namespace ROCKSDB_NAMESPACE { // ready, and call Wait() in order to block until it becomes ready. // The caller must call value() after it becomes ready to determine if the // handle successfullly read the item. -class NvmCacheHandle { +class TieredCacheHandle { public: - virtual ~NvmCacheHandle() {} + virtual ~TieredCacheHandle() {} // Returns whether the handle is ready or not virtual bool isReady() = 0; @@ -38,19 +38,20 @@ class NvmCacheHandle { virtual size_t Size() = 0; }; -// NvmCache +// TieredCache // -// NVM cache interface for caching blocks on a persistent medium. -class NvmCache { +// Cache interface for caching blocks on stackable tiers (which can include +// non-volatile mediums) +class TieredCache { public: - virtual ~NvmCache() {} + virtual ~TieredCache() {} virtual std::string Name() = 0; // Insert the given value into the NVM cache. The value is not written // directly. Rather, the SaveToCallback provided by helper_cb will be // used to extract the persistable data in value, which will be written - // to NVM. The implementation may or may not write it to NVM depending + // to this tier. The implementation may or may not write it to NVM depending // on the admission control policy, even if the return status is success. virtual Status Insert(const Slice& key, void* value, Cache::CacheItemHelperCallback helper_cb) = 0; @@ -59,7 +60,7 @@ class NvmCache { // will be used to create the object. 
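To make the handle contract concrete, a minimal caller-side sketch of the readiness protocol follows. It assumes a TieredCache implementation named tier and a caller-supplied Cache::CreateCallback; Wait() and Value() are the TieredCacheHandle members referred to in the comments above but elided from this hunk:

    // Sketch only, not part of the patch: demonstrates isReady()/Wait()/Value().
    void* LookupBlocking(TieredCache* tier, const Slice& key,
                         const Cache::CreateCallback& create_cb) {
      std::unique_ptr<TieredCacheHandle> handle =
          tier->Lookup(key, create_cb, /*wait=*/false);
      if (!handle) {
        return nullptr;        // this tier has no mapping for the key
      }
      if (!handle->isReady()) {
        handle->Wait();        // block until the lower-tier read completes
      }
      return handle->Value();  // nullptr if the read ultimately failed
    }
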
The handle returned may not be // ready yet, unless wait=true, in which case Lookup() will block until // the handle is ready - virtual std::unique_ptr Lookup( + virtual std::unique_ptr Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool wait) = 0; // At the discretion of the implementation, erase the data associated @@ -67,7 +68,7 @@ class NvmCache { virtual void Erase(const Slice& key) = 0; // Wait for a collection of handles to become ready - virtual void WaitAll(std::vector handles) = 0; + virtual void WaitAll(std::vector handles) = 0; virtual std::string GetPrintableOptions() const = 0; }; From b42d4a8ad4ef14cb29ac44e43be4050c8ed34b2d Mon Sep 17 00:00:00 2001 From: anand76 Date: Thu, 1 Apr 2021 10:23:13 -0700 Subject: [PATCH 09/15] Fix comments and an ASSERT_STATUS_CHECKED bug --- cache/lru_cache_test.cc | 2 +- include/rocksdb/tiered_cache.h | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 45eb1fbe4..5f75649d5 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -345,7 +345,7 @@ TEST_F(LRUCacheTest, TestTieredCache) { Random rnd(301); std::string str1 = rnd.RandomString(1020); TestItem* item1 = new TestItem(str1.data(), str1.length()); - cache->Insert("k1", item1, helper_cb, str1.length()); + ASSERT_OK(cache->Insert("k1", item1, helper_cb, str1.length())); std::string str2 = rnd.RandomString(1020); TestItem* item2 = new TestItem(str2.data(), str2.length()); // k2 should be demoted to NVM diff --git a/include/rocksdb/tiered_cache.h b/include/rocksdb/tiered_cache.h index 9af168b36..a66e83e0c 100644 --- a/include/rocksdb/tiered_cache.h +++ b/include/rocksdb/tiered_cache.h @@ -48,15 +48,16 @@ class TieredCache { virtual std::string Name() = 0; - // Insert the given value into the NVM cache. The value is not written + // Insert the given value into this tier. The value is not written // directly. Rather, the SaveToCallback provided by helper_cb will be // used to extract the persistable data in value, which will be written - // to this tier. The implementation may or may not write it to NVM depending - // on the admission control policy, even if the return status is success. + // to this tier. The implementation may or may not write it to cache + // depending on the admission control policy, even if the return status is + // success. virtual Status Insert(const Slice& key, void* value, Cache::CacheItemHelperCallback helper_cb) = 0; - // Lookup the data for the given key in the NVM cache. The create_cb + // Lookup the data for the given key in this tier. The create_cb // will be used to create the object. 
The handle returned may not be // ready yet, unless wait=true, in which case Lookup() will block until // the handle is ready From e295344ae3546fa8f4b2bf0bdde7a63023744e9d Mon Sep 17 00:00:00 2001 From: anand76 Date: Thu, 1 Apr 2021 10:31:43 -0700 Subject: [PATCH 10/15] Fix comment --- cache/lru_cache.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index 65916a8c9..b5301976c 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -258,7 +258,7 @@ void LRUCacheShard::SetCapacity(size_t capacity) { EvictFromLRU(0, &last_reference_list); } - // Try to insert the evicted entries into NVM cache + // Try to insert the evicted entries into tiered cache // Free the entries outside of mutex for performance reasons for (auto entry : last_reference_list) { if (tiered_cache_ && entry->IsTieredCacheCompatible() && From 09df74c540540a55dc978cae4a48a79cdcdb1c69 Mon Sep 17 00:00:00 2001 From: anand76 Date: Tue, 20 Apr 2021 21:33:09 -0700 Subject: [PATCH 11/15] Allow cache_bench and db_bench to use 3rd party tiered cache --- CMakeLists.txt | 3 +- Makefile | 5 +- TARGETS | 14 + buckifier/buckify_rocksdb.py | 5 + cache/cache_bench.cc | 377 +-------------------- cache/cache_bench_tool.cc | 506 +++++++++++++++++++++++++++++ cache/lru_cache.cc | 1 - include/rocksdb/cache.h | 144 ++++---- include/rocksdb/cache_bench_tool.h | 14 + include/rocksdb/configurable.h | 4 + include/rocksdb/tiered_cache.h | 2 + options/configurable.cc | 6 + src.mk | 3 + tools/db_bench_tool.cc | 39 ++- 14 files changed, 667 insertions(+), 456 deletions(-) create mode 100644 cache/cache_bench_tool.cc create mode 100644 include/rocksdb/cache_bench_tool.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 81cec6633..cc6aed647 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1315,7 +1315,8 @@ if(WITH_BENCHMARK_TOOLS) ${ROCKSDB_LIB} ${THIRDPARTY_LIBS}) add_executable(cache_bench${ARTIFACT_SUFFIX} - cache/cache_bench.cc) + cache/cache_bench.cc + cache/cache_bench_tool.cc) target_link_libraries(cache_bench${ARTIFACT_SUFFIX} ${ROCKSDB_LIB} ${GFLAGS_LIB}) diff --git a/Makefile b/Makefile index a974dc3c1..4f2972f9b 100644 --- a/Makefile +++ b/Makefile @@ -511,12 +511,13 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full TEST_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES)) $(GTEST) BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(BENCH_LIB_SOURCES)) +CACHE_BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(CACHE_BENCH_LIB_SOURCES)) TOOL_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TOOL_LIB_SOURCES)) ANALYZE_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(ANALYZER_LIB_SOURCES)) STRESS_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(STRESS_LIB_SOURCES)) ALL_SOURCES = $(LIB_SOURCES) $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES) $(GTEST_DIR)/gtest/gtest-all.cc -ALL_SOURCES += $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES) +ALL_SOURCES += $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(CACHE_BENCH_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES) ALL_SOURCES += $(TEST_MAIN_SOURCES) $(TOOL_MAIN_SOURCES) $(BENCH_MAIN_SOURCES) TESTS = $(patsubst %.cc, %, $(notdir $(TEST_MAIN_SOURCES))) @@ -1462,7 +1463,7 @@ folly_synchronization_distributed_mutex_test: $(OBJ_DIR)/third-party/folly/folly $(AM_LINK) endif -cache_bench: $(OBJ_DIR)/cache/cache_bench.o $(LIBRARY) +cache_bench: $(OBJ_DIR)/cache/cache_bench.o $(CACHE_BENCH_OBJECTS) $(LIBRARY) $(AM_LINK) persistent_cache_bench: 
$(OBJ_DIR)/utilities/persistent_cache/persistent_cache_bench.o $(LIBRARY) diff --git a/TARGETS b/TARGETS index 61c48ef2c..f2dff8a90 100644 --- a/TARGETS +++ b/TARGETS @@ -784,6 +784,20 @@ cpp_library( link_whole = False, ) +cpp_library( + name = "rocksdb_cache_bench_tools_lib", + srcs = ["cache/cache_bench_tool.cc"], + auto_headers = AutoHeaders.RECURSIVE_GLOB, + arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS, + compiler_flags = ROCKSDB_COMPILER_FLAGS, + os_deps = ROCKSDB_OS_DEPS, + os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS, + preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS, + deps = [":rocksdb_lib"], + external_deps = ROCKSDB_EXTERNAL_DEPS, + link_whole = False, +) + cpp_library( name = "rocksdb_stress_lib", srcs = [ diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index 6dfedbce1..3d10e2f6a 100644 --- a/buckifier/buckify_rocksdb.py +++ b/buckifier/buckify_rocksdb.py @@ -169,6 +169,11 @@ def generate_targets(repo_path, deps_map): src_mk.get("ANALYZER_LIB_SOURCES", []) + ["test_util/testutil.cc"], [":rocksdb_lib"]) + # rocksdb_cache_bench_tools_lib + TARGETS.add_library( + "rocksdb_cache_bench_tools_lib", + src_mk.get("CACHE_BENCH_LIB_SOURCES", []), + [":rocksdb_lib"]) # rocksdb_stress_lib TARGETS.add_rocksdb_library( "rocksdb_stress_lib", diff --git a/cache/cache_bench.cc b/cache/cache_bench.cc index 48cf2b44d..05fc252b4 100644 --- a/cache/cache_bench.cc +++ b/cache/cache_bench.cc @@ -1,7 +1,11 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// Copyright (c) 2013-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. #ifndef GFLAGS #include @@ -10,375 +14,8 @@ int main() { return 1; } #else - -#include - -#include -#include -#include - -#include "port/port.h" -#include "rocksdb/cache.h" -#include "rocksdb/db.h" -#include "rocksdb/env.h" -#include "rocksdb/system_clock.h" -#include "util/coding.h" -#include "util/gflags_compat.h" -#include "util/hash.h" -#include "util/mutexlock.h" -#include "util/random.h" - -using GFLAGS_NAMESPACE::ParseCommandLineFlags; - -static constexpr uint32_t KiB = uint32_t{1} << 10; -static constexpr uint32_t MiB = KiB << 10; -static constexpr uint64_t GiB = MiB << 10; - -DEFINE_uint32(threads, 16, "Number of concurrent threads to run."); -DEFINE_uint64(cache_size, 1 * GiB, - "Number of bytes to use as a cache of uncompressed data."); -DEFINE_uint32(num_shard_bits, 6, "shard_bits."); - -DEFINE_double(resident_ratio, 0.25, - "Ratio of keys fitting in cache to keyspace."); -DEFINE_uint64(ops_per_thread, 0, - "Number of operations per thread. 
(Default: 5 * keyspace size)"); -DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added."); - -DEFINE_uint32(skew, 5, "Degree of skew in key selection"); -DEFINE_bool(populate_cache, true, "Populate cache before operations"); - -DEFINE_uint32(lookup_insert_percent, 87, - "Ratio of lookup (+ insert on not found) to total workload " - "(expressed as a percentage)"); -DEFINE_uint32(insert_percent, 2, - "Ratio of insert to total workload (expressed as a percentage)"); -DEFINE_uint32(lookup_percent, 10, - "Ratio of lookup to total workload (expressed as a percentage)"); -DEFINE_uint32(erase_percent, 1, - "Ratio of erase to total workload (expressed as a percentage)"); - -DEFINE_bool(use_clock_cache, false, ""); - -namespace ROCKSDB_NAMESPACE { - -class CacheBench; -namespace { -// State shared by all concurrent executions of the same benchmark. -class SharedState { - public: - explicit SharedState(CacheBench* cache_bench) - : cv_(&mu_), - num_initialized_(0), - start_(false), - num_done_(0), - cache_bench_(cache_bench) {} - - ~SharedState() {} - - port::Mutex* GetMutex() { - return &mu_; - } - - port::CondVar* GetCondVar() { - return &cv_; - } - - CacheBench* GetCacheBench() const { - return cache_bench_; - } - - void IncInitialized() { - num_initialized_++; - } - - void IncDone() { - num_done_++; - } - - bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; } - - bool AllDone() const { return num_done_ >= FLAGS_threads; } - - void SetStart() { - start_ = true; - } - - bool Started() const { - return start_; - } - - private: - port::Mutex mu_; - port::CondVar cv_; - - uint64_t num_initialized_; - bool start_; - uint64_t num_done_; - - CacheBench* cache_bench_; -}; - -// Per-thread state for concurrent executions of the same benchmark. 
-struct ThreadState { - uint32_t tid; - Random64 rnd; - SharedState* shared; - - ThreadState(uint32_t index, SharedState* _shared) - : tid(index), rnd(1000 + index), shared(_shared) {} -}; - -struct KeyGen { - char key_data[27]; - - Slice GetRand(Random64& rnd, uint64_t max_key) { - uint64_t raw = rnd.Next(); - // Skew according to setting - for (uint32_t i = 0; i < FLAGS_skew; ++i) { - raw = std::min(raw, rnd.Next()); - } - uint64_t key = FastRange64(raw, max_key); - // Variable size and alignment - size_t off = key % 8; - key_data[0] = char{42}; - EncodeFixed64(key_data + 1, key); - key_data[9] = char{11}; - EncodeFixed64(key_data + 10, key); - key_data[18] = char{4}; - EncodeFixed64(key_data + 19, key); - return Slice(&key_data[off], sizeof(key_data) - off); - } -}; - -char* createValue(Random64& rnd) { - char* rv = new char[FLAGS_value_bytes]; - // Fill with some filler data, and take some CPU time - for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) { - EncodeFixed64(rv + i, rnd.Next()); - } - return rv; -} - -void deleter(const Slice& /*key*/, void* value) { - delete[] static_cast(value); -} -} // namespace - -class CacheBench { - static constexpr uint64_t kHundredthUint64 = - std::numeric_limits::max() / 100U; - - public: - CacheBench() - : max_key_(static_cast(FLAGS_cache_size / FLAGS_resident_ratio / - FLAGS_value_bytes)), - lookup_insert_threshold_(kHundredthUint64 * - FLAGS_lookup_insert_percent), - insert_threshold_(lookup_insert_threshold_ + - kHundredthUint64 * FLAGS_insert_percent), - lookup_threshold_(insert_threshold_ + - kHundredthUint64 * FLAGS_lookup_percent), - erase_threshold_(lookup_threshold_ + - kHundredthUint64 * FLAGS_erase_percent) { - if (erase_threshold_ != 100U * kHundredthUint64) { - fprintf(stderr, "Percentages must add to 100.\n"); - exit(1); - } - if (FLAGS_use_clock_cache) { - cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits); - if (!cache_) { - fprintf(stderr, "Clock cache not supported.\n"); - exit(1); - } - } else { - cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits); - } - if (FLAGS_ops_per_thread == 0) { - FLAGS_ops_per_thread = 5 * max_key_; - } - } - - ~CacheBench() {} - - void PopulateCache() { - Random64 rnd(1); - KeyGen keygen; - for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) { - cache_->Insert(keygen.GetRand(rnd, max_key_), createValue(rnd), - FLAGS_value_bytes, &deleter); - } - } - - bool Run() { - ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default(); - const auto& clock = env->GetSystemClock(); - - PrintEnv(); - SharedState shared(this); - std::vector > threads(FLAGS_threads); - for (uint32_t i = 0; i < FLAGS_threads; i++) { - threads[i].reset(new ThreadState(i, &shared)); - env->StartThread(ThreadBody, threads[i].get()); - } - { - MutexLock l(shared.GetMutex()); - while (!shared.AllInitialized()) { - shared.GetCondVar()->Wait(); - } - // Record start time - uint64_t start_time = clock->NowMicros(); - - // Start all threads - shared.SetStart(); - shared.GetCondVar()->SignalAll(); - - // Wait threads to complete - while (!shared.AllDone()) { - shared.GetCondVar()->Wait(); - } - - // Record end time - uint64_t end_time = clock->NowMicros(); - double elapsed = static_cast(end_time - start_time) * 1e-6; - uint32_t qps = static_cast( - static_cast(FLAGS_threads * FLAGS_ops_per_thread) / elapsed); - fprintf(stdout, "Complete in %.3f s; QPS = %u\n", elapsed, qps); - } - return true; - } - - private: - std::shared_ptr cache_; - const uint64_t max_key_; - // Cumulative thresholds in the 
space of a random uint64_t - const uint64_t lookup_insert_threshold_; - const uint64_t insert_threshold_; - const uint64_t lookup_threshold_; - const uint64_t erase_threshold_; - - static void ThreadBody(void* v) { - ThreadState* thread = static_cast(v); - SharedState* shared = thread->shared; - - { - MutexLock l(shared->GetMutex()); - shared->IncInitialized(); - if (shared->AllInitialized()) { - shared->GetCondVar()->SignalAll(); - } - while (!shared->Started()) { - shared->GetCondVar()->Wait(); - } - } - thread->shared->GetCacheBench()->OperateCache(thread); - - { - MutexLock l(shared->GetMutex()); - shared->IncDone(); - if (shared->AllDone()) { - shared->GetCondVar()->SignalAll(); - } - } - } - - void OperateCache(ThreadState* thread) { - // To use looked-up values - uint64_t result = 0; - // To hold handles for a non-trivial amount of time - Cache::Handle* handle = nullptr; - KeyGen gen; - for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { - Slice key = gen.GetRand(thread->rnd, max_key_); - uint64_t random_op = thread->rnd.Next(); - if (random_op < lookup_insert_threshold_) { - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - // do lookup - handle = cache_->Lookup(key); - if (handle) { - // do something with the data - result += NPHash64(static_cast(cache_->Value(handle)), - FLAGS_value_bytes); - } else { - // do insert - cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes, - &deleter, &handle); - } - } else if (random_op < insert_threshold_) { - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - // do insert - cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes, - &deleter, &handle); - } else if (random_op < lookup_threshold_) { - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - // do lookup - handle = cache_->Lookup(key); - if (handle) { - // do something with the data - result += NPHash64(static_cast(cache_->Value(handle)), - FLAGS_value_bytes); - } - } else if (random_op < erase_threshold_) { - // do erase - cache_->Erase(key); - } else { - // Should be extremely unlikely (noop) - assert(random_op >= kHundredthUint64 * 100U); - } - } - if (handle) { - cache_->Release(handle); - handle = nullptr; - } - } - - void PrintEnv() const { - printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); - printf("Number of threads : %u\n", FLAGS_threads); - printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); - printf("Cache size : %" PRIu64 "\n", FLAGS_cache_size); - printf("Num shard bits : %u\n", FLAGS_num_shard_bits); - printf("Max key : %" PRIu64 "\n", max_key_); - printf("Resident ratio : %g\n", FLAGS_resident_ratio); - printf("Skew degree : %u\n", FLAGS_skew); - printf("Populate cache : %d\n", int{FLAGS_populate_cache}); - printf("Lookup+Insert pct : %u%%\n", FLAGS_lookup_insert_percent); - printf("Insert percentage : %u%%\n", FLAGS_insert_percent); - printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent); - printf("Erase percentage : %u%%\n", FLAGS_erase_percent); - printf("----------------------------\n"); - } -}; -} // namespace ROCKSDB_NAMESPACE - +#include int main(int argc, char** argv) { - ParseCommandLineFlags(&argc, &argv, true); - - if (FLAGS_threads <= 0) { - fprintf(stderr, "threads number <= 0\n"); - exit(1); - } - - ROCKSDB_NAMESPACE::CacheBench bench; - if (FLAGS_populate_cache) { - bench.PopulateCache(); - printf("Population complete\n"); - printf("----------------------------\n"); - } - if (bench.Run()) { - return 0; - } else { - return 1; - } + return 
ROCKSDB_NAMESPACE::cache_bench_tool(argc, argv); } - #endif // GFLAGS diff --git a/cache/cache_bench_tool.cc b/cache/cache_bench_tool.cc new file mode 100644 index 000000000..db40ddcc5 --- /dev/null +++ b/cache/cache_bench_tool.cc @@ -0,0 +1,506 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef GFLAGS +#include +int main() { + fprintf(stderr, "Please install gflags to run rocksdb tools\n"); + return 1; +} +#else + +#include + +#include +#include +#include + +#include "options/configurable_helper.h" +#include "port/port.h" +#include "rocksdb/cache.h" +#include "rocksdb/cache_bench_tool.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/system_clock.h" +#include "rocksdb/tiered_cache.h" +#include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/options_type.h" +#include "util/coding.h" +#include "util/gflags_compat.h" +#include "util/hash.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/stop_watch.h" + +using GFLAGS_NAMESPACE::ParseCommandLineFlags; + +static constexpr uint32_t KiB = uint32_t{1} << 10; +static constexpr uint32_t MiB = KiB << 10; +static constexpr uint64_t GiB = MiB << 10; + +DEFINE_uint32(threads, 16, "Number of concurrent threads to run."); +DEFINE_uint64(cache_size, 1 * GiB, + "Number of bytes to use as a cache of uncompressed data."); +DEFINE_uint32(num_shard_bits, 6, "shard_bits."); + +DEFINE_double(resident_ratio, 0.25, + "Ratio of keys fitting in cache to keyspace."); +DEFINE_uint64(ops_per_thread, 0, + "Number of operations per thread. (Default: 5 * keyspace size)"); +DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added."); + +DEFINE_uint32(skew, 5, "Degree of skew in key selection"); +DEFINE_bool(populate_cache, true, "Populate cache before operations"); + +DEFINE_uint32(lookup_insert_percent, 87, + "Ratio of lookup (+ insert on not found) to total workload " + "(expressed as a percentage)"); +DEFINE_uint32(insert_percent, 2, + "Ratio of insert to total workload (expressed as a percentage)"); +DEFINE_uint32(lookup_percent, 10, + "Ratio of lookup to total workload (expressed as a percentage)"); +DEFINE_uint32(erase_percent, 1, + "Ratio of erase to total workload (expressed as a percentage)"); + +DEFINE_bool(use_clock_cache, false, ""); + +DEFINE_bool(skewed, false, "If true, skew the key access distribution"); +DEFINE_string(tiered_cache_uri, "", + "Full URI for creating a custom NVM cache object"); +static class std::shared_ptr tiered_cache; + +namespace ROCKSDB_NAMESPACE { +class CacheBench; + +namespace { +// State shared by all concurrent executions of the same benchmark. 
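A note on the -tiered_cache_uri flag defined above: the benchmark only resolves the URI, via ObjectRegistry::NewInstance()->NewSharedObject() in the CacheBench constructor further down, so a third-party TieredCache has to be registered under a matching pattern beforehand. A minimal sketch of such a plugin follows; the NoopTieredCache class and the noop:// pattern are invented for illustration, and the ObjectLibrary::Default()->Register() call is an assumption about the object-registry API of this era rather than something shown in this patch:

    // Hypothetical plugin, not part of this patch. The Register() call assumes
    // the ObjectLibrary/ObjectRegistry API available around RocksDB 6.19.
    #include "rocksdb/tiered_cache.h"
    #include "rocksdb/utilities/object_registry.h"

    namespace ROCKSDB_NAMESPACE {

    // A do-nothing tier: admits every insert, stores nothing, always misses.
    class NoopTieredCache : public TieredCache {
     public:
      std::string Name() override { return "NoopTieredCache"; }
      Status Insert(const Slice& /*key*/, void* /*value*/,
                    Cache::CacheItemHelperCallback /*helper_cb*/) override {
        return Status::OK();
      }
      std::unique_ptr<TieredCacheHandle> Lookup(
          const Slice& /*key*/, const Cache::CreateCallback& /*create_cb*/,
          bool /*wait*/) override {
        return nullptr;
      }
      void Erase(const Slice& /*key*/) override {}
      void WaitAll(std::vector<TieredCacheHandle*> /*handles*/) override {}
      std::string GetPrintableOptions() const override { return ""; }
    };

    // Register a factory so -tiered_cache_uri=noop:// can be resolved by
    // ObjectRegistry::NewInstance()->NewSharedObject().
    static const auto& noop_tiered_cache_reg =
        ObjectLibrary::Default()->Register<TieredCache>(
            "noop://.*",
            [](const std::string& /*uri*/, std::unique_ptr<TieredCache>* guard,
               std::string* /*errmsg*/) -> TieredCache* {
              guard->reset(new NoopTieredCache());
              return guard->get();
            });

    }  // namespace ROCKSDB_NAMESPACE

The db_bench change later in this patch wires up the same flag, so the identical registration works for both tools.
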
+class SharedState { + public: + explicit SharedState(CacheBench* cache_bench) + : cv_(&mu_), + num_initialized_(0), + start_(false), + num_done_(0), + cache_bench_(cache_bench) {} + + ~SharedState() {} + + port::Mutex* GetMutex() { return &mu_; } + + port::CondVar* GetCondVar() { return &cv_; } + + CacheBench* GetCacheBench() const { return cache_bench_; } + + void IncInitialized() { num_initialized_++; } + + void IncDone() { num_done_++; } + + bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; } + + bool AllDone() const { return num_done_ >= FLAGS_threads; } + + void SetStart() { start_ = true; } + + bool Started() const { return start_; } + + private: + port::Mutex mu_; + port::CondVar cv_; + + uint64_t num_initialized_; + bool start_; + uint64_t num_done_; + + CacheBench* cache_bench_; +}; + +class Stats { + private: + uint64_t hits; + uint64_t misses; + uint64_t lookup_us; + uint64_t insert_us; + + public: + Stats() : hits(0), misses(0), lookup_us(0), insert_us(0) {} + + void AddHits(uint64_t inc) { hits += inc; } + void AddMisses(uint64_t inc) { misses += inc; } + void AddLookupUs(uint64_t lookup) { lookup_us += lookup; } + void AddInsertUs(uint64_t insert) { insert_us += insert; } + void Merge(const Stats& other) { + hits += other.hits; + misses += other.misses; + lookup_us += other.lookup_us; + insert_us += other.insert_us; + } + void Report() { + fprintf(stdout, "%lu hits, %lu misses, %lu lookup_ns, %lu insert_ns\n", + hits, misses, lookup_us, insert_us); + fflush(stdout); + } +}; + +// Per-thread state for concurrent executions of the same benchmark. +struct ThreadState { + uint32_t tid; + Random64 rnd; + Stats stats; + SharedState* shared; + + ThreadState(uint32_t index, SharedState* _shared) + : tid(index), rnd(1000 + index), shared(_shared) {} +}; + +struct KeyGen { + char key_data[27]; + + Slice GetRand(Random64& rnd, uint64_t max_key, int max_log) { + uint64_t key = 0; + if (!FLAGS_skewed) { + uint64_t raw = rnd.Next(); + // Skew according to setting + for (uint32_t i = 0; i < FLAGS_skew; ++i) { + raw = std::min(raw, rnd.Next()); + } + key = FastRange64(raw, max_key); + } else { + key = rnd.Skewed(max_log); + if (key > max_key) { + key -= max_key; + } + } + // Variable size and alignment + size_t off = key % 8; + key_data[0] = char{42}; + EncodeFixed64(key_data + 1, key); + key_data[9] = char{11}; + EncodeFixed64(key_data + 10, key); + key_data[18] = char{4}; + EncodeFixed64(key_data + 19, key); + return Slice(&key_data[off], sizeof(key_data) - off); + } +}; + +char* createValue(Random64& rnd) { + char* rv = new char[FLAGS_value_bytes]; + // Fill with some filler data, and take some CPU time + for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) { + EncodeFixed64(rv + i, rnd.Next()); + } + return rv; +} + +void helperCallback(Cache::SizeCallback* size_cb, + Cache::SaveToCallback* save_cb, + Cache::DeletionCallback* del_cb) { + if (size_cb) { + *size_cb = [](void* /*obj*/) -> size_t { return FLAGS_value_bytes; }; + } + if (save_cb) { + *save_cb = [](void* obj, size_t /*offset*/, size_t size, + void* out) -> Status { + memcpy(out, obj, size); + return Status::OK(); + }; + } + if (del_cb) { + *del_cb = [](const Slice& /*key*/, void* obj) -> void { + delete[] static_cast(obj); + }; + } +} +} // namespace + +class CacheBench { + static constexpr uint64_t kHundredthUint64 = + std::numeric_limits::max() / 100U; + + public: + CacheBench() + : max_key_(static_cast(FLAGS_cache_size / FLAGS_resident_ratio / + FLAGS_value_bytes)), + 
lookup_insert_threshold_(kHundredthUint64 * + FLAGS_lookup_insert_percent), + insert_threshold_(lookup_insert_threshold_ + + kHundredthUint64 * FLAGS_insert_percent), + lookup_threshold_(insert_threshold_ + + kHundredthUint64 * FLAGS_lookup_percent), + erase_threshold_(lookup_threshold_ + + kHundredthUint64 * FLAGS_erase_percent), + skewed_(FLAGS_skewed) { + if (erase_threshold_ != 100U * kHundredthUint64) { + fprintf(stderr, "Percentages must add to 100.\n"); + exit(1); + } + + if (skewed_) { + uint64_t max_key = max_key_; + max_log_ = 0; + while (max_key >>= 1) max_log_++; + if (max_key > (1u << max_log_)) max_log_++; + } + + if (FLAGS_use_clock_cache) { + cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits); + if (!cache_) { + fprintf(stderr, "Clock cache not supported.\n"); + exit(1); + } + } else { + LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5); + if (!FLAGS_tiered_cache_uri.empty()) { + Status s = ObjectRegistry::NewInstance()->NewSharedObject( + FLAGS_tiered_cache_uri, &tiered_cache); + if (tiered_cache == nullptr) { + fprintf(stderr, + "No tiered cache registered matching string: %s status=%s\n", + FLAGS_tiered_cache_uri.c_str(), s.ToString().c_str()); + exit(1); + } + opts.tiered_cache = tiered_cache; + } + + cache_ = NewLRUCache(opts); + } + if (FLAGS_ops_per_thread == 0) { + FLAGS_ops_per_thread = 5 * max_key_; + } + } + + ~CacheBench() {} + + void PopulateCache() { + Random64 rnd(1); + KeyGen keygen; + for (uint64_t i = 0; i < 10 * FLAGS_cache_size; i += FLAGS_value_bytes) { + cache_->Insert(keygen.GetRand(rnd, max_key_, max_log_), createValue(rnd), + helperCallback, FLAGS_value_bytes); + } + } + + bool Run() { + ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default(); + const auto& clock = env->GetSystemClock(); + + PrintEnv(); + SharedState shared(this); + std::vector > threads(FLAGS_threads); + for (uint32_t i = 0; i < FLAGS_threads; i++) { + threads[i].reset(new ThreadState(i, &shared)); + env->StartThread(ThreadBody, threads[i].get()); + } + { + MutexLock l(shared.GetMutex()); + while (!shared.AllInitialized()) { + shared.GetCondVar()->Wait(); + } + // Record start time + uint64_t start_time = clock->NowMicros(); + + // Start all threads + shared.SetStart(); + shared.GetCondVar()->SignalAll(); + + // Wait threads to complete + while (!shared.AllDone()) { + shared.GetCondVar()->Wait(); + } + + // Record end time + uint64_t end_time = clock->NowMicros(); + double elapsed = static_cast(end_time - start_time) * 1e-6; + uint32_t qps = static_cast( + static_cast(FLAGS_threads * FLAGS_ops_per_thread) / elapsed); + fprintf(stdout, "Complete in %.3f s; QPS = %u\n", elapsed, qps); + + Stats merge_stats; + for (uint32_t i = 0; i < FLAGS_threads; ++i) { + merge_stats.Merge(threads[i]->stats); + } + merge_stats.Report(); + } + return true; + } + + private: + std::shared_ptr cache_; + const uint64_t max_key_; + // Cumulative thresholds in the space of a random uint64_t + const uint64_t lookup_insert_threshold_; + const uint64_t insert_threshold_; + const uint64_t lookup_threshold_; + const uint64_t erase_threshold_; + const bool skewed_; + int max_log_; + + static void ThreadBody(void* v) { + ThreadState* thread = static_cast(v); + SharedState* shared = thread->shared; + + { + MutexLock l(shared->GetMutex()); + shared->IncInitialized(); + if (shared->AllInitialized()) { + shared->GetCondVar()->SignalAll(); + } + while (!shared->Started()) { + shared->GetCondVar()->Wait(); + } + } + thread->shared->GetCacheBench()->OperateCache(thread); + + { 
+ MutexLock l(shared->GetMutex()); + shared->IncDone(); + if (shared->AllDone()) { + shared->GetCondVar()->SignalAll(); + } + } + } + + void OperateCache(ThreadState* thread) { + // To use looked-up values + uint64_t result = 0; + // To hold handles for a non-trivial amount of time + Cache::Handle* handle = nullptr; + KeyGen gen; + ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default(); + SystemClock* clock = env->GetSystemClock().get(); + for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { + Slice key = gen.GetRand(thread->rnd, max_key_, max_log_); + uint64_t random_op = thread->rnd.Next(); + Cache::CreateCallback create_cb = + [](void* buf, size_t size, void** out_obj, size_t* charge) -> Status { + *out_obj = reinterpret_cast(new char[size]); + memcpy(*out_obj, buf, size); + *charge = size; + return Status::OK(); + }; + + if (random_op < lookup_insert_threshold_) { + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + // do lookup + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + handle = cache_->Lookup(key, helperCallback, create_cb, + Cache::Priority::LOW, true); + } + thread->stats.AddLookupUs(elapsed); + } + if (handle) { + thread->stats.AddHits(1); + // do something with the data + result += NPHash64(static_cast(cache_->Value(handle)), + FLAGS_value_bytes); + } else { + thread->stats.AddMisses(1); + // do insert + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + cache_->Insert(key, createValue(thread->rnd), helperCallback, + FLAGS_value_bytes, &handle); + } + thread->stats.AddInsertUs(elapsed); + } + } + } else if (random_op < insert_threshold_) { + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + // do insert + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + cache_->Insert(key, createValue(thread->rnd), helperCallback, + FLAGS_value_bytes, &handle); + } + thread->stats.AddInsertUs(elapsed); + } + } else if (random_op < lookup_threshold_) { + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + // do lookup + { + uint64_t elapsed = 0; + { + StopWatch sw(clock, nullptr, 0, &elapsed); + handle = cache_->Lookup(key, helperCallback, create_cb, + Cache::Priority::LOW, true); + } + thread->stats.AddLookupUs(elapsed); + } + if (handle) { + thread->stats.AddHits(1); + // do something with the data + result += NPHash64(static_cast(cache_->Value(handle)), + FLAGS_value_bytes); + } + } else if (random_op < erase_threshold_) { + // do erase + cache_->Erase(key); + } else { + // Should be extremely unlikely (noop) + assert(random_op >= kHundredthUint64 * 100U); + } + } + if (handle) { + cache_->Release(handle); + handle = nullptr; + } + } + + void PrintEnv() const { + printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); + printf("Number of threads : %u\n", FLAGS_threads); + printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); + printf("Cache size : %" PRIu64 "\n", FLAGS_cache_size); + printf("Num shard bits : %u\n", FLAGS_num_shard_bits); + printf("Max key : %" PRIu64 "\n", max_key_); + printf("Resident ratio : %g\n", FLAGS_resident_ratio); + printf("Skew degree : %u\n", FLAGS_skew); + printf("Populate cache : %d\n", int{FLAGS_populate_cache}); + printf("Lookup+Insert pct : %u%%\n", FLAGS_lookup_insert_percent); + printf("Insert percentage : %u%%\n", FLAGS_insert_percent); + printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent); + printf("Erase percentage : %u%%\n", FLAGS_erase_percent); + 
printf("----------------------------\n"); + } +}; + +int cache_bench_tool(int argc, char** argv) { + ParseCommandLineFlags(&argc, &argv, true); + + if (FLAGS_threads <= 0) { + fprintf(stderr, "threads number <= 0\n"); + exit(1); + } + + ROCKSDB_NAMESPACE::CacheBench bench; + if (FLAGS_populate_cache) { + bench.PopulateCache(); + printf("Population complete\n"); + printf("----------------------------\n"); + } + if (bench.Run()) { + return 0; + } else { + return 1; + } +} +} // namespace ROCKSDB_NAMESPACE + +#endif // GFLAGS diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index b5301976c..a5b49218a 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -377,7 +377,6 @@ Cache::Handle* LRUCacheShard::Lookup( e->SetPromoted(true); e->SetTieredCacheCompatible(true); e->info_.helper_cb = helper_cb; - e->charge = tiered_handle->Size(); e->key_length = key.size(); e->hash = hash; e->refs = 0; diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index bae4f0d91..9b9164eb6 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -248,38 +248,6 @@ class Cache { Handle** handle = nullptr, Priority priority = Priority::LOW) = 0; - // Insert a mapping from key->value into the volatile cache and assign it - // the specified charge against the total cache capacity. - // If strict_capacity_limit is true and cache reaches its full capacity, - // return Status::Incomplete. - // - // If handle is not nullptr, returns a handle that corresponds to the - // mapping. The caller must call this->Release(handle) when the returned - // mapping is no longer needed. In case of error caller is responsible to - // cleanup the value (i.e. calling "deleter"). - // - // If handle is nullptr, it is as if Release is called immediately after - // insert. In case of error value will be cleanup. - // - // Regardless of whether the item was inserted into the volatile cache, - // it will attempt to insert it into the NVM cache if one is configured. - // The block cache implementation must support the NVM tier, otherwise - // the item is only inserted into the volatile tier. It may - // defer the insertion to NVM as it sees fit. The NVM - // cache may or may not write it to NVM depending on its admission - // policy. - // - // When the inserted entry is no longer needed, the key and - // value will be passed to "deleter". - virtual Status Insert(const Slice& key, void* value, - CacheItemHelperCallback helper_cb, size_t charge, - Handle** handle = nullptr, - Priority priority = Priority::LOW) { - DeletionCallback delete_cb = nullptr; - (*helper_cb)(nullptr, nullptr, &delete_cb); - return Insert(key, value, charge, delete_cb, handle, priority); - } - // If the cache has no mapping for "key", returns nullptr. // // Else return a handle that corresponds to the mapping. The caller @@ -289,25 +257,6 @@ class Cache { // function. virtual Handle* Lookup(const Slice& key, Statistics* stats = nullptr) = 0; - // Lookup the key in the volatile and NVM tiers (if one is configured). - // The create_cb callback function object will be used to contruct the - // cached object. - // If none of the tiers have the mapping for the key, rturns nullptr. - // Else, returns a handle that corresponds to the mapping. - // - // The handle returned may not be ready. The caller should call isReady() - // to check if the item value is ready, and call Wait() or WaitAll() if - // its not ready. The caller should then call Value() to check if the - // item was successfully retrieved. 
If unsuccessful (perhaps due to an - // IO error), Value() will return nullptr. - virtual Handle* Lookup(const Slice& key, - CacheItemHelperCallback /*helper_cb*/, - const CreateCallback& /*create_cb*/, - Priority /*priority*/, bool /*wait*/, - Statistics* stats = nullptr) { - return Lookup(key, stats); - } - // Increments the reference count for the handle if it refers to an entry in // the cache. Returns true if refcount was incremented; otherwise, returns // false. @@ -328,27 +277,6 @@ class Cache { // REQUIRES: handle must have been returned by a method on *this. virtual bool Release(Handle* handle, bool force_erase = false) = 0; - // Release a mapping returned by a previous Lookup(). The "useful" - // parameter specifies whether the data was actually used or not, - // which may be used by the cache implementation to decide whether - // to consider it as a hit for retention purposes. - virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) { - return Release(handle, force_erase); - } - - // Determines if the handle returned by Lookup() has a valid value yet. - virtual bool isReady(Handle* /*handle*/) { return true; } - - // If the handle returned by Lookup() is not ready yet, wait till it - // becomes ready. - // Note: A ready handle doesn't necessarily mean it has a valid value. The - // user should call Value() and check for nullptr. - virtual void Wait(Handle* /*handle*/) {} - - // Wait for a vector of handles to become ready. As with Wait(), the user - // should check the Value() of each handle for nullptr - virtual void WaitAll(std::vector& /*handles*/) {} - // Return the value encapsulated in a handle returned by a // successful Lookup(). // REQUIRES: handle must not have been released yet. @@ -417,6 +345,78 @@ class Cache { MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); } + // Insert a mapping from key->value into the volatile cache and assign it + // the specified charge against the total cache capacity. + // If strict_capacity_limit is true and cache reaches its full capacity, + // return Status::Incomplete. + // + // If handle is not nullptr, returns a handle that corresponds to the + // mapping. The caller must call this->Release(handle) when the returned + // mapping is no longer needed. In case of error caller is responsible to + // cleanup the value (i.e. calling "deleter"). + // + // If handle is nullptr, it is as if Release is called immediately after + // insert. In case of error value will be cleanup. + // + // Regardless of whether the item was inserted into the volatile cache, + // it will attempt to insert it into the NVM cache if one is configured. + // The block cache implementation must support the NVM tier, otherwise + // the item is only inserted into the volatile tier. It may + // defer the insertion to NVM as it sees fit. The NVM + // cache may or may not write it to NVM depending on its admission + // policy. + // + // When the inserted entry is no longer needed, the key and + // value will be passed to "deleter". + virtual Status Insert(const Slice& key, void* value, + CacheItemHelperCallback helper_cb, size_t charge, + Handle** handle = nullptr, + Priority priority = Priority::LOW) { + DeletionCallback delete_cb = nullptr; + (*helper_cb)(nullptr, nullptr, &delete_cb); + return Insert(key, value, charge, delete_cb, handle, priority); + } + + // Lookup the key in the volatile and NVM tiers (if one is configured). + // The create_cb callback function object will be used to contruct the + // cached object. 
+ // If none of the tiers have the mapping for the key, rturns nullptr. + // Else, returns a handle that corresponds to the mapping. + // + // The handle returned may not be ready. The caller should call isReady() + // to check if the item value is ready, and call Wait() or WaitAll() if + // its not ready. The caller should then call Value() to check if the + // item was successfully retrieved. If unsuccessful (perhaps due to an + // IO error), Value() will return nullptr. + virtual Handle* Lookup(const Slice& key, + CacheItemHelperCallback /*helper_cb*/, + const CreateCallback& /*create_cb*/, + Priority /*priority*/, bool /*wait*/, + Statistics* stats = nullptr) { + return Lookup(key, stats); + } + + // Release a mapping returned by a previous Lookup(). The "useful" + // parameter specifies whether the data was actually used or not, + // which may be used by the cache implementation to decide whether + // to consider it as a hit for retention purposes. + virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) { + return Release(handle, force_erase); + } + + // Determines if the handle returned by Lookup() has a valid value yet. + virtual bool isReady(Handle* /*handle*/) { return true; } + + // If the handle returned by Lookup() is not ready yet, wait till it + // becomes ready. + // Note: A ready handle doesn't necessarily mean it has a valid value. The + // user should call Value() and check for nullptr. + virtual void Wait(Handle* /*handle*/) {} + + // Wait for a vector of handles to become ready. As with Wait(), the user + // should check the Value() of each handle for nullptr + virtual void WaitAll(std::vector& /*handles*/) {} + private: std::shared_ptr memory_allocator_; }; diff --git a/include/rocksdb/cache_bench_tool.h b/include/rocksdb/cache_bench_tool.h new file mode 100644 index 000000000..413ce1593 --- /dev/null +++ b/include/rocksdb/cache_bench_tool.h @@ -0,0 +1,14 @@ +// Copyright (c) 2013-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" + +namespace ROCKSDB_NAMESPACE { + +int cache_bench_tool(int argc, char** argv); +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/configurable.h b/include/rocksdb/configurable.h index f6f5cc726..5971ec44e 100644 --- a/include/rocksdb/configurable.h +++ b/include/rocksdb/configurable.h @@ -356,4 +356,8 @@ class Configurable { // Configurable option via std::vector options_; }; + +extern void RegisterConfigurableOptions( + Configurable& configurable, const std::string& name, void* opt_ptr, + const std::unordered_map* opt_map); } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/tiered_cache.h b/include/rocksdb/tiered_cache.h index a66e83e0c..e022db909 100644 --- a/include/rocksdb/tiered_cache.h +++ b/include/rocksdb/tiered_cache.h @@ -48,6 +48,8 @@ class TieredCache { virtual std::string Name() = 0; + static const char* Type() { return "TieredCache"; } + // Insert the given value into this tier. The value is not written // directly. 
Rather, the SaveToCallback provided by helper_cb will be // used to extract the persistable data in value, which will be written diff --git a/options/configurable.cc b/options/configurable.cc index aa2b957c4..f1e1e789b 100644 --- a/options/configurable.cc +++ b/options/configurable.cc @@ -31,6 +31,12 @@ void ConfigurableHelper::RegisterOptions( configurable.options_.emplace_back(opts); } +void RegisterConfigurableOptions( + Configurable& configurable, const std::string& name, void* opt_ptr, + const std::unordered_map* type_map) { + ConfigurableHelper::RegisterOptions(configurable, name, opt_ptr, type_map); +} + //************************************************************************* // // Methods for Initializing and Validating Configurable Objects diff --git a/src.mk b/src.mk index 1e487f4e8..2841e355f 100644 --- a/src.mk +++ b/src.mk @@ -320,6 +320,9 @@ MOCK_LIB_SOURCES = \ BENCH_LIB_SOURCES = \ tools/db_bench_tool.cc \ +CACHE_BENCH_LIB_SOURCES = \ + cache/cache_bench_tool.cc \ + STRESS_LIB_SOURCES = \ db_stress_tool/batched_ops_stress.cc \ db_stress_tool/cf_consistency_stress.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index ac5677407..561aadaef 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -53,8 +53,10 @@ #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/stats_history.h" +#include "rocksdb/tiered_cache.h" #include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/transaction.h" @@ -1409,6 +1411,10 @@ DEFINE_bool(read_with_latest_user_timestamp, true, "If true, always use the current latest timestamp for read. If " "false, choose a random timestamp from the past."); +DEFINE_string(tiered_cache_uri, "", + "Full URI for creating a custom tiered cache object"); +static class std::shared_ptr tiered_cache; + static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); @@ -2757,22 +2763,35 @@ class Benchmark { } return cache; } else { - if (FLAGS_use_cache_memkind_kmem_allocator) { + LRUCacheOptions opts( + static_cast(capacity), FLAGS_cache_numshardbits, + false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio, #ifdef MEMKIND - return NewLRUCache( - static_cast(capacity), FLAGS_cache_numshardbits, - false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio, - std::make_shared()); - + FLAGS_use_cache_memkind_kmem_allocator + ? 
std::make_shared() + : nullptr #else + nullptr +#endif + ); + if (FLAGS_use_cache_memkind_kmem_allocator) { +#ifndef MEMKIND fprintf(stderr, "Memkind library is not linked with the binary."); exit(1); #endif - } else { - return NewLRUCache( - static_cast(capacity), FLAGS_cache_numshardbits, - false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio); } + if (!FLAGS_tiered_cache_uri.empty()) { + Status s = ObjectRegistry::NewInstance()->NewSharedObject( + FLAGS_tiered_cache_uri, &tiered_cache); + if (tiered_cache == nullptr) { + fprintf(stderr, + "No tiered cache registered matching string: %s status=%s\n", + FLAGS_tiered_cache_uri.c_str(), s.ToString().c_str()); + exit(1); + } + opts.tiered_cache = tiered_cache; + } + return NewLRUCache(opts); } } From d0a0c91017681b654810c7a5c6fb712222bf2e3a Mon Sep 17 00:00:00 2001 From: anand76 Date: Mon, 26 Apr 2021 13:35:48 -0700 Subject: [PATCH 12/15] Fix build issues from merge --- include/rocksdb/cache.h | 72 ---------------------------------- include/rocksdb/configurable.h | 4 -- options/configurable.cc | 6 --- 3 files changed, 82 deletions(-) diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 561799662..3b65d06a4 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -248,38 +248,6 @@ class Cache { Handle** handle = nullptr, Priority priority = Priority::LOW) = 0; - // Insert a mapping from key->value into the volatile cache and assign it - // the specified charge against the total cache capacity. - // If strict_capacity_limit is true and cache reaches its full capacity, - // return Status::Incomplete. - // - // If handle is not nullptr, returns a handle that corresponds to the - // mapping. The caller must call this->Release(handle) when the returned - // mapping is no longer needed. In case of error caller is responsible to - // cleanup the value (i.e. calling "deleter"). - // - // If handle is nullptr, it is as if Release is called immediately after - // insert. In case of error value will be cleanup. - // - // Regardless of whether the item was inserted into the volatile cache, - // it will attempt to insert it into the NVM cache if one is configured. - // The block cache implementation must support the NVM tier, otherwise - // the item is only inserted into the volatile tier. It may - // defer the insertion to NVM as it sees fit. The NVM - // cache may or may not write it to NVM depending on its admission - // policy. - // - // When the inserted entry is no longer needed, the key and - // value will be passed to "deleter". - virtual Status Insert(const Slice& key, void* value, - CacheItemHelperCallback helper_cb, size_t charge, - Handle** handle = nullptr, - Priority priority = Priority::LOW) { - DeletionCallback delete_cb = nullptr; - (*helper_cb)(nullptr, nullptr, &delete_cb); - return Insert(key, value, charge, delete_cb, handle, priority); - } - // If the cache has no mapping for "key", returns nullptr. // // Else return a handle that corresponds to the mapping. The caller @@ -289,25 +257,6 @@ class Cache { // function. virtual Handle* Lookup(const Slice& key, Statistics* stats = nullptr) = 0; - // Lookup the key in the volatile and NVM tiers (if one is configured). - // The create_cb callback function object will be used to contruct the - // cached object. - // If none of the tiers have the mapping for the key, rturns nullptr. - // Else, returns a handle that corresponds to the mapping. - // - // The handle returned may not be ready. 
The caller should call isReady() - // to check if the item value is ready, and call Wait() or WaitAll() if - // its not ready. The caller should then call Value() to check if the - // item was successfully retrieved. If unsuccessful (perhaps due to an - // IO error), Value() will return nullptr. - virtual Handle* Lookup(const Slice& key, - CacheItemHelperCallback /*helper_cb*/, - const CreateCallback& /*create_cb*/, - Priority /*priority*/, bool /*wait*/, - Statistics* stats = nullptr) { - return Lookup(key, stats); - } - // Increments the reference count for the handle if it refers to an entry in // the cache. Returns true if refcount was incremented; otherwise, returns // false. @@ -328,27 +277,6 @@ class Cache { // REQUIRES: handle must have been returned by a method on *this. virtual bool Release(Handle* handle, bool force_erase = false) = 0; - // Release a mapping returned by a previous Lookup(). The "useful" - // parameter specifies whether the data was actually used or not, - // which may be used by the cache implementation to decide whether - // to consider it as a hit for retention purposes. - virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) { - return Release(handle, force_erase); - } - - // Determines if the handle returned by Lookup() has a valid value yet. - virtual bool isReady(Handle* /*handle*/) { return true; } - - // If the handle returned by Lookup() is not ready yet, wait till it - // becomes ready. - // Note: A ready handle doesn't necessarily mean it has a valid value. The - // user should call Value() and check for nullptr. - virtual void Wait(Handle* /*handle*/) {} - - // Wait for a vector of handles to become ready. As with Wait(), the user - // should check the Value() of each handle for nullptr - virtual void WaitAll(std::vector& /*handles*/) {} - // Return the value encapsulated in a handle returned by a // successful Lookup(). // REQUIRES: handle must not have been released yet. 
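
For readers following the asynchronous lookup contract that the duplicated declarations above describe, here is a minimal caller-side sketch. It assumes a Cache instance backed by an NVM/tiered cache, code running inside ROCKSDB_NAMESPACE, and hypothetical helper_cb/create_cb objects matching the CacheItemHelperCallback and CreateCallback signatures declared in this patch series; it is illustrative only and is not code from the patch.

    // Sketch only. `cache`, `key`, `helper_cb` and `create_cb` are assumed to
    // exist and to match the signatures declared in this patch series.
    Cache::Handle* h = cache->Lookup(key, helper_cb, create_cb,
                                     Cache::Priority::LOW, /*wait=*/false);
    if (h != nullptr) {
      if (!cache->isReady(h)) {
        // The value may still be in flight from the NVM tier; block until the
        // handle becomes ready. A ready handle can still hold a nullptr value,
        // e.g. after an IO error.
        cache->Wait(h);
      }
      void* value = cache->Value(h);
      if (value != nullptr) {
        // ... use the cached object ...
      }
      cache->Release(h, /*useful=*/value != nullptr, /*force_erase=*/false);
    }
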
diff --git a/include/rocksdb/configurable.h b/include/rocksdb/configurable.h index 92883794c..b56072dbe 100644 --- a/include/rocksdb/configurable.h +++ b/include/rocksdb/configurable.h @@ -385,8 +385,4 @@ class Configurable { // Configurable option via std::vector options_; }; - -extern void RegisterConfigurableOptions( - Configurable& configurable, const std::string& name, void* opt_ptr, - const std::unordered_map* opt_map); } // namespace ROCKSDB_NAMESPACE diff --git a/options/configurable.cc b/options/configurable.cc index 651d81924..f425f193c 100644 --- a/options/configurable.cc +++ b/options/configurable.cc @@ -31,12 +31,6 @@ void Configurable::RegisterOptions( options_.emplace_back(opts); } -void RegisterConfigurableOptions( - Configurable& configurable, const std::string& name, void* opt_ptr, - const std::unordered_map* type_map) { - ConfigurableHelper::RegisterOptions(configurable, name, opt_ptr, type_map); -} - //************************************************************************* // // Methods for Initializing and Validating Configurable Objects From 8f0de0554f6d49689798b53d3e567f9879b623d0 Mon Sep 17 00:00:00 2001 From: anand76 Date: Tue, 27 Apr 2021 17:20:52 -0700 Subject: [PATCH 13/15] Address comments --- TARGETS | 1 + cache/cache_bench_tool.cc | 2 +- include/rocksdb/cache.h | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/TARGETS b/TARGETS index 1504f1323..816f1c3c1 100644 --- a/TARGETS +++ b/TARGETS @@ -803,6 +803,7 @@ cpp_library( os_deps = ROCKSDB_OS_DEPS, os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS, preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS, + include_paths = ROCKSDB_INCLUDE_PATHS, deps = [":rocksdb_lib"], external_deps = ROCKSDB_EXTERNAL_DEPS, link_whole = False, diff --git a/cache/cache_bench_tool.cc b/cache/cache_bench_tool.cc index db40ddcc5..e372907c6 100644 --- a/cache/cache_bench_tool.cc +++ b/cache/cache_bench_tool.cc @@ -137,7 +137,7 @@ class Stats { insert_us += other.insert_us; } void Report() { - fprintf(stdout, "%lu hits, %lu misses, %lu lookup_ns, %lu insert_ns\n", + fprintf(stdout, "%lu hits, %lu misses, %lu lookup_us, %lu insert_us\n", hits, misses, lookup_us, insert_us); fflush(stdout); } diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 3b65d06a4..4e5bdeeb8 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -373,6 +373,7 @@ class Cache { Handle** handle = nullptr, Priority priority = Priority::LOW) { DeletionCallback delete_cb = nullptr; + assert(helper_cb != nullptr); (*helper_cb)(nullptr, nullptr, &delete_cb); return Insert(key, value, charge, delete_cb, handle, priority); } From 5c04610b9c2e3b1d115586b36b782f083036e5ab Mon Sep 17 00:00:00 2001 From: anand76 Date: Wed, 28 Apr 2021 10:03:08 -0700 Subject: [PATCH 14/15] Fix CircleCI build failures --- cache/cache_bench_tool.cc | 13 ++----------- tools/db_bench_tool.cc | 4 ++++ 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/cache/cache_bench_tool.cc b/cache/cache_bench_tool.cc index e372907c6..62f31de1c 100644 --- a/cache/cache_bench_tool.cc +++ b/cache/cache_bench_tool.cc @@ -3,14 +3,6 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). 
-#ifndef GFLAGS -#include -int main() { - fprintf(stderr, "Please install gflags to run rocksdb tools\n"); - return 1; -} -#else - #include #include @@ -197,7 +189,7 @@ void helperCallback(Cache::SizeCallback* size_cb, Cache::SaveToCallback* save_cb, Cache::DeletionCallback* del_cb) { if (size_cb) { - *size_cb = [](void* /*obj*/) -> size_t { return FLAGS_value_bytes; }; + *size_cb = [](void * /*obj*/) -> size_t { return FLAGS_value_bytes; }; } if (save_cb) { *save_cb = [](void* obj, size_t /*offset*/, size_t size, @@ -236,9 +228,9 @@ class CacheBench { exit(1); } + max_log_ = 0; if (skewed_) { uint64_t max_key = max_key_; - max_log_ = 0; while (max_key >>= 1) max_log_++; if (max_key > (1u << max_log_)) max_log_++; } @@ -503,4 +495,3 @@ int cache_bench_tool(int argc, char** argv) { } } // namespace ROCKSDB_NAMESPACE -#endif // GFLAGS diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index b7265f7b8..98db70330 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1414,9 +1414,11 @@ DEFINE_bool(read_with_latest_user_timestamp, true, "If true, always use the current latest timestamp for read. If " "false, choose a random timestamp from the past."); +#ifndef ROCKSDB_LITE DEFINE_string(tiered_cache_uri, "", "Full URI for creating a custom tiered cache object"); static class std::shared_ptr tiered_cache; +#endif static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); @@ -2796,6 +2798,7 @@ class Benchmark { exit(1); #endif } +#ifndef ROCKSDB_LITE if (!FLAGS_tiered_cache_uri.empty()) { Status s = ObjectRegistry::NewInstance()->NewSharedObject( FLAGS_tiered_cache_uri, &tiered_cache); @@ -2807,6 +2810,7 @@ class Benchmark { } opts.tiered_cache = tiered_cache; } +#endif return NewLRUCache(opts); } } From befd813e95b07cf7f6a6f9df3c8346852713cead Mon Sep 17 00:00:00 2001 From: anand76 Date: Wed, 28 Apr 2021 11:01:48 -0700 Subject: [PATCH 15/15] More build failure fixes --- cache/cache_bench_tool.cc | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/cache/cache_bench_tool.cc b/cache/cache_bench_tool.cc index 62f31de1c..b452780ff 100644 --- a/cache/cache_bench_tool.cc +++ b/cache/cache_bench_tool.cc @@ -3,6 +3,9 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). 
+#ifdef GFLAGS +#include "rocksdb/cache_bench_tool.h" + #include #include @@ -12,7 +15,6 @@ #include "options/configurable_helper.h" #include "port/port.h" #include "rocksdb/cache.h" -#include "rocksdb/cache_bench_tool.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/system_clock.h" @@ -59,9 +61,11 @@ DEFINE_uint32(erase_percent, 1, DEFINE_bool(use_clock_cache, false, ""); DEFINE_bool(skewed, false, "If true, skew the key access distribution"); +#ifndef ROCKSDB_LITE DEFINE_string(tiered_cache_uri, "", "Full URI for creating a custom NVM cache object"); static class std::shared_ptr tiered_cache; +#endif namespace ROCKSDB_NAMESPACE { class CacheBench; @@ -129,8 +133,9 @@ class Stats { insert_us += other.insert_us; } void Report() { - fprintf(stdout, "%lu hits, %lu misses, %lu lookup_us, %lu insert_us\n", - hits, misses, lookup_us, insert_us); + fprintf(stdout, "%llu hits, %llu misses, %llu lookup_us, %llu insert_us\n", + (unsigned long long)hits, (unsigned long long)misses, + (unsigned long long)lookup_us, (unsigned long long)insert_us); fflush(stdout); } }; @@ -189,7 +194,7 @@ void helperCallback(Cache::SizeCallback* size_cb, Cache::SaveToCallback* save_cb, Cache::DeletionCallback* del_cb) { if (size_cb) { - *size_cb = [](void * /*obj*/) -> size_t { return FLAGS_value_bytes; }; + *size_cb = [](void* /*obj*/) -> size_t { return FLAGS_value_bytes; }; } if (save_cb) { *save_cb = [](void* obj, size_t /*offset*/, size_t size, @@ -243,6 +248,7 @@ class CacheBench { } } else { LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5); +#ifndef ROCKSDB_LITE if (!FLAGS_tiered_cache_uri.empty()) { Status s = ObjectRegistry::NewInstance()->NewSharedObject( FLAGS_tiered_cache_uri, &tiered_cache); @@ -254,6 +260,7 @@ class CacheBench { } opts.tiered_cache = tiered_cache; } +#endif cache_ = NewLRUCache(opts); } @@ -494,4 +501,4 @@ int cache_bench_tool(int argc, char** argv) { } } } // namespace ROCKSDB_NAMESPACE - +#endif
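
Taken together, db_bench and cache_bench now share the same tiered cache wiring. The condensed sketch below restates that flow outside of the diff context: `tiered_cache_uri`, `capacity`, `num_shard_bits`, and `high_pri_pool_ratio` stand in for the corresponding flags, the code is assumed to run inside ROCKSDB_NAMESPACE, and the TieredCache template argument of the shared_ptr is assumed from the rocksdb/tiered_cache.h include.

    // Condensed from the benchmark changes above; not a drop-in snippet.
    // The TieredCache type is assumed from rocksdb/tiered_cache.h.
    std::shared_ptr<TieredCache> tiered_cache;
    LRUCacheOptions opts(capacity, num_shard_bits,
                         false /*strict_capacity_limit*/, high_pri_pool_ratio);
    if (!tiered_cache_uri.empty()) {
      // Resolve the user-supplied URI to a registered TieredCache factory.
      Status s = ObjectRegistry::NewInstance()->NewSharedObject(
          tiered_cache_uri, &tiered_cache);
      if (tiered_cache == nullptr) {
        fprintf(stderr,
                "No tiered cache registered matching string: %s status=%s\n",
                tiered_cache_uri.c_str(), s.ToString().c_str());
        exit(1);
      }
      opts.tiered_cache = tiered_cache;
    }
    std::shared_ptr<Cache> cache = NewLRUCache(opts);

In both tools the registry lookup happens before NewLRUCache(), so an unresolvable URI fails fast rather than silently running the benchmark without the NVM tier.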