Parallelize secondary cache lookup in MultiGet (#8405)

Summary:
Implement the ```WaitAll()``` interface in ```LRUCache``` to allow callers to issue multiple lookups in parallel and wait for all of them to complete. Modify ```MultiGet``` to use this to parallelize the secondary cache lookups in order to reduce the overall latency. A call to ```cache->Lookup()``` returns a handle that has an incomplete value (nullptr), and the caller can call ```cache->IsReady()``` to check whether the lookup is complete, and pass a vector of handles to ```WaitAll``` to wait for completion. If any of the lookups fail, ```MultiGet``` will read the block from the SST file.

Another change in this PR is to rename ```SecondaryCacheHandle``` to ```SecondaryCacheResultHandle``` as it more accurately describes the return result of the secondary cache lookup, which is more like a future.

Tests:
1. Add unit tests in lru_cache_test
2. Benchmark results with no secondary cache configured
Master -
```
readrandom   :      41.175 micros/op 388562 ops/sec;  106.7 MB/s (7277999 of 7277999 found)
readrandom   :      41.217 micros/op 388160 ops/sec;  106.6 MB/s (7274999 of 7274999 found)
multireadrandom :      10.309 micros/op 1552082 ops/sec; (28908992 of 28908992 found)
multireadrandom :      10.321 micros/op 1550218 ops/sec; (29081984 of 29081984 found)
```

This PR -
```
readrandom   :      41.158 micros/op 388723 ops/sec;  106.8 MB/s (7290999 of 7290999 found)
readrandom   :      41.185 micros/op 388463 ops/sec;  106.7 MB/s (7287999 of 7287999 found)
multireadrandom :      10.277 micros/op 1556801 ops/sec; (29346944 of 29346944 found)
multireadrandom :      10.253 micros/op 1560539 ops/sec; (29274944 of 29274944 found)
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8405

Reviewed By: zhichao-cao

Differential Revision: D29190509

Pulled By: anand1976

fbshipit-source-id: 6f8eff6246712af8a297cfe22ea0d1c3b2a01bb0
This commit is contained in:
anand76 2021-06-18 09:35:03 -07:00 committed by Facebook GitHub Bot
parent e817bc9628
commit 8ea0a2c1bd
15 changed files with 478 additions and 103 deletions

129
cache/lru_cache.cc vendored
View File

@ -309,7 +309,8 @@ void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
strict_capacity_limit_ = strict_capacity_limit;
}
Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
bool free_handle_on_fail) {
Status s = Status::OK();
autovector<LRUHandle*> last_reference_list;
size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
@ -323,14 +324,16 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
if ((usage_ + total_charge) > capacity_ &&
(strict_capacity_limit_ || handle == nullptr)) {
e->SetInCache(false);
if (handle == nullptr) {
// Don't insert the entry but still return ok, as if the entry inserted
// into cache and get evicted immediately.
e->SetInCache(false);
last_reference_list.push_back(e);
} else {
if (free_handle_on_fail) {
delete[] reinterpret_cast<char*>(e);
*handle = nullptr;
}
s = Status::Incomplete("Insert failed due to LRU cache being full.");
}
} else {
@ -375,6 +378,43 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
return s;
}
void LRUCacheShard::Promote(LRUHandle* e) {
SecondaryCacheResultHandle* secondary_handle = e->sec_handle;
assert(secondary_handle->IsReady());
e->SetIncomplete(false);
e->SetInCache(true);
e->SetPromoted(true);
e->value = secondary_handle->Value();
e->charge = secondary_handle->Size();
delete secondary_handle;
// This call could fail if the cache is over capacity and
// strict_capacity_limit_ is true. In such a case, we don't want
// InsertItem() to free the handle, since the item is already in memory
// and the caller will most likely just read from disk if we erase it here.
if (e->value) {
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(e);
Status s = InsertItem(e, &handle, /*free_handle_on_fail=*/false);
if (s.ok()) {
// InsertItem would have taken a reference on the item, so decrement it
// here as we expect the caller to already hold a reference
e->Unref();
} else {
// Item is in memory, but not accounted against the cache capacity.
// When the handle is released, the item should get deleted
assert(!e->InCache());
}
} else {
// Since the secondary cache lookup failed, mark the item as not in cache
// and charge the cache only for metadata usage, i.e handle, key etc
MutexLock l(&mutex_);
e->charge = 0;
e->SetInCache(false);
usage_ += e->CalcTotalCharge(metadata_charge_policy_);
}
}
Cache::Handle* LRUCacheShard::Lookup(
const Slice& key, uint32_t hash,
const ShardedCache::CacheItemHelper* helper,
@ -399,47 +439,44 @@ Cache::Handle* LRUCacheShard::Lookup(
// mutex if we're going to lookup in the secondary cache
// Only support synchronous for now
// TODO: Support asynchronous lookup in secondary cache
if (!e && secondary_cache_ && helper && helper->saveto_cb && wait) {
if (!e && secondary_cache_ && helper && helper->saveto_cb) {
// For objects from the secondary cache, we expect the caller to provide
// a way to create/delete the primary cache object. The only case where
// a deleter would not be required is for dummy entries inserted for
// accounting purposes, which we won't demote to the secondary cache
// anyway.
assert(create_cb && helper->del_cb);
std::unique_ptr<SecondaryCacheHandle> secondary_handle =
std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
secondary_cache_->Lookup(key, create_cb, wait);
if (secondary_handle != nullptr) {
void* value = nullptr;
e = reinterpret_cast<LRUHandle*>(
new char[sizeof(LRUHandle) - 1 + key.size()]);
e->flags = 0;
e->SetPromoted(true);
e->SetSecondaryCacheCompatible(true);
e->info_.helper = helper;
e->key_length = key.size();
e->hash = hash;
e->refs = 0;
e->next = e->prev = nullptr;
e->SetInCache(true);
e->SetPriority(priority);
memcpy(e->key_data, key.data(), key.size());
e->value = nullptr;
e->sec_handle = secondary_handle.release();
e->Ref();
value = secondary_handle->Value();
e->value = value;
e->charge = secondary_handle->Size();
// This call could nullify e if the cache is over capacity and
// strict_capacity_limit_ is true. In such a case, the caller will try
// to insert later, which might again fail, but its ok as this should
// not be common
// Being conservative here since there could be lookups that are
// actually ok to fail rather than succeed and bloat up the memory
// usage (preloading partitioned index blocks, for example).
Status s = InsertItem(e, reinterpret_cast<Cache::Handle**>(&e));
if (!s.ok()) {
assert(e == nullptr);
(*helper->del_cb)(key, value);
if (wait) {
Promote(e);
if (!e->value) {
// The secondary cache returned a handle, but the lookup failed
e->Unref();
e->Free();
e = nullptr;
}
} else {
// If wait is false, we always return a handle and let the caller
// release the handle after checking for success or failure
e->SetIncomplete(true);
}
}
}
@ -527,7 +564,7 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
e->SetPriority(priority);
memcpy(e->key_data, key.data(), key.size());
return InsertItem(e, handle);
return InsertItem(e, handle, /* free_handle_on_fail */ true);
}
void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
@ -557,6 +594,21 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
}
}
bool LRUCacheShard::IsReady(Cache::Handle* handle) {
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
bool ready = true;
if (e->IsPending()) {
assert(secondary_cache_);
assert(e->sec_handle);
if (e->sec_handle->IsReady()) {
Promote(e);
} else {
ready = false;
}
}
return ready;
}
size_t LRUCacheShard::GetUsage() const {
MutexLock l(&mutex_);
return usage_;
@ -597,6 +649,7 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
use_adaptive_mutex, metadata_charge_policy,
/* max_upper_hash_bits */ 32 - num_shard_bits, secondary_cache);
}
secondary_cache_ = secondary_cache;
}
LRUCache::~LRUCache() {
@ -662,6 +715,36 @@ double LRUCache::GetHighPriPoolRatio() {
return result;
}
void LRUCache::WaitAll(std::vector<Handle*>& handles) {
if (secondary_cache_) {
std::vector<SecondaryCacheResultHandle*> sec_handles;
sec_handles.reserve(handles.size());
for (Handle* handle : handles) {
if (!handle) {
continue;
}
LRUHandle* lru_handle = reinterpret_cast<LRUHandle*>(handle);
if (!lru_handle->IsPending()) {
continue;
}
sec_handles.emplace_back(lru_handle->sec_handle);
}
secondary_cache_->WaitAll(sec_handles);
for (Handle* handle : handles) {
if (!handle) {
continue;
}
LRUHandle* lru_handle = reinterpret_cast<LRUHandle*>(handle);
if (!lru_handle->IsPending()) {
continue;
}
uint32_t hash = GetHash(handle);
LRUCacheShard* shard = static_cast<LRUCacheShard*>(GetShard(Shard(hash)));
shard->Promote(lru_handle);
}
}
}
std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,

34
cache/lru_cache.h vendored
View File

@ -56,7 +56,12 @@ struct LRUHandle {
Cache::DeleterFn deleter;
const ShardedCache::CacheItemHelper* helper;
} info_;
// An entry is not added to the LRUHandleTable until the secondary cache
// lookup is complete, so its safe to have this union.
union {
LRUHandle* next_hash;
SecondaryCacheResultHandle* sec_handle;
};
LRUHandle* next;
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
@ -168,8 +173,17 @@ struct LRUHandle {
if (!IsSecondaryCacheCompatible() && info_.deleter) {
(*info_.deleter)(key(), value);
} else if (IsSecondaryCacheCompatible()) {
if (IsPending()) {
assert(sec_handle != nullptr);
SecondaryCacheResultHandle* tmp_sec_handle = sec_handle;
tmp_sec_handle->Wait();
value = tmp_sec_handle->Value();
delete tmp_sec_handle;
}
if (value) {
(*info_.helper->del_cb)(key(), value);
}
}
delete[] reinterpret_cast<char*>(this);
}
@ -293,7 +307,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
bool force_erase) override {
return Release(handle, force_erase);
}
virtual bool IsReady(Cache::Handle* /*handle*/) override { return true; }
virtual bool IsReady(Cache::Handle* /*handle*/) override;
virtual void Wait(Cache::Handle* /*handle*/) override {}
virtual bool Ref(Cache::Handle* handle) override;
virtual bool Release(Cache::Handle* handle,
@ -326,10 +340,23 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
double GetHighPriPoolRatio();
private:
Status InsertItem(LRUHandle* item, Cache::Handle** handle);
friend class LRUCache;
// Insert an item into the hash table and, if handle is null, insert into
// the LRU list. Older items are evicted as necessary. If the cache is full
// and free_handle_on_fail is true, the item is deleted and handle is set to.
Status InsertItem(LRUHandle* item, Cache::Handle** handle,
bool free_handle_on_fail);
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
DeleterFn deleter, const Cache::CacheItemHelper* helper,
Cache::Handle** handle, Cache::Priority priority);
// Promote an item looked up from the secondary cache to the LRU cache. The
// item is only inserted into the hash table and not the LRU list, and only
// if the cache is not at full capacity, as is the case during Insert. The
// caller should hold a reference on the LRUHandle. When the caller releases
// the last reference, the item is added to the LRU list.
// The item is promoted to the high pri or low pri pool as specified by the
// caller in Lookup.
void Promote(LRUHandle* e);
void LRU_Remove(LRUHandle* e);
void LRU_Insert(LRUHandle* e);
@ -416,7 +443,7 @@ class LRUCache
virtual uint32_t GetHash(Handle* handle) const override;
virtual DeleterFn GetDeleter(Handle* handle) const override;
virtual void DisownData() override;
virtual void WaitAll(std::vector<Handle*>& /*handles*/) override {}
virtual void WaitAll(std::vector<Handle*>& handles) override;
// Retrieves number of elements in LRU, for unit test purpose only
size_t TEST_GetLRUSize();
@ -426,6 +453,7 @@ class LRUCache
private:
LRUCacheShard* shards_ = nullptr;
int num_shards_ = 0;
std::shared_ptr<SecondaryCache> secondary_cache_;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -204,6 +204,19 @@ TEST_F(LRUCacheTest, EntriesWithPriority) {
class TestSecondaryCache : public SecondaryCache {
public:
// Specifies what action to take on a lookup for a particular key
enum ResultType {
SUCCESS,
// Fail lookup immediately
FAIL,
// Defer the result. It will returned after Wait/WaitAll is called
DEFER,
// Defer the result and eventually return failure
DEFER_AND_FAIL
};
using ResultMap = std::unordered_map<std::string, ResultType>;
explicit TestSecondaryCache(size_t capacity)
: num_inserts_(0), num_lookups_(0), inject_failure_(false) {
cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr,
@ -246,22 +259,37 @@ class TestSecondaryCache : public SecondaryCache {
});
}
std::unique_ptr<SecondaryCacheHandle> Lookup(
std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb,
bool /*wait*/) override {
std::unique_ptr<SecondaryCacheHandle> secondary_handle;
std::string key_str = key.ToString();
TEST_SYNC_POINT_CALLBACK("TestSecondaryCache::Lookup", &key_str);
std::unique_ptr<SecondaryCacheResultHandle> secondary_handle;
ResultType type = ResultType::SUCCESS;
auto iter = result_map_.find(key.ToString());
if (iter != result_map_.end()) {
type = iter->second;
}
if (type == ResultType::FAIL) {
return secondary_handle;
}
Cache::Handle* handle = cache_->Lookup(key);
num_lookups_++;
if (handle) {
void* value;
size_t charge;
void* value = nullptr;
size_t charge = 0;
Status s;
if (type != ResultType::DEFER_AND_FAIL) {
char* ptr = (char*)cache_->Value(handle);
size_t size = DecodeFixed64(ptr);
ptr += sizeof(uint64_t);
Status s = create_cb(ptr, size, &value, &charge);
s = create_cb(ptr, size, &value, &charge);
}
if (s.ok()) {
secondary_handle.reset(
new TestSecondaryCacheHandle(cache_.get(), handle, value, charge));
secondary_handle.reset(new TestSecondaryCacheResultHandle(
cache_.get(), handle, value, charge, type));
} else {
cache_->Release(handle);
}
@ -271,10 +299,18 @@ class TestSecondaryCache : public SecondaryCache {
void Erase(const Slice& /*key*/) override {}
void WaitAll(std::vector<SecondaryCacheHandle*> /*handles*/) override {}
void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) override {
for (SecondaryCacheResultHandle* handle : handles) {
TestSecondaryCacheResultHandle* sec_handle =
static_cast<TestSecondaryCacheResultHandle*>(handle);
sec_handle->SetReady();
}
}
std::string GetPrintableOptions() const override { return ""; }
void SetResultMap(ResultMap&& map) { result_map_ = std::move(map); }
uint32_t num_inserts() { return num_inserts_; }
uint32_t num_lookups() { return num_lookups_; }
@ -294,26 +330,41 @@ class TestSecondaryCache : public SecondaryCache {
}
private:
class TestSecondaryCacheHandle : public SecondaryCacheHandle {
class TestSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
public:
TestSecondaryCacheHandle(Cache* cache, Cache::Handle* handle, void* value,
size_t size)
: cache_(cache), handle_(handle), value_(value), size_(size) {}
~TestSecondaryCacheHandle() override { cache_->Release(handle_); }
TestSecondaryCacheResultHandle(Cache* cache, Cache::Handle* handle,
void* value, size_t size, ResultType type)
: cache_(cache),
handle_(handle),
value_(value),
size_(size),
is_ready_(true) {
if (type != ResultType::SUCCESS) {
is_ready_ = false;
}
}
bool IsReady() override { return true; }
~TestSecondaryCacheResultHandle() override { cache_->Release(handle_); }
bool IsReady() override { return is_ready_; }
void Wait() override {}
void* Value() override { return value_; }
void* Value() override {
assert(is_ready_);
return value_;
}
size_t Size() override { return size_; }
size_t Size() override { return Value() ? size_ : 0; }
void SetReady() { is_ready_ = true; }
private:
Cache* cache_;
Cache::Handle* handle_;
void* value_;
size_t size_;
bool is_ready_;
};
std::shared_ptr<Cache> cache_;
@ -321,6 +372,7 @@ class TestSecondaryCache : public SecondaryCache {
uint32_t num_lookups_;
bool inject_failure_;
std::string db_session_id_;
ResultMap result_map_;
};
class DBSecondaryCacheTest : public DBTestBase {
@ -350,6 +402,7 @@ class LRUSecondaryCacheTest : public LRUCacheTest {
char* Buf() { return buf_.get(); }
size_t Size() { return size_; }
std::string ToString() { return std::string(Buf(), Size()); }
private:
std::unique_ptr<char[]> buf_;
@ -575,14 +628,15 @@ TEST_F(LRUSecondaryCacheTest, FullCapacityTest) {
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
// This lookup should fail, since k1 promotion would have failed due to
// the block cache being at capacity
// k1 promotion should fail due to the block cache being at capacity,
// but the lookup should still succeed
Cache::Handle* handle2;
handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle2, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
ASSERT_NE(handle2, nullptr);
// Since k1 didn't get inserted, k2 should still be in cache
cache->Release(handle);
cache->Release(handle2);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
@ -985,6 +1039,141 @@ TEST_F(DBSecondaryCacheTest, SecondaryCacheFailureTest) {
Destroy(options);
}
TEST_F(LRUSecondaryCacheTest, BasicWaitAllTest) {
LRUCacheOptions opts(1024, 2, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(32 * 1024);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
const int num_keys = 32;
Random rnd(301);
std::vector<std::string> values;
for (int i = 0; i < num_keys; ++i) {
std::string str = rnd.RandomString(1020);
values.emplace_back(str);
TestItem* item = new TestItem(str.data(), str.length());
ASSERT_OK(cache->Insert("k" + std::to_string(i), item,
&LRUSecondaryCacheTest::helper_, str.length()));
}
// Force all entries to be evicted to the secondary cache
cache->SetCapacity(0);
ASSERT_EQ(secondary_cache->num_inserts(), 32u);
cache->SetCapacity(32 * 1024);
secondary_cache->SetResultMap(
{{"k3", TestSecondaryCache::ResultType::DEFER},
{"k4", TestSecondaryCache::ResultType::DEFER_AND_FAIL},
{"k5", TestSecondaryCache::ResultType::FAIL}});
std::vector<Cache::Handle*> results;
for (int i = 0; i < 6; ++i) {
results.emplace_back(
cache->Lookup("k" + std::to_string(i), &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, false));
}
cache->WaitAll(results);
for (int i = 0; i < 6; ++i) {
if (i == 4) {
ASSERT_EQ(cache->Value(results[i]), nullptr);
} else if (i == 5) {
ASSERT_EQ(results[i], nullptr);
continue;
} else {
TestItem* item = static_cast<TestItem*>(cache->Value(results[i]));
ASSERT_EQ(item->ToString(), values[i]);
}
cache->Release(results[i]);
}
cache.reset();
secondary_cache.reset();
}
// In this test, we have one KV pair per data block. We indirectly determine
// the cache key associated with each data block (and thus each KV) by using
// a sync point callback in TestSecondaryCache::Lookup. We then control the
// lookup result by setting the ResultMap.
TEST_F(DBSecondaryCacheTest, TestSecondaryCacheMultiGet) {
LRUCacheOptions opts(1 << 20, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.paranoid_file_checks = true;
DestroyAndReopen(options);
Random rnd(301);
const int N = 8;
std::vector<std::string> keys;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(4000);
keys.emplace_back(p_v);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB does the paranoid check for the new
// SST file. This will try to lookup all data blocks in the secondary
// cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 8u);
cache->SetCapacity(0);
ASSERT_EQ(secondary_cache->num_inserts(), 8u);
cache->SetCapacity(1 << 20);
std::vector<std::string> cache_keys;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TestSecondaryCache::Lookup", [&cache_keys](void* key) -> void {
cache_keys.emplace_back(*(static_cast<std::string*>(key)));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < N; ++i) {
std::string v = Get(Key(i));
ASSERT_EQ(4000, v.size());
ASSERT_EQ(v, keys[i]);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(secondary_cache->num_lookups(), 16u);
cache->SetCapacity(0);
cache->SetCapacity(1 << 20);
ASSERT_EQ(Get(Key(2)), keys[2]);
ASSERT_EQ(Get(Key(7)), keys[7]);
secondary_cache->SetResultMap(
{{cache_keys[3], TestSecondaryCache::ResultType::DEFER},
{cache_keys[4], TestSecondaryCache::ResultType::DEFER_AND_FAIL},
{cache_keys[5], TestSecondaryCache::ResultType::FAIL}});
std::vector<std::string> mget_keys(
{Key(0), Key(1), Key(2), Key(3), Key(4), Key(5), Key(6), Key(7)});
std::vector<PinnableSlice> values(mget_keys.size());
std::vector<Status> s(keys.size());
std::vector<Slice> key_slices;
for (const std::string& key : mget_keys) {
key_slices.emplace_back(key);
}
uint32_t num_lookups = secondary_cache->num_lookups();
dbfull()->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(),
key_slices.size(), key_slices.data(), values.data(),
s.data(), false);
ASSERT_EQ(secondary_cache->num_lookups(), num_lookups + 5);
for (int i = 0; i < N; ++i) {
ASSERT_OK(s[i]);
ASSERT_EQ(values[i].ToString(), keys[i]);
values[i].Reset();
}
Destroy(options);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -115,9 +115,10 @@ class ShardedCache : public Cache {
int GetNumShardBits() const;
uint32_t GetNumShards() const;
private:
protected:
inline uint32_t Shard(uint32_t hash) { return hash & shard_mask_; }
private:
const uint32_t shard_mask_;
mutable port::Mutex capacity_mutex_;
size_t capacity_;

View File

@ -472,7 +472,9 @@ class Cache {
return Release(handle, force_erase);
}
// Determines if the handle returned by Lookup() has a valid value yet.
// Determines if the handle returned by Lookup() has a valid value yet. The
// call is not thread safe and should be called only by someone holding a
// reference to the handle.
virtual bool IsReady(Handle* /*handle*/) { return true; }
// If the handle returned by Lookup() is not ready yet, wait till it
@ -482,7 +484,9 @@ class Cache {
virtual void Wait(Handle* /*handle*/) {}
// Wait for a vector of handles to become ready. As with Wait(), the user
// should check the Value() of each handle for nullptr
// should check the Value() of each handle for nullptr. This call is not
// thread safe and should only be called by the caller holding a reference
// to each of the handles.
virtual void WaitAll(std::vector<Handle*>& /*handles*/) {}
private:

View File

@ -21,9 +21,9 @@ namespace ROCKSDB_NAMESPACE {
// ready, and call Wait() in order to block until it becomes ready.
// The caller must call value() after it becomes ready to determine if the
// handle successfullly read the item.
class SecondaryCacheHandle {
class SecondaryCacheResultHandle {
public:
virtual ~SecondaryCacheHandle() {}
virtual ~SecondaryCacheResultHandle() {}
// Returns whether the handle is ready or not
virtual bool IsReady() = 0;
@ -63,7 +63,7 @@ class SecondaryCache {
// will be used to create the object. The handle returned may not be
// ready yet, unless wait=true, in which case Lookup() will block until
// the handle is ready
virtual std::unique_ptr<SecondaryCacheHandle> Lookup(
virtual std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb, bool wait) = 0;
// At the discretion of the implementation, erase the data associated
@ -71,7 +71,7 @@ class SecondaryCache {
virtual void Erase(const Slice& key) = 0;
// Wait for a collection of handles to become ready
virtual void WaitAll(std::vector<SecondaryCacheHandle*> handles) = 0;
virtual void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) = 0;
virtual std::string GetPrintableOptions() const = 0;
};

View File

@ -350,11 +350,11 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(
}
Cache::Handle* BlockBasedTable::GetEntryFromCache(
Cache* block_cache, const Slice& key, BlockType block_type,
Cache* block_cache, const Slice& key, BlockType block_type, const bool wait,
GetContext* get_context, const Cache::CacheItemHelper* cache_helper,
const Cache::CreateCallback& create_cb, Cache::Priority priority) const {
auto cache_handle =
block_cache->Lookup(key, cache_helper, create_cb, priority, true,
block_cache->Lookup(key, cache_helper, create_cb, priority, wait,
rep_->ioptions.statistics.get());
if (cache_handle != nullptr) {
@ -1104,7 +1104,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
const UncompressionDict& uncompression_dict, BlockType block_type,
GetContext* get_context) const {
const bool wait, GetContext* get_context) const {
const size_t read_amp_bytes_per_bit =
block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit
@ -1131,7 +1131,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Lookup uncompressed cache first
if (block_cache != nullptr) {
auto cache_handle = GetEntryFromCache(
block_cache, block_cache_key, block_type, get_context,
block_cache, block_cache_key, block_type, wait, get_context,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
priority);
if (cache_handle != nullptr) {
@ -1399,9 +1399,9 @@ template <typename TBlocklike>
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const {
const bool wait, CachableEntry<TBlocklike>* block_entry,
BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockContents* contents) const {
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep_->table_options.block_cache.get();
@ -1433,8 +1433,10 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
if (!contents) {
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
ro, block_entry, uncompression_dict, block_type,
get_context);
if (block_entry->GetValue()) {
wait, get_context);
// Value could still be null at this point, so check the cache handle
// and update the read pattern for prefetching
if (block_entry->GetValue() || block_entry->GetCacheHandle()) {
// TODO(haoyu): Differentiate cache hit on uncompressed block cache and
// compressed block cache.
is_cache_hit = true;
@ -1450,7 +1452,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// Can't find the block from the cache. If I/O is allowed, read from the
// file.
if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) {
if (block_entry->GetValue() == nullptr &&
block_entry->GetCacheHandle() == nullptr && !no_io && ro.fill_cache) {
Statistics* statistics = rep_->ioptions.stats;
const bool maybe_compressed =
block_type != BlockType::kFilter &&
@ -1613,7 +1616,8 @@ void BlockBasedTable::RetrieveMultipleBlocks(
RetrieveBlock(nullptr, options, handle, uncompression_dict,
&(*results)[idx_in_batch], BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
/* for_compaction */ false, /* use_cache */ true);
/* for_compaction */ false, /* use_cache */ true,
/* wait_for_cache */ true);
}
return;
}
@ -1810,8 +1814,8 @@ void BlockBasedTable::RetrieveMultipleBlocks(
// necessary. Since we're passing the raw block contents, it will
// avoid looking up the block cache
s = MaybeReadBlockAndLoadToCache(
nullptr, options, handle, uncompression_dict, block_entry,
BlockType::kData, mget_iter->get_context,
nullptr, options, handle, uncompression_dict, /*wait=*/true,
block_entry, BlockType::kData, mget_iter->get_context,
&lookup_data_block_context, &raw_block_contents);
// block_entry value could be null if no block cache is present, i.e
@ -1858,22 +1862,23 @@ Status BlockBasedTable::RetrieveBlock(
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const {
bool for_compaction, bool use_cache, bool wait_for_cache) const {
assert(block_entry);
assert(block_entry->IsEmpty());
Status s;
if (use_cache) {
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
uncompression_dict, block_entry,
block_type, get_context, lookup_context,
s = MaybeReadBlockAndLoadToCache(
prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache,
block_entry, block_type, get_context, lookup_context,
/*contents=*/nullptr);
if (!s.ok()) {
return s;
}
if (block_entry->GetValue() != nullptr) {
if (block_entry->GetValue() != nullptr ||
block_entry->GetCacheHandle() != nullptr) {
assert(s.ok());
return s;
}
@ -1941,28 +1946,28 @@ template Status BlockBasedTable::RetrieveBlock<BlockContents>(
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<BlockContents>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const;
bool for_compaction, bool use_cache, bool wait_for_cache) const;
template Status BlockBasedTable::RetrieveBlock<ParsedFullFilterBlock>(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<ParsedFullFilterBlock>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const;
bool for_compaction, bool use_cache, bool wait_for_cache) const;
template Status BlockBasedTable::RetrieveBlock<Block>(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const;
bool for_compaction, bool use_cache, bool wait_for_cache) const;
template Status BlockBasedTable::RetrieveBlock<UncompressionDict>(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<UncompressionDict>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const;
bool for_compaction, bool use_cache, bool wait_for_cache) const;
BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
const BlockBasedTable* table,
@ -2479,6 +2484,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
{
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
sst_file_range.end());
std::vector<Cache::Handle*> cache_handles;
bool wait_for_cache_results = false;
CachableEntry<UncompressionDict> uncompression_dict;
Status uncompression_dict_status;
@ -2551,20 +2558,64 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
Status s = RetrieveBlock(
nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
miter->get_context, &lookup_data_block_context,
/* for_compaction */ false, /* use_cache */ true);
/* for_compaction */ false, /* use_cache */ true,
/* wait_for_cache */ false);
if (s.IsIncomplete()) {
s = Status::OK();
}
if (s.ok() && !results.back().IsEmpty()) {
if (results.back().IsReady()) {
// Found it in the cache. Add NULL handle to indicate there is
// nothing to read from disk
// nothing to read from disk.
if (results.back().GetCacheHandle()) {
results.back().UpdateCachedValue();
// Its possible the cache lookup returned a non-null handle,
// but the lookup actually failed to produce a valid value
if (results.back().GetValue() == nullptr) {
block_handles.emplace_back(handle);
total_len += block_size(handle);
}
}
if (results.back().GetValue() != nullptr) {
block_handles.emplace_back(BlockHandle::NullBlockHandle());
}
} else {
// We have to wait for the asynchronous cache lookup to finish,
// and then we may have to read the block from disk anyway
wait_for_cache_results = true;
block_handles.emplace_back(handle);
cache_handles.emplace_back(results.back().GetCacheHandle());
}
} else {
block_handles.emplace_back(handle);
total_len += block_size(handle);
}
}
if (wait_for_cache_results) {
Cache* block_cache = rep_->table_options.block_cache.get();
block_cache->WaitAll(cache_handles);
for (size_t i = 0; i < block_handles.size(); ++i) {
// If this block was a success or failure or not needed because
// the corresponding key is in the same block as a prior key, skip
if (block_handles[i] == BlockHandle::NullBlockHandle() ||
results[i].IsEmpty()) {
continue;
}
results[i].UpdateCachedValue();
void* val = results[i].GetValue();
if (!val) {
// The async cache lookup failed - could be due to an error
// or a false positive. We need to read the data block from
// the SST file
results[i].Reset();
total_len += block_size(block_handles[i]);
} else {
block_handles[i] = BlockHandle::NullBlockHandle();
}
}
}
if (total_len) {
char* scratch = nullptr;
const UncompressionDict& dict = uncompression_dict.GetValue()

View File

@ -274,7 +274,7 @@ class BlockBasedTable : public TableReader {
GetContext* get_context) const;
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
BlockType block_type,
BlockType block_type, const bool wait,
GetContext* get_context,
const Cache::CacheItemHelper* cache_helper,
const Cache::CreateCallback& create_cb,
@ -300,9 +300,9 @@ class BlockBasedTable : public TableReader {
Status MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<TBlocklike>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const;
const bool wait, CachableEntry<TBlocklike>* block_entry,
BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockContents* contents) const;
// Similar to the above, with one crucial difference: it will retrieve the
// block from the file even if there are no caches configured (assuming the
@ -314,7 +314,8 @@ class BlockBasedTable : public TableReader {
CachableEntry<TBlocklike>* block_entry,
BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const;
bool for_compaction, bool use_cache,
bool wait_for_cache) const;
void RetrieveMultipleBlocks(
const ReadOptions& options, const MultiGetRange* batch,
@ -354,7 +355,7 @@ class BlockBasedTable : public TableReader {
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
const UncompressionDict& uncompression_dict, BlockType block_type,
GetContext* get_context) const;
const bool wait, GetContext* get_context) const;
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then

View File

@ -54,7 +54,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
CachableEntry<Block> block;
s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
get_context, lookup_context, for_compaction,
/* use_cache */ true);
/* use_cache */ true, /* wait_for_cache */ true);
if (!s.ok()) {
assert(block.IsEmpty());

View File

@ -162,7 +162,6 @@ public:
}
void SetCachedValue(T* value, Cache* cache, Cache::Handle* cache_handle) {
assert(value != nullptr);
assert(cache != nullptr);
assert(cache_handle != nullptr);
@ -179,6 +178,22 @@ public:
assert(!own_value_);
}
void UpdateCachedValue() {
assert(cache_ != nullptr);
assert(cache_handle_ != nullptr);
value_ = static_cast<T*>(cache_->Value(cache_handle_));
}
bool IsReady() {
if (!own_value_) {
assert(cache_ != nullptr);
assert(cache_handle_ != nullptr);
return cache_->IsReady(cache_handle_);
}
return true;
}
private:
void ReleaseResource() {
if (LIKELY(cache_handle_ != nullptr)) {

View File

@ -30,7 +30,8 @@ Status FilterBlockReaderCommon<TBlocklike>::ReadFilterBlock(
table->RetrieveBlock(prefetch_buffer, read_options, rep->filter_handle,
UncompressionDict::GetEmptyDict(), filter_block,
BlockType::kFilter, get_context, lookup_context,
/* for_compaction */ false, use_cache);
/* for_compaction */ false, use_cache,
/* wait_for_cache */ true);
return s;
}

View File

@ -26,7 +26,8 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
const Status s = table->RetrieveBlock(
prefetch_buffer, read_options, rep->footer.index_handle(),
UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex,
get_context, lookup_context, /* for_compaction */ false, use_cache);
get_context, lookup_context, /* for_compaction */ false, use_cache,
/* wait_for_cache */ true);
return s;
}

View File

@ -296,7 +296,8 @@ Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
UncompressionDict::GetEmptyDict(), filter_block,
BlockType::kFilter, get_context, lookup_context,
/* for_compaction */ false, /* use_cache */ true);
/* for_compaction */ false, /* use_cache */ true,
/* wait_for_cache */ true);
return s;
}
@ -490,8 +491,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
&block, BlockType::kFilter, nullptr /* get_context */, &lookup_context,
nullptr /* contents */);
/* wait */ true, &block, BlockType::kFilter, nullptr /* get_context */,
&lookup_context, nullptr /* contents */);
if (!s.ok()) {
return s;
}

View File

@ -167,8 +167,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
&block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
/*contents=*/nullptr);
/*wait=*/true, &block, BlockType::kIndex, /*get_context=*/nullptr,
&lookup_context, /*contents=*/nullptr);
if (!s.ok()) {
return s;

View File

@ -60,7 +60,7 @@ Status UncompressionDictReader::ReadUncompressionDictionary(
prefetch_buffer, read_options, rep->compression_dict_handle,
UncompressionDict::GetEmptyDict(), uncompression_dict,
BlockType::kCompressionDictionary, get_context, lookup_context,
/* for_compaction */ false, use_cache);
/* for_compaction */ false, use_cache, /* wait_for_cache */ true);
if (!s.ok()) {
ROCKS_LOG_WARN(