diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc
index 44ce21911..a7310d8dc 100644
--- a/cache/lru_cache.cc
+++ b/cache/lru_cache.cc
@@ -309,7 +309,8 @@ void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
   strict_capacity_limit_ = strict_capacity_limit;
 }

-Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
+Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
+                                 bool free_handle_on_fail) {
   Status s = Status::OK();
   autovector<LRUHandle*> last_reference_list;
   size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
@@ -323,14 +324,16 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
     if ((usage_ + total_charge) > capacity_ &&
         (strict_capacity_limit_ || handle == nullptr)) {
+      e->SetInCache(false);
       if (handle == nullptr) {
         // Don't insert the entry but still return ok, as if the entry inserted
         // into cache and get evicted immediately.
-        e->SetInCache(false);
         last_reference_list.push_back(e);
       } else {
-        delete[] reinterpret_cast<char*>(e);
-        *handle = nullptr;
+        if (free_handle_on_fail) {
+          delete[] reinterpret_cast<char*>(e);
+          *handle = nullptr;
+        }
         s = Status::Incomplete("Insert failed due to LRU cache being full.");
       }
     } else {
@@ -375,6 +378,43 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
   return s;
 }

+void LRUCacheShard::Promote(LRUHandle* e) {
+  SecondaryCacheResultHandle* secondary_handle = e->sec_handle;
+
+  assert(secondary_handle->IsReady());
+  e->SetIncomplete(false);
+  e->SetInCache(true);
+  e->SetPromoted(true);
+  e->value = secondary_handle->Value();
+  e->charge = secondary_handle->Size();
+  delete secondary_handle;
+
+  // This call could fail if the cache is over capacity and
+  // strict_capacity_limit_ is true. In such a case, we don't want
+  // InsertItem() to free the handle, since the item is already in memory
+  // and the caller will most likely just read from disk if we erase it here.
+  if (e->value) {
+    Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(e);
+    Status s = InsertItem(e, &handle, /*free_handle_on_fail=*/false);
+    if (s.ok()) {
+      // InsertItem would have taken a reference on the item, so decrement it
+      // here as we expect the caller to already hold a reference
+      e->Unref();
+    } else {
+      // Item is in memory, but not accounted against the cache capacity.
+      // When the handle is released, the item should get deleted
+      assert(!e->InCache());
+    }
+  } else {
+    // Since the secondary cache lookup failed, mark the item as not in cache
+    // and charge the cache only for metadata usage, i.e. handle, key, etc.
+    MutexLock l(&mutex_);
+    e->charge = 0;
+    e->SetInCache(false);
+    usage_ += e->CalcTotalCharge(metadata_charge_policy_);
+  }
+}
+
 Cache::Handle* LRUCacheShard::Lookup(
     const Slice& key, uint32_t hash,
     const ShardedCache::CacheItemHelper* helper,
@@ -399,47 +439,44 @@ Cache::Handle* LRUCacheShard::Lookup(
   // mutex if we're going to lookup in the secondary cache
   // Only support synchronous for now
   // TODO: Support asynchronous lookup in secondary cache
-  if (!e && secondary_cache_ && helper && helper->saveto_cb && wait) {
+  if (!e && secondary_cache_ && helper && helper->saveto_cb) {
    // For objects from the secondary cache, we expect the caller to provide
    // a way to create/delete the primary cache object. The only case where
    // a deleter would not be required is for dummy entries inserted for
    // accounting purposes, which we won't demote to the secondary cache
    // anyway.
    assert(create_cb && helper->del_cb);
-    std::unique_ptr<SecondaryCacheHandle> secondary_handle =
+    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
         secondary_cache_->Lookup(key, create_cb, wait);
     if (secondary_handle != nullptr) {
-      void* value = nullptr;
       e = reinterpret_cast<LRUHandle*>(
           new char[sizeof(LRUHandle) - 1 + key.size()]);
       e->flags = 0;
-      e->SetPromoted(true);
       e->SetSecondaryCacheCompatible(true);
       e->info_.helper = helper;
       e->key_length = key.size();
       e->hash = hash;
       e->refs = 0;
       e->next = e->prev = nullptr;
-      e->SetInCache(true);
       e->SetPriority(priority);
       memcpy(e->key_data, key.data(), key.size());
+      e->value = nullptr;
+      e->sec_handle = secondary_handle.release();
+      e->Ref();
-      value = secondary_handle->Value();
-      e->value = value;
-      e->charge = secondary_handle->Size();
-
-      // This call could nullify e if the cache is over capacity and
-      // strict_capacity_limit_ is true. In such a case, the caller will try
-      // to insert later, which might again fail, but its ok as this should
-      // not be common
-      // Being conservative here since there could be lookups that are
-      // actually ok to fail rather than succeed and bloat up the memory
-      // usage (preloading partitioned index blocks, for example).
-      Status s = InsertItem(e, reinterpret_cast<Cache::Handle**>(&e));
-      if (!s.ok()) {
-        assert(e == nullptr);
-        (*helper->del_cb)(key, value);
+      if (wait) {
+        Promote(e);
+        if (!e->value) {
+          // The secondary cache returned a handle, but the lookup failed
+          e->Unref();
+          e->Free();
+          e = nullptr;
+        }
+      } else {
+        // If wait is false, we always return a handle and let the caller
+        // release the handle after checking for success or failure
+        e->SetIncomplete(true);
       }
     }
   }
@@ -527,7 +564,7 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
   e->SetPriority(priority);
   memcpy(e->key_data, key.data(), key.size());

-  return InsertItem(e, handle);
+  return InsertItem(e, handle, /* free_handle_on_fail */ true);
 }

 void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
@@ -557,6 +594,21 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
   }
 }

+bool LRUCacheShard::IsReady(Cache::Handle* handle) {
+  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
+  bool ready = true;
+  if (e->IsPending()) {
+    assert(secondary_cache_);
+    assert(e->sec_handle);
+    if (e->sec_handle->IsReady()) {
+      Promote(e);
+    } else {
+      ready = false;
+    }
+  }
+  return ready;
+}
+
 size_t LRUCacheShard::GetUsage() const {
   MutexLock l(&mutex_);
   return usage_;
@@ -597,6 +649,7 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
                        use_adaptive_mutex, metadata_charge_policy,
                        /* max_upper_hash_bits */ 32 - num_shard_bits,
                        secondary_cache);
   }
+  secondary_cache_ = secondary_cache;
 }

 LRUCache::~LRUCache() {
@@ -662,6 +715,36 @@ double LRUCache::GetHighPriPoolRatio() {
   return result;
 }

+void LRUCache::WaitAll(std::vector<Handle*>& handles) {
+  if (secondary_cache_) {
+    std::vector<SecondaryCacheResultHandle*> sec_handles;
+    sec_handles.reserve(handles.size());
+    for (Handle* handle : handles) {
+      if (!handle) {
+        continue;
+      }
+      LRUHandle* lru_handle = reinterpret_cast<LRUHandle*>(handle);
+      if (!lru_handle->IsPending()) {
+        continue;
+      }
+      sec_handles.emplace_back(lru_handle->sec_handle);
+    }
+    secondary_cache_->WaitAll(sec_handles);
+    for (Handle* handle : handles) {
+      if (!handle) {
+        continue;
+      }
+      LRUHandle* lru_handle = reinterpret_cast<LRUHandle*>(handle);
+      if (!lru_handle->IsPending()) {
+        continue;
+      }
+      uint32_t hash = GetHash(handle);
+      LRUCacheShard* shard = static_cast<LRUCacheShard*>(GetShard(Shard(hash)));
+      shard->Promote(lru_handle);
+    }
+  }
+}
+
 std::shared_ptr<Cache> NewLRUCache(
     size_t capacity, int num_shard_bits, bool strict_capacity_limit,
    double high_pri_pool_ratio,
diff --git a/cache/lru_cache.h b/cache/lru_cache.h
index f14e4f435..af0155ad9 100644
--- a/cache/lru_cache.h
+++ b/cache/lru_cache.h
@@ -56,7 +56,12 @@ struct LRUHandle {
     Cache::DeleterFn deleter;
     const ShardedCache::CacheItemHelper* helper;
   } info_;
-  LRUHandle* next_hash;
+  // An entry is not added to the LRUHandleTable until the secondary cache
+  // lookup is complete, so it's safe to have this union.
+  union {
+    LRUHandle* next_hash;
+    SecondaryCacheResultHandle* sec_handle;
+  };
   LRUHandle* next;
   LRUHandle* prev;
   size_t charge;  // TODO(opt): Only allow uint32_t?
@@ -168,7 +173,16 @@ struct LRUHandle {
     if (!IsSecondaryCacheCompatible() && info_.deleter) {
       (*info_.deleter)(key(), value);
     } else if (IsSecondaryCacheCompatible()) {
-      (*info_.helper->del_cb)(key(), value);
+      if (IsPending()) {
+        assert(sec_handle != nullptr);
+        SecondaryCacheResultHandle* tmp_sec_handle = sec_handle;
+        tmp_sec_handle->Wait();
+        value = tmp_sec_handle->Value();
+        delete tmp_sec_handle;
+      }
+      if (value) {
+        (*info_.helper->del_cb)(key(), value);
+      }
     }
     delete[] reinterpret_cast<char*>(this);
   }
@@ -293,7 +307,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
                        bool force_erase) override {
     return Release(handle, force_erase);
   }
-  virtual bool IsReady(Cache::Handle* /*handle*/) override { return true; }
+  virtual bool IsReady(Cache::Handle* /*handle*/) override;
   virtual void Wait(Cache::Handle* /*handle*/) override {}
   virtual bool Ref(Cache::Handle* handle) override;
   virtual bool Release(Cache::Handle* handle,
@@ -326,10 +340,23 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
   double GetHighPriPoolRatio();

  private:
-  Status InsertItem(LRUHandle* item, Cache::Handle** handle);
+  friend class LRUCache;
+  // Insert an item into the hash table and, if handle is null, insert into
+  // the LRU list. Older items are evicted as necessary. If the cache is full
+  // and free_handle_on_fail is true, the item is deleted and handle is set
+  // to nullptr.
+  Status InsertItem(LRUHandle* item, Cache::Handle** handle,
+                    bool free_handle_on_fail);
   Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
                 DeleterFn deleter, const Cache::CacheItemHelper* helper,
                 Cache::Handle** handle, Cache::Priority priority);
+  // Promote an item looked up from the secondary cache to the LRU cache. The
+  // item is only inserted into the hash table and not the LRU list, and only
+  // if the cache is not at full capacity, as is the case during Insert. The
+  // caller should hold a reference on the LRUHandle. When the caller releases
+  // the last reference, the item is added to the LRU list.
+  // The item is promoted to the high pri or low pri pool as specified by the
+  // caller in Lookup.
+  void Promote(LRUHandle* e);
   void LRU_Remove(LRUHandle* e);
   void LRU_Insert(LRUHandle* e);
@@ -416,7 +443,7 @@ class LRUCache
   virtual uint32_t GetHash(Handle* handle) const override;
   virtual DeleterFn GetDeleter(Handle* handle) const override;
   virtual void DisownData() override;
-  virtual void WaitAll(std::vector<Handle*>& /*handles*/) override {}
+  virtual void WaitAll(std::vector<Handle*>& handles) override;

   //  Retrieves number of elements in LRU, for unit test purpose only
   size_t TEST_GetLRUSize();
@@ -426,6 +453,7 @@ class LRUCache
  private:
   LRUCacheShard* shards_ = nullptr;
   int num_shards_ = 0;
+  std::shared_ptr<SecondaryCache> secondary_cache_;
 };

 }  // namespace ROCKSDB_NAMESPACE
diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc
index 2c87db546..d20fd2463 100644
--- a/cache/lru_cache_test.cc
+++ b/cache/lru_cache_test.cc
@@ -204,6 +204,19 @@ TEST_F(LRUCacheTest, EntriesWithPriority) {

 class TestSecondaryCache : public SecondaryCache {
  public:
+  // Specifies what action to take on a lookup for a particular key
+  enum ResultType {
+    SUCCESS,
+    // Fail lookup immediately
+    FAIL,
+    // Defer the result. It will be returned after Wait/WaitAll is called
+    DEFER,
+    // Defer the result and eventually return failure
+    DEFER_AND_FAIL
+  };
+
+  using ResultMap = std::unordered_map<std::string, ResultType>;
+
   explicit TestSecondaryCache(size_t capacity)
       : num_inserts_(0), num_lookups_(0), inject_failure_(false) {
     cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr,
@@ -246,22 +259,37 @@ class TestSecondaryCache : public SecondaryCache {
     });
   }

-  std::unique_ptr<SecondaryCacheHandle> Lookup(
+  std::unique_ptr<SecondaryCacheResultHandle> Lookup(
       const Slice& key, const Cache::CreateCallback& create_cb,
       bool /*wait*/) override {
-    std::unique_ptr<SecondaryCacheHandle> secondary_handle;
+    std::string key_str = key.ToString();
+    TEST_SYNC_POINT_CALLBACK("TestSecondaryCache::Lookup", &key_str);
+
+    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle;
+    ResultType type = ResultType::SUCCESS;
+    auto iter = result_map_.find(key.ToString());
+    if (iter != result_map_.end()) {
+      type = iter->second;
+    }
+    if (type == ResultType::FAIL) {
+      return secondary_handle;
+    }
+
     Cache::Handle* handle = cache_->Lookup(key);
     num_lookups_++;
     if (handle) {
-      void* value;
-      size_t charge;
-      char* ptr = (char*)cache_->Value(handle);
-      size_t size = DecodeFixed64(ptr);
-      ptr += sizeof(uint64_t);
-      Status s = create_cb(ptr, size, &value, &charge);
+      void* value = nullptr;
+      size_t charge = 0;
+      Status s;
+      if (type != ResultType::DEFER_AND_FAIL) {
+        char* ptr = (char*)cache_->Value(handle);
+        size_t size = DecodeFixed64(ptr);
+        ptr += sizeof(uint64_t);
+        s = create_cb(ptr, size, &value, &charge);
+      }
       if (s.ok()) {
-        secondary_handle.reset(
-            new TestSecondaryCacheHandle(cache_.get(), handle, value, charge));
+        secondary_handle.reset(new TestSecondaryCacheResultHandle(
+            cache_.get(), handle, value, charge, type));
       } else {
         cache_->Release(handle);
       }
@@ -271,10 +299,18 @@ class TestSecondaryCache : public SecondaryCache {

   void Erase(const Slice& /*key*/) override {}

-  void WaitAll(std::vector<SecondaryCacheHandle*> /*handles*/) override {}
+  void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) override {
+    for (SecondaryCacheResultHandle* handle : handles) {
+      TestSecondaryCacheResultHandle* sec_handle =
+          static_cast<TestSecondaryCacheResultHandle*>(handle);
+      sec_handle->SetReady();
+    }
+  }

   std::string GetPrintableOptions() const override { return ""; }

+  void SetResultMap(ResultMap&& map) { result_map_ = std::move(map); }
+
   uint32_t num_inserts() { return num_inserts_; }

   uint32_t num_lookups() { return num_lookups_; }
@@ -294,26 +330,41 @@ class TestSecondaryCache : public SecondaryCache {
   }

  private:
-  class TestSecondaryCacheHandle : public SecondaryCacheHandle {
+  class TestSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
   public:
-    TestSecondaryCacheHandle(Cache* cache, Cache::Handle* handle, void* value,
-                             size_t size)
-        : cache_(cache), handle_(handle), value_(value), size_(size) {}
-    ~TestSecondaryCacheHandle() override { cache_->Release(handle_); }
+    TestSecondaryCacheResultHandle(Cache* cache, Cache::Handle* handle,
+                                   void* value, size_t size, ResultType type)
+        : cache_(cache),
+          handle_(handle),
+          value_(value),
+          size_(size),
+          is_ready_(true) {
+      if (type != ResultType::SUCCESS) {
+        is_ready_ = false;
+      }
+    }

-    bool IsReady() override { return true; }
+    ~TestSecondaryCacheResultHandle() override { cache_->Release(handle_); }
+
+    bool IsReady() override { return is_ready_; }

     void Wait() override {}

-    void* Value() override { return value_; }
+    void* Value() override {
+      assert(is_ready_);
+      return value_;
+    }

-    size_t Size() override { return size_; }
+    size_t Size() override { return Value() ? size_ : 0; }
+
+    void SetReady() { is_ready_ = true; }

    private:
     Cache* cache_;
     Cache::Handle* handle_;
     void* value_;
     size_t size_;
+    bool is_ready_;
   };

   std::shared_ptr<Cache> cache_;
@@ -321,6 +372,7 @@ class TestSecondaryCache : public SecondaryCache {
   uint32_t num_lookups_;
   bool inject_failure_;
   std::string db_session_id_;
+  ResultMap result_map_;
 };

 class DBSecondaryCacheTest : public DBTestBase {
@@ -350,6 +402,7 @@ class LRUSecondaryCacheTest : public LRUCacheTest {
   char* Buf() { return buf_.get(); }
   size_t Size() { return size_; }
+  std::string ToString() { return std::string(Buf(), Size()); }

  private:
   std::unique_ptr<char[]> buf_;
@@ -575,14 +628,15 @@ TEST_F(LRUSecondaryCacheTest, FullCapacityTest) {
   handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
                          test_item_creator, Cache::Priority::LOW, true);
   ASSERT_NE(handle, nullptr);
-  // This lookup should fail, since k1 promotion would have failed due to
-  // the block cache being at capacity
+  // k1 promotion should fail due to the block cache being at capacity,
+  // but the lookup should still succeed
   Cache::Handle* handle2;
   handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
                           test_item_creator, Cache::Priority::LOW, true);
-  ASSERT_EQ(handle2, nullptr);
-  // Since k1 didn't get promoted, k2 should still be in cache
+  ASSERT_NE(handle2, nullptr);
+  // Since k1 didn't get inserted, k2 should still be in cache
   cache->Release(handle);
+  cache->Release(handle2);
   handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
                          test_item_creator, Cache::Priority::LOW, true);
   ASSERT_NE(handle, nullptr);
@@ -985,6 +1039,141 @@ TEST_F(DBSecondaryCacheTest, SecondaryCacheFailureTest) {
   Destroy(options);
 }

+TEST_F(LRUSecondaryCacheTest, BasicWaitAllTest) {
+  LRUCacheOptions opts(1024, 2, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
+                       kDontChargeCacheMetadata);
+  std::shared_ptr<TestSecondaryCache> secondary_cache =
+      std::make_shared<TestSecondaryCache>(32 * 1024);
+  opts.secondary_cache = secondary_cache;
+  std::shared_ptr<Cache> cache = NewLRUCache(opts);
+  const int num_keys = 32;
+
+  Random rnd(301);
+  std::vector<std::string> values;
+  for (int i = 0; i < num_keys; ++i) {
+    std::string str = rnd.RandomString(1020);
+    values.emplace_back(str);
+    TestItem* item = new TestItem(str.data(), str.length());
+    ASSERT_OK(cache->Insert("k" + std::to_string(i), item,
+                            &LRUSecondaryCacheTest::helper_, str.length()));
+  }
+  // Force all entries to be evicted to the secondary cache
+  cache->SetCapacity(0);
+  ASSERT_EQ(secondary_cache->num_inserts(), 32u);
+  cache->SetCapacity(32 * 1024);
+
+  secondary_cache->SetResultMap(
+ {{"k3", TestSecondaryCache::ResultType::DEFER}, + {"k4", TestSecondaryCache::ResultType::DEFER_AND_FAIL}, + {"k5", TestSecondaryCache::ResultType::FAIL}}); + std::vector results; + for (int i = 0; i < 6; ++i) { + results.emplace_back( + cache->Lookup("k" + std::to_string(i), &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, false)); + } + cache->WaitAll(results); + for (int i = 0; i < 6; ++i) { + if (i == 4) { + ASSERT_EQ(cache->Value(results[i]), nullptr); + } else if (i == 5) { + ASSERT_EQ(results[i], nullptr); + continue; + } else { + TestItem* item = static_cast(cache->Value(results[i])); + ASSERT_EQ(item->ToString(), values[i]); + } + cache->Release(results[i]); + } + + cache.reset(); + secondary_cache.reset(); +} + +// In this test, we have one KV pair per data block. We indirectly determine +// the cache key associated with each data block (and thus each KV) by using +// a sync point callback in TestSecondaryCache::Lookup. We then control the +// lookup result by setting the ResultMap. +TEST_F(DBSecondaryCacheTest, TestSecondaryCacheMultiGet) { + LRUCacheOptions opts(1 << 20, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + std::shared_ptr secondary_cache( + new TestSecondaryCache(2048 * 1024)); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + BlockBasedTableOptions table_options; + table_options.block_cache = cache; + table_options.block_size = 4 * 1024; + table_options.cache_index_and_filter_blocks = false; + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.paranoid_file_checks = true; + DestroyAndReopen(options); + Random rnd(301); + const int N = 8; + std::vector keys; + for (int i = 0; i < N; i++) { + std::string p_v = rnd.RandomString(4000); + keys.emplace_back(p_v); + ASSERT_OK(Put(Key(i), p_v)); + } + + ASSERT_OK(Flush()); + // After Flush is successful, RocksDB does the paranoid check for the new + // SST file. This will try to lookup all data blocks in the secondary + // cache. 
+  ASSERT_EQ(secondary_cache->num_inserts(), 0u);
+  ASSERT_EQ(secondary_cache->num_lookups(), 8u);
+
+  cache->SetCapacity(0);
+  ASSERT_EQ(secondary_cache->num_inserts(), 8u);
+  cache->SetCapacity(1 << 20);
+
+  std::vector<std::string> cache_keys;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "TestSecondaryCache::Lookup", [&cache_keys](void* key) -> void {
+        cache_keys.emplace_back(*(static_cast<std::string*>(key)));
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  for (int i = 0; i < N; ++i) {
+    std::string v = Get(Key(i));
+    ASSERT_EQ(4000, v.size());
+    ASSERT_EQ(v, keys[i]);
+  }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_EQ(secondary_cache->num_lookups(), 16u);
+  cache->SetCapacity(0);
+  cache->SetCapacity(1 << 20);
+
+  ASSERT_EQ(Get(Key(2)), keys[2]);
+  ASSERT_EQ(Get(Key(7)), keys[7]);
+  secondary_cache->SetResultMap(
+      {{cache_keys[3], TestSecondaryCache::ResultType::DEFER},
+       {cache_keys[4], TestSecondaryCache::ResultType::DEFER_AND_FAIL},
+       {cache_keys[5], TestSecondaryCache::ResultType::FAIL}});
+
+  std::vector<std::string> mget_keys(
+      {Key(0), Key(1), Key(2), Key(3), Key(4), Key(5), Key(6), Key(7)});
+  std::vector<PinnableSlice> values(mget_keys.size());
+  std::vector<Status> s(keys.size());
+  std::vector<Slice> key_slices;
+  for (const std::string& key : mget_keys) {
+    key_slices.emplace_back(key);
+  }
+  uint32_t num_lookups = secondary_cache->num_lookups();
+  dbfull()->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(),
+                     key_slices.size(), key_slices.data(), values.data(),
+                     s.data(), false);
+  ASSERT_EQ(secondary_cache->num_lookups(), num_lookups + 5);
+  for (int i = 0; i < N; ++i) {
+    ASSERT_OK(s[i]);
+    ASSERT_EQ(values[i].ToString(), keys[i]);
+    values[i].Reset();
+  }
+  Destroy(options);
+}
+
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {
diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h
index fba6c4331..3e2a20aba 100644
--- a/cache/sharded_cache.h
+++ b/cache/sharded_cache.h
@@ -115,9 +115,10 @@ class ShardedCache : public Cache {
   int GetNumShardBits() const;
   uint32_t GetNumShards() const;

- private:
+ protected:
   inline uint32_t Shard(uint32_t hash) { return hash & shard_mask_; }

+ private:
   const uint32_t shard_mask_;
   mutable port::Mutex capacity_mutex_;
   size_t capacity_;
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index d9d99e983..c1ce88dbd 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -472,7 +472,9 @@ class Cache {
     return Release(handle, force_erase);
   }

-  // Determines if the handle returned by Lookup() has a valid value yet.
+  // Determines if the handle returned by Lookup() has a valid value yet. The
+  // call is not thread safe and should be called only by someone holding a
+  // reference to the handle.
   virtual bool IsReady(Handle* /*handle*/) { return true; }

   // If the handle returned by Lookup() is not ready yet, wait till it
@@ -482,7 +484,9 @@ class Cache {
   virtual void Wait(Handle* /*handle*/) {}

   // Wait for a vector of handles to become ready. As with Wait(), the user
-  // should check the Value() of each handle for nullptr
+  // should check the Value() of each handle for nullptr. This call is not
+  // thread safe and should only be called by the caller holding a reference
+  // to each of the handles.
   virtual void WaitAll(std::vector<Handle*>& /*handles*/) {}

  private:
diff --git a/include/rocksdb/secondary_cache.h b/include/rocksdb/secondary_cache.h
index de5d3d043..221b3e5f2 100644
--- a/include/rocksdb/secondary_cache.h
+++ b/include/rocksdb/secondary_cache.h
@@ -21,9 +21,9 @@ namespace ROCKSDB_NAMESPACE {
 // ready, and call Wait() in order to block until it becomes ready.
 // The caller must call value() after it becomes ready to determine if the
 // handle successfully read the item.
-class SecondaryCacheHandle {
+class SecondaryCacheResultHandle {
  public:
-  virtual ~SecondaryCacheHandle() {}
+  virtual ~SecondaryCacheResultHandle() {}

   // Returns whether the handle is ready or not
   virtual bool IsReady() = 0;
@@ -63,7 +63,7 @@ class SecondaryCache {
   // will be used to create the object. The handle returned may not be
   // ready yet, unless wait=true, in which case Lookup() will block until
   // the handle is ready
-  virtual std::unique_ptr<SecondaryCacheHandle> Lookup(
+  virtual std::unique_ptr<SecondaryCacheResultHandle> Lookup(
       const Slice& key, const Cache::CreateCallback& create_cb, bool wait) = 0;

   // At the discretion of the implementation, erase the data associated
@@ -71,7 +71,7 @@ class SecondaryCache {
   virtual void Erase(const Slice& key) = 0;

   // Wait for a collection of handles to become ready
-  virtual void WaitAll(std::vector<SecondaryCacheHandle*> handles) = 0;
+  virtual void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) = 0;

   virtual std::string GetPrintableOptions() const = 0;
 };
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index fe55ccaf0..7392a0d0b 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -350,11 +350,11 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(
 }

 Cache::Handle* BlockBasedTable::GetEntryFromCache(
-    Cache* block_cache, const Slice& key, BlockType block_type,
+    Cache* block_cache, const Slice& key, BlockType block_type, const bool wait,
     GetContext* get_context, const Cache::CacheItemHelper* cache_helper,
     const Cache::CreateCallback& create_cb, Cache::Priority priority) const {
   auto cache_handle =
-      block_cache->Lookup(key, cache_helper, create_cb, priority, true,
+      block_cache->Lookup(key, cache_helper, create_cb, priority, wait,
                           rep_->ioptions.statistics.get());

   if (cache_handle != nullptr) {
@@ -1104,7 +1104,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
     Cache* block_cache, Cache* block_cache_compressed,
     const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
     const UncompressionDict& uncompression_dict, BlockType block_type,
-    GetContext* get_context) const {
+    const bool wait, GetContext* get_context) const {
   const size_t read_amp_bytes_per_bit =
       block_type == BlockType::kData
          ? rep_->table_options.read_amp_bytes_per_bit
@@ -1131,7 +1131,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
   // Lookup uncompressed cache first
   if (block_cache != nullptr) {
     auto cache_handle = GetEntryFromCache(
-        block_cache, block_cache_key, block_type, get_context,
+        block_cache, block_cache_key, block_type, wait, get_context,
         BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
         priority);
     if (cache_handle != nullptr) {
@@ -1399,9 +1399,9 @@ template <typename TBlocklike>
 Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
-    CachableEntry<TBlocklike>* block_entry, BlockType block_type,
-    GetContext* get_context, BlockCacheLookupContext* lookup_context,
-    BlockContents* contents) const {
+    const bool wait, CachableEntry<TBlocklike>* block_entry,
+    BlockType block_type, GetContext* get_context,
+    BlockCacheLookupContext* lookup_context, BlockContents* contents) const {
   assert(block_entry != nullptr);
   const bool no_io = (ro.read_tier == kBlockCacheTier);
   Cache* block_cache = rep_->table_options.block_cache.get();
@@ -1433,8 +1433,10 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
     if (!contents) {
       s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
                                 ro, block_entry, uncompression_dict, block_type,
-                                get_context);
-      if (block_entry->GetValue()) {
+                                wait, get_context);
+      // Value could still be null at this point, so check the cache handle
+      // and update the read pattern for prefetching
+      if (block_entry->GetValue() || block_entry->GetCacheHandle()) {
         // TODO(haoyu): Differentiate cache hit on uncompressed block cache and
         // compressed block cache.
         is_cache_hit = true;
@@ -1450,7 +1452,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(

     // Can't find the block from the cache. If I/O is allowed, read from the
     // file.
-    if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) {
+    if (block_entry->GetValue() == nullptr &&
+        block_entry->GetCacheHandle() == nullptr && !no_io && ro.fill_cache) {
       Statistics* statistics = rep_->ioptions.stats;
       const bool maybe_compressed =
           block_type != BlockType::kFilter &&
@@ -1613,7 +1616,8 @@ void BlockBasedTable::RetrieveMultipleBlocks(
       RetrieveBlock(nullptr, options, handle, uncompression_dict,
                     &(*results)[idx_in_batch], BlockType::kData,
                     mget_iter->get_context, &lookup_data_block_context,
-                    /* for_compaction */ false, /* use_cache */ true);
+                    /* for_compaction */ false, /* use_cache */ true,
+                    /* wait_for_cache */ true);
     }
     return;
   }
@@ -1810,8 +1814,8 @@ void BlockBasedTable::RetrieveMultipleBlocks(
        // necessary. Since we're passing the raw block contents, it will
        // avoid looking up the block cache
        s = MaybeReadBlockAndLoadToCache(
-           nullptr, options, handle, uncompression_dict, block_entry,
-           BlockType::kData, mget_iter->get_context,
+           nullptr, options, handle, uncompression_dict, /*wait=*/true,
+           block_entry, BlockType::kData, mget_iter->get_context,
            &lookup_data_block_context, &raw_block_contents);

        // block_entry value could be null if no block cache is present, i.e
@@ -1858,22 +1862,23 @@ Status BlockBasedTable::RetrieveBlock(
     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
     CachableEntry<TBlocklike>* block_entry, BlockType block_type,
     GetContext* get_context, BlockCacheLookupContext* lookup_context,
-    bool for_compaction, bool use_cache) const {
+    bool for_compaction, bool use_cache, bool wait_for_cache) const {
   assert(block_entry);
   assert(block_entry->IsEmpty());

   Status s;
   if (use_cache) {
-    s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
-                                     uncompression_dict, block_entry,
-                                     block_type, get_context, lookup_context,
-                                     /*contents=*/nullptr);
+    s = MaybeReadBlockAndLoadToCache(
+        prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache,
+        block_entry, block_type, get_context, lookup_context,
+        /*contents=*/nullptr);

     if (!s.ok()) {
       return s;
     }

-    if (block_entry->GetValue() != nullptr) {
+    if (block_entry->GetValue() != nullptr ||
+        block_entry->GetCacheHandle() != nullptr) {
       assert(s.ok());
       return s;
     }
@@ -1941,28 +1946,28 @@ template Status BlockBasedTable::RetrieveBlock(
     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
     CachableEntry* block_entry, BlockType block_type,
     GetContext* get_context, BlockCacheLookupContext* lookup_context,
-    bool for_compaction, bool use_cache) const;
+    bool for_compaction, bool use_cache, bool wait_for_cache) const;

 template Status BlockBasedTable::RetrieveBlock(
     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
     CachableEntry* block_entry, BlockType block_type,
     GetContext* get_context, BlockCacheLookupContext* lookup_context,
-    bool for_compaction, bool use_cache) const;
+    bool for_compaction, bool use_cache, bool wait_for_cache) const;

 template Status BlockBasedTable::RetrieveBlock(
     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
     CachableEntry* block_entry, BlockType block_type,
     GetContext* get_context, BlockCacheLookupContext* lookup_context,
-    bool for_compaction, bool use_cache) const;
+    bool for_compaction, bool use_cache, bool wait_for_cache) const;

 template Status BlockBasedTable::RetrieveBlock(
     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
     CachableEntry* block_entry, BlockType block_type,
     GetContext* get_context, BlockCacheLookupContext* lookup_context,
-    bool for_compaction, bool use_cache) const;
+    bool for_compaction, bool use_cache, bool wait_for_cache) const;

 BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
     const BlockBasedTable* table,
@@ -2479,6 +2484,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
   {
     MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
                                    sst_file_range.end());
+    std::vector<Cache::Handle*> cache_handles;
+    bool wait_for_cache_results = false;

     CachableEntry<UncompressionDict> uncompression_dict;
     Status uncompression_dict_status;
@@ -2551,20 +2558,64 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
         Status s = RetrieveBlock(
             nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
             miter->get_context, &lookup_data_block_context,
-            /* for_compaction */ false, /* use_cache */ true);
+            /* for_compaction */ false, /* use_cache */ true,
+            /* wait_for_cache */ false);
         if (s.IsIncomplete()) {
           s = Status::OK();
         }
         if (s.ok() && !results.back().IsEmpty()) {
-          // Found it in the cache. Add NULL handle to indicate there is
-          // nothing to read from disk
-          block_handles.emplace_back(BlockHandle::NullBlockHandle());
+          if (results.back().IsReady()) {
+            // Found it in the cache. Add NULL handle to indicate there is
+            // nothing to read from disk.
+            if (results.back().GetCacheHandle()) {
+              results.back().UpdateCachedValue();
+              // It's possible the cache lookup returned a non-null handle,
+              // but the lookup actually failed to produce a valid value
+              if (results.back().GetValue() == nullptr) {
+                block_handles.emplace_back(handle);
+                total_len += block_size(handle);
+              }
+            }
+            if (results.back().GetValue() != nullptr) {
+              block_handles.emplace_back(BlockHandle::NullBlockHandle());
+            }
+          } else {
+            // We have to wait for the asynchronous cache lookup to finish,
+            // and then we may have to read the block from disk anyway
+            wait_for_cache_results = true;
+            block_handles.emplace_back(handle);
+            cache_handles.emplace_back(results.back().GetCacheHandle());
+          }
         } else {
           block_handles.emplace_back(handle);
           total_len += block_size(handle);
         }
       }

+      if (wait_for_cache_results) {
+        Cache* block_cache = rep_->table_options.block_cache.get();
+        block_cache->WaitAll(cache_handles);
+        for (size_t i = 0; i < block_handles.size(); ++i) {
+          // If this block was a success or failure or not needed because
+          // the corresponding key is in the same block as a prior key, skip
+          if (block_handles[i] == BlockHandle::NullBlockHandle() ||
+              results[i].IsEmpty()) {
+            continue;
+          }
+          results[i].UpdateCachedValue();
+          void* val = results[i].GetValue();
+          if (!val) {
+            // The async cache lookup failed - could be due to an error
+            // or a false positive. We need to read the data block from
+            // the SST file
+            results[i].Reset();
+            total_len += block_size(block_handles[i]);
+          } else {
+            block_handles[i] = BlockHandle::NullBlockHandle();
+          }
+        }
+      }
+
      if (total_len) {
        char* scratch = nullptr;
        const UncompressionDict& dict = uncompression_dict.GetValue()
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index bc3c87f73..43b56a68c 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -274,7 +274,7 @@ class BlockBasedTable : public TableReader {
                                     GetContext* get_context) const;

   Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
-                                   BlockType block_type,
+                                   BlockType block_type, const bool wait,
                                    GetContext* get_context,
                                    const Cache::CacheItemHelper* cache_helper,
                                    const Cache::CreateCallback& create_cb,
@@ -300,9 +300,9 @@ class BlockBasedTable : public TableReader {
   Status MaybeReadBlockAndLoadToCache(
       FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
       const BlockHandle& handle, const UncompressionDict& uncompression_dict,
-      CachableEntry<TBlocklike>* block_entry, BlockType block_type,
-      GetContext* get_context, BlockCacheLookupContext* lookup_context,
-      BlockContents* contents) const;
+      const bool wait, CachableEntry<TBlocklike>* block_entry,
+      BlockType block_type, GetContext* get_context,
+      BlockCacheLookupContext* lookup_context, BlockContents* contents) const;

   // Similar to the above, with one crucial difference: it will retrieve the
   // block from the file even if there are no caches configured (assuming the
@@ -314,7 +314,8 @@ class BlockBasedTable : public TableReader {
                        CachableEntry<TBlocklike>* block_entry, BlockType block_type,
                        GetContext* get_context,
                        BlockCacheLookupContext* lookup_context,
-                       bool for_compaction, bool use_cache) const;
+                       bool for_compaction, bool use_cache,
+                       bool wait_for_cache) const;

   void RetrieveMultipleBlocks(
       const ReadOptions& options, const MultiGetRange* batch,
@@ -354,7 +355,7 @@ class BlockBasedTable : public TableReader {
       Cache* block_cache, Cache* block_cache_compressed,
       const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
       const UncompressionDict& uncompression_dict, BlockType block_type,
-      GetContext* get_context) const;
+      const bool wait, GetContext* get_context) const;

   // Put a raw block (maybe compressed) to the corresponding block caches.
   // This method will perform decompression against raw_block if needed and then
diff --git a/table/block_based/block_based_table_reader_impl.h b/table/block_based/block_based_table_reader_impl.h
index d9cfaa92c..603c62431 100644
--- a/table/block_based/block_based_table_reader_impl.h
+++ b/table/block_based/block_based_table_reader_impl.h
@@ -54,7 +54,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
     CachableEntry<Block> block;
     s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
                       get_context, lookup_context, for_compaction,
-                      /* use_cache */ true);
+                      /* use_cache */ true, /* wait_for_cache */ true);

     if (!s.ok()) {
       assert(block.IsEmpty());
diff --git a/table/block_based/cachable_entry.h b/table/block_based/cachable_entry.h
index 598f1ef57..155097c05 100644
--- a/table/block_based/cachable_entry.h
+++ b/table/block_based/cachable_entry.h
@@ -162,7 +162,6 @@ public:
   }

   void SetCachedValue(T* value, Cache* cache, Cache::Handle* cache_handle) {
-    assert(value != nullptr);
     assert(cache != nullptr);
     assert(cache_handle != nullptr);

@@ -179,6 +178,22 @@ public:
     assert(!own_value_);
   }

+  void UpdateCachedValue() {
+    assert(cache_ != nullptr);
+    assert(cache_handle_ != nullptr);
+
+    value_ = static_cast<T*>(cache_->Value(cache_handle_));
+  }
+
+  bool IsReady() {
+    if (!own_value_) {
+      assert(cache_ != nullptr);
+      assert(cache_handle_ != nullptr);
+      return cache_->IsReady(cache_handle_);
+    }
+    return true;
+  }
+
 private:
   void ReleaseResource() {
     if (LIKELY(cache_handle_ != nullptr)) {
diff --git a/table/block_based/filter_block_reader_common.cc b/table/block_based/filter_block_reader_common.cc
index fa0802669..135fffdf2 100644
--- a/table/block_based/filter_block_reader_common.cc
+++ b/table/block_based/filter_block_reader_common.cc
@@ -30,7 +30,8 @@ Status FilterBlockReaderCommon::ReadFilterBlock(
       table->RetrieveBlock(prefetch_buffer, read_options, rep->filter_handle,
                            UncompressionDict::GetEmptyDict(), filter_block,
                            BlockType::kFilter, get_context, lookup_context,
-                           /* for_compaction */ false, use_cache);
+                           /* for_compaction */ false, use_cache,
+                           /* wait_for_cache */ true);

   return s;
 }
diff --git a/table/block_based/index_reader_common.cc b/table/block_based/index_reader_common.cc
index 76f894d59..275ae56dc 100644
--- a/table/block_based/index_reader_common.cc
+++ b/table/block_based/index_reader_common.cc
@@ -26,7 +26,8 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
   const Status s = table->RetrieveBlock(
       prefetch_buffer, read_options, rep->footer.index_handle(),
       UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex,
-      get_context, lookup_context, /* for_compaction */ false, use_cache);
+      get_context, lookup_context, /* for_compaction */ false, use_cache,
+      /* wait_for_cache */ true);

   return s;
 }
diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc
index 80e4b38d8..61cd12587 100644
--- a/table/block_based/partitioned_filter_block.cc
+++ b/table/block_based/partitioned_filter_block.cc
@@ -296,7 +296,8 @@ Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
       table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
                              UncompressionDict::GetEmptyDict(), filter_block,
                              BlockType::kFilter, get_context, lookup_context,
-                             /* for_compaction */ false, /* use_cache */ true);
+                             /* for_compaction */ false, /* use_cache */ true,
+                             /* wait_for_cache */ true);

   return s;
 }
@@ -490,8 +491,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
     // filter blocks
     s = table()->MaybeReadBlockAndLoadToCache(
         prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
-        &block, BlockType::kFilter, nullptr /* get_context */, &lookup_context,
-        nullptr /* contents */);
+        /* wait */ true, &block, BlockType::kFilter, nullptr /* get_context */,
+        &lookup_context, nullptr /* contents */);
     if (!s.ok()) {
       return s;
     }
diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc
index 285aa72f7..acb40f125 100644
--- a/table/block_based/partitioned_index_reader.cc
+++ b/table/block_based/partitioned_index_reader.cc
@@ -167,8 +167,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
     // filter blocks
     s = table()->MaybeReadBlockAndLoadToCache(
         prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
-        &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
-        /*contents=*/nullptr);
+        /*wait=*/true, &block, BlockType::kIndex, /*get_context=*/nullptr,
+        &lookup_context, /*contents=*/nullptr);

     if (!s.ok()) {
       return s;
diff --git a/table/block_based/uncompression_dict_reader.cc b/table/block_based/uncompression_dict_reader.cc
index 5b50092ff..dae5ddac2 100644
--- a/table/block_based/uncompression_dict_reader.cc
+++ b/table/block_based/uncompression_dict_reader.cc
@@ -60,7 +60,7 @@ Status UncompressionDictReader::ReadUncompressionDictionary(
       prefetch_buffer, read_options, rep->compression_dict_handle,
       UncompressionDict::GetEmptyDict(), uncompression_dict,
       BlockType::kCompressionDictionary, get_context, lookup_context,
-      /* for_compaction */ false, use_cache);
+      /* for_compaction */ false, use_cache, /* wait_for_cache */ true);

   if (!s.ok()) {
     ROCKS_LOG_WARN(
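
Reviewer note (not part of the patch): below is a minimal sketch of how a caller might drive the asynchronous lookup path added above, using only the Cache APIs touched by this change (Lookup with wait=false, WaitAll, Value, Release). The cache, keys, helper, and create_cb names are placeholders for state the caller already has; this is illustrative only, not code from the patch.

    std::vector<Cache::Handle*> handles;
    for (const Slice& key : keys) {
      // wait=false may return a pending handle backed by the secondary cache
      // instead of blocking on the secondary cache lookup.
      handles.push_back(cache->Lookup(key, helper, create_cb,
                                      Cache::Priority::LOW, /*wait=*/false,
                                      /*stats=*/nullptr));
    }
    cache->WaitAll(handles);  // Resolve all pending lookups in one call.
    for (Cache::Handle* handle : handles) {
      if (handle == nullptr) {
        continue;  // The lookup failed immediately.
      }
      // A handle may be returned even if the deferred lookup ultimately
      // failed, so check Value() and always release the handle.
      if (cache->Value(handle) != nullptr) {
        // Use the cached object here.
      }
      cache->Release(handle);
    }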