enable PinnableSlice for RowCache

Summary:
This patch enables using PinnableSlice for RowCache, changes include
not releasing the cache handle immediately after lookup in TableCache::Get, instead pass a Cleanble function which does Cache::RleaseHandle.
Closes https://github.com/facebook/rocksdb/pull/2492

Differential Revision: D5316216

Pulled By: maysamyabandeh

fbshipit-source-id: d2a684bd7e4ba73772f762e58a82b5f4fbd5d362
This commit is contained in:
Sushma Devendrappa 2017-07-17 14:53:15 -07:00 committed by Facebook Github Bot
parent 00464a3140
commit 0655b58582
9 changed files with 94 additions and 9 deletions

24
cache/lru_cache.cc vendored
View File

@ -157,6 +157,16 @@ void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) {
*lru_low_pri = lru_low_pri_;
}
size_t LRUCacheShard::TEST_GetLRUSize() {
LRUHandle* lru_handle = lru_.next;
size_t lru_size = 0;
while (lru_handle != &lru_) {
lru_size++;
lru_handle = lru_handle->next;
}
return lru_size;
}
void LRUCacheShard::LRU_Remove(LRUHandle* e) {
assert(e->next != nullptr);
assert(e->prev != nullptr);
@ -438,11 +448,11 @@ std::string LRUCacheShard::GetPrintableOptions() const {
LRUCache::LRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
int num_shards = 1 << num_shard_bits;
shards_ = new LRUCacheShard[num_shards];
num_shards_ = 1 << num_shard_bits;
shards_ = new LRUCacheShard[num_shards_];
SetCapacity(capacity);
SetStrictCapacityLimit(strict_capacity_limit);
for (int i = 0; i < num_shards; i++) {
for (int i = 0; i < num_shards_; i++) {
shards_[i].SetHighPriorityPoolRatio(high_pri_pool_ratio);
}
}
@ -471,6 +481,14 @@ uint32_t LRUCache::GetHash(Handle* handle) const {
void LRUCache::DisownData() { shards_ = nullptr; }
size_t LRUCache::TEST_GetLRUSize() {
size_t lru_size_of_all_shards = 0;
for (int i = 0; i < num_shards_; i++) {
lru_size_of_all_shards += shards_[i].TEST_GetLRUSize();
}
return lru_size_of_all_shards;
}
std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit,
double high_pri_pool_ratio) {

8
cache/lru_cache.h vendored
View File

@ -198,6 +198,10 @@ class LRUCacheShard : public CacheShard {
void TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri);
// Retrieves number of elements in LRU, for unit test purpose only
// not threadsafe
size_t TEST_GetLRUSize();
private:
void LRU_Remove(LRUHandle* e);
void LRU_Insert(LRUHandle* e);
@ -267,8 +271,12 @@ class LRUCache : public ShardedCache {
virtual uint32_t GetHash(Handle* handle) const override;
virtual void DisownData() override;
// Retrieves number of elements in LRU, for unit test purpose only
size_t TEST_GetLRUSize();
private:
LRUCacheShard* shards_;
int num_shards_ = 0;
};
} // namespace rocksdb

View File

@ -23,6 +23,7 @@
#include <alloca.h>
#endif
#include "cache/lru_cache.h"
#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
@ -5320,6 +5321,36 @@ TEST_F(DBTest, RowCache) {
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
}
TEST_F(DBTest, PinnableSliceAndRowCache) {
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
options.row_cache = NewLRUCache(8192);
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
ASSERT_EQ(Get("foo"), "bar");
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
1);
{
PinnableSlice pin_slice;
ASSERT_EQ(Get("foo", &pin_slice), Status::OK());
ASSERT_EQ(pin_slice.ToString(), "bar");
// Entry is already in cache, lookup will remove the element from lru
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
0);
}
// After PinnableSlice destruction element is added back in LRU
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
1);
}
#endif // ROCKSDB_LITE
TEST_F(DBTest, DeletingOldWalAfterDrop) {

View File

@ -686,6 +686,13 @@ std::string DBTestBase::Get(int cf, const std::string& k,
return result;
}
Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
ReadOptions options;
options.verify_checksums = true;
Status s = dbfull()->Get(options, dbfull()->DefaultColumnFamily(), k, v);
return s;
}
uint64_t DBTestBase::GetNumSnapshots() {
uint64_t int_num;
EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.num-snapshots", &int_num));

View File

@ -803,6 +803,8 @@ class DBTestBase : public testing::Test {
std::string Get(int cf, const std::string& k,
const Snapshot* snapshot = nullptr);
Status Get(const std::string& k, PinnableSlice* v);
uint64_t GetNumSnapshots();
uint64_t GetTimeOldestSnapshots();

View File

@ -311,6 +311,7 @@ Status TableCache::Get(const ReadOptions& options,
#ifndef ROCKSDB_LITE
IterKey row_cache_key;
std::string row_cache_entry_buffer;
// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
@ -334,10 +335,26 @@ Status TableCache::Get(const ReadOptions& options,
if (auto row_handle =
ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
// Cleanable routine to release the cache entry
Cleanable value_pinner;
auto release_cache_entry_func = [](void* cache_to_clean,
void* cache_handle) {
((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
};
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
replayGetContextLog(*found_row_cache_entry, user_key, get_context);
ioptions_.row_cache->Release(row_handle);
// If it comes here value is located on the cache.
// found_row_cache_entry points to the value on cache,
// and value_pinner has cleanup procedure for the cached entry.
// After replayGetContextLog() returns, get_context.pinnable_slice_
// will point to cache entry buffer (or a copy based on that) and
// cleanup routine under value_pinner will be delegated to
// get_context.pinnable_slice_. Cache entry is released when
// get_context.pinnable_slice_ is reset.
value_pinner.RegisterCleanup(release_cache_entry_func,
ioptions_.row_cache.get(), row_handle);
replayGetContextLog(*found_row_cache_entry, user_key, get_context,
&value_pinner);
RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
done = true;
} else {

View File

@ -33,7 +33,7 @@ class Cleanable {
typedef void (*CleanupFunction)(void* arg1, void* arg2);
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
void DelegateCleanupsTo(Cleanable* other);
// DoCkeanup and also resets the pointers for reuse
// DoCleanup and also resets the pointers for reuse
inline void Reset() {
DoCleanup();
cleanup_.function = nullptr;

View File

@ -180,7 +180,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
}
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context) {
GetContext* get_context, Cleanable* value_pinner) {
#ifndef ROCKSDB_LITE
Slice s = replay_log;
while (s.size()) {
@ -194,7 +194,8 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
// Since SequenceNumber is not stored and unknown, we will use
// kMaxSequenceNumber.
get_context->SaveValue(
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value, nullptr);
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
value_pinner);
}
#else // ROCKSDB_LITE
assert(false);

View File

@ -86,6 +86,7 @@ class GetContext {
};
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context);
GetContext* get_context,
Cleanable* value_pinner = nullptr);
} // namespace rocksdb