Object lifetime in cache
Summary: Any non-raw-data dependent object must be destructed before the table closes. There was a bug of not doing that for filter object. This patch fixes the bug and adds a unit test to prevent such bugs in future. Closes https://github.com/facebook/rocksdb/pull/2246 Differential Revision: D5001318 Pulled By: maysamyabandeh fbshipit-source-id: 6d8772e58765485868094b92964da82ef9730b6d
This commit is contained in:
parent
fdaefa0309
commit
40af2381ec
@ -187,6 +187,10 @@ class Cache {
|
||||
|
||||
virtual std::string GetPrintableOptions() const { return ""; }
|
||||
|
||||
// Mark the last inserted object as being a raw data block. This will be used
|
||||
// in tests. The default implementation does nothing.
|
||||
virtual void TEST_mark_as_data_block(const Slice& key, size_t charge) {}
|
||||
|
||||
private:
|
||||
// No copying allowed
|
||||
Cache(const Cache&);
|
||||
|
@ -913,6 +913,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
|
||||
s = block_cache->Insert(
|
||||
block_cache_key, block->value, block->value->usable_size(),
|
||||
&DeleteCachedEntry<Block>, &(block->cache_handle));
|
||||
block_cache->TEST_mark_as_data_block(block_cache_key,
|
||||
block->value->usable_size());
|
||||
if (s.ok()) {
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
if (is_index) {
|
||||
@ -994,6 +996,8 @@ Status BlockBasedTable::PutDataBlockToCache(
|
||||
s = block_cache->Insert(
|
||||
block_cache_key, block->value, block->value->usable_size(),
|
||||
&DeleteCachedEntry<Block>, &(block->cache_handle), priority);
|
||||
block_cache->TEST_mark_as_data_block(block_cache_key,
|
||||
block->value->usable_size());
|
||||
if (s.ok()) {
|
||||
assert(block->cache_handle != nullptr);
|
||||
RecordTick(statistics, BLOCK_CACHE_ADD);
|
||||
@ -2090,7 +2094,7 @@ void BlockBasedTable::Close() {
|
||||
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
|
||||
// Get the filter block key
|
||||
auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
|
||||
rep_->footer.metaindex_handle(), cache_key);
|
||||
rep_->filter_handle, cache_key);
|
||||
rep_->table_options.block_cache.get()->Erase(key);
|
||||
// Get the index block key
|
||||
key = GetCacheKeyFromOffset(rep_->cache_key_prefix,
|
||||
|
@ -376,9 +376,9 @@ struct BlockBasedTable::CachableEntry {
|
||||
CachableEntry(TValue* _value, Cache::Handle* _cache_handle)
|
||||
: value(_value), cache_handle(_cache_handle) {}
|
||||
CachableEntry() : CachableEntry(nullptr, nullptr) {}
|
||||
void Release(Cache* cache) {
|
||||
void Release(Cache* cache, bool force_erase = false) {
|
||||
if (cache_handle) {
|
||||
cache->Release(cache_handle);
|
||||
cache->Release(cache_handle, force_erase);
|
||||
value = nullptr;
|
||||
cache_handle = nullptr;
|
||||
}
|
||||
|
@ -89,9 +89,19 @@ PartitionedFilterBlockReader::PartitionedFilterBlockReader(
|
||||
}
|
||||
|
||||
PartitionedFilterBlockReader::~PartitionedFilterBlockReader() {
|
||||
ReadLock rl(&mu_);
|
||||
for (auto it = handle_list_.begin(); it != handle_list_.end(); ++it) {
|
||||
table_->rep_->table_options.block_cache.get()->Release(*it);
|
||||
{
|
||||
ReadLock rl(&mu_);
|
||||
for (auto it = handle_list_.begin(); it != handle_list_.end(); ++it) {
|
||||
table_->rep_->table_options.block_cache.get()->Release(*it);
|
||||
}
|
||||
}
|
||||
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
|
||||
for (auto it = filter_block_set_.begin(); it != filter_block_set_.end();
|
||||
++it) {
|
||||
auto key = BlockBasedTable::GetCacheKey(table_->rep_->cache_key_prefix,
|
||||
table_->rep_->cache_key_prefix_size,
|
||||
*it, cache_key);
|
||||
table_->rep_->table_options.block_cache.get()->Erase(key);
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,8 +116,6 @@ bool PartitionedFilterBlockReader::KeyMayMatch(
|
||||
if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) {
|
||||
return true;
|
||||
}
|
||||
// This is the user key vs. the full key in the partition index. We assume
|
||||
// that user key <= full key
|
||||
auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr);
|
||||
if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range
|
||||
return false;
|
||||
@ -198,15 +206,18 @@ PartitionedFilterBlockReader::GetFilterPartition(Slice* handle_value,
|
||||
}
|
||||
auto filter =
|
||||
table_->GetFilter(fltr_blk_handle, is_a_filter_partition, no_io);
|
||||
if (pin_cached_filters && filter.IsSet()) {
|
||||
WriteLock wl(&mu_);
|
||||
std::pair<uint64_t, FilterBlockReader*> pair(fltr_blk_handle.offset(),
|
||||
filter.value);
|
||||
auto succ = filter_cache_.insert(pair).second;
|
||||
if (succ) {
|
||||
handle_list_.push_back(filter.cache_handle);
|
||||
} // Otherwise it is already inserted by a concurrent thread
|
||||
*cached = true;
|
||||
if (filter.IsSet()) {
|
||||
filter_block_set_.insert(fltr_blk_handle);
|
||||
if (pin_cached_filters) {
|
||||
WriteLock wl(&mu_);
|
||||
std::pair<uint64_t, FilterBlockReader*> pair(fltr_blk_handle.offset(),
|
||||
filter.value);
|
||||
auto succ = filter_cache_.insert(pair).second;
|
||||
if (succ) {
|
||||
handle_list_.push_back(filter.cache_handle);
|
||||
} // Otherwise it is already inserted by a concurrent thread
|
||||
*cached = true;
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
} else {
|
||||
|
@ -87,6 +87,12 @@ class PartitionedFilterBlockReader : public FilterBlockReader {
|
||||
const BlockBasedTable* table_;
|
||||
std::unordered_map<uint64_t, FilterBlockReader*> filter_cache_;
|
||||
autovector<Cache::Handle*> handle_list_;
|
||||
struct BlockHandleCmp {
|
||||
bool operator()(const BlockHandle& lhs, const BlockHandle& rhs) const {
|
||||
return lhs.offset() < rhs.offset();
|
||||
}
|
||||
};
|
||||
std::set<BlockHandle, BlockHandleCmp> filter_block_set_;
|
||||
port::RWMutex mu_;
|
||||
};
|
||||
|
||||
|
@ -9,7 +9,6 @@
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <algorithm>
|
||||
@ -19,6 +18,7 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "cache/lru_cache.h"
|
||||
#include "db/dbformat.h"
|
||||
#include "db/memtable.h"
|
||||
#include "db/write_batch_internal.h"
|
||||
@ -2127,6 +2127,137 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) {
|
||||
}
|
||||
}
|
||||
|
||||
// A wrapper around LRICache that also keeps track of data blocks (in contrast
|
||||
// with the objects) in the cache. The class is very simple and can be used only
|
||||
// for trivial tests.
|
||||
class MockCache : public LRUCache {
|
||||
public:
|
||||
MockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
|
||||
double high_pri_pool_ratio)
|
||||
: LRUCache(capacity, num_shard_bits, strict_capacity_limit,
|
||||
high_pri_pool_ratio) {}
|
||||
virtual Status Insert(const Slice& key, void* value, size_t charge,
|
||||
void (*deleter)(const Slice& key, void* value),
|
||||
Handle** handle = nullptr,
|
||||
Priority priority = Priority::LOW) override {
|
||||
// Replace the deleter with our own so that we keep track of data blocks
|
||||
// erased from the cache
|
||||
deleters_[key.ToString()] = deleter;
|
||||
return ShardedCache::Insert(key, value, charge, &MockDeleter, handle,
|
||||
priority);
|
||||
}
|
||||
// This is called by the application right after inserting a data block
|
||||
virtual void TEST_mark_as_data_block(const Slice& key,
|
||||
size_t charge) override {
|
||||
marked_data_in_cache_[key.ToString()] = charge;
|
||||
marked_size_ += charge;
|
||||
}
|
||||
using DeleterFunc = void (*)(const Slice& key, void* value);
|
||||
static std::map<std::string, DeleterFunc> deleters_;
|
||||
static std::map<std::string, size_t> marked_data_in_cache_;
|
||||
static size_t marked_size_;
|
||||
static void MockDeleter(const Slice& key, void* value) {
|
||||
// If the item was marked for being data block, decrease its usage from the
|
||||
// total data block usage of the cache
|
||||
if (marked_data_in_cache_.find(key.ToString()) !=
|
||||
marked_data_in_cache_.end()) {
|
||||
marked_size_ -= marked_data_in_cache_[key.ToString()];
|
||||
}
|
||||
// Then call the origianl deleter
|
||||
assert(deleters_.find(key.ToString()) != deleters_.end());
|
||||
auto deleter = deleters_[key.ToString()];
|
||||
deleter(key, value);
|
||||
}
|
||||
};
|
||||
|
||||
size_t MockCache::marked_size_ = 0;
|
||||
std::map<std::string, MockCache::DeleterFunc> MockCache::deleters_;
|
||||
std::map<std::string, size_t> MockCache::marked_data_in_cache_;
|
||||
|
||||
// Block cache can contain raw data blocks as well as general objects. If an
|
||||
// object depends on the table to be live, it then must be destructed before the
|
||||
// table is closed. This test makese sure that the only items remains in the
|
||||
// cache after the table is closed are raw data blocks.
|
||||
TEST_F(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
|
||||
for (auto index_type :
|
||||
{BlockBasedTableOptions::IndexType::kBinarySearch,
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}) {
|
||||
for (bool block_based_filter : {true, false}) {
|
||||
for (bool partition_filter : {true, false}) {
|
||||
if (partition_filter &&
|
||||
(block_based_filter ||
|
||||
index_type !=
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch)) {
|
||||
continue;
|
||||
}
|
||||
for (bool index_and_filter_in_cache : {true, false}) {
|
||||
for (bool pin_l0 : {true, false}) {
|
||||
if (pin_l0 && !index_and_filter_in_cache) {
|
||||
continue;
|
||||
}
|
||||
// Create a table
|
||||
Options opt;
|
||||
unique_ptr<InternalKeyComparator> ikc;
|
||||
ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
|
||||
opt.compression = kNoCompression;
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 1024;
|
||||
table_options.index_type =
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache = pin_l0;
|
||||
table_options.partition_filters = partition_filter;
|
||||
table_options.cache_index_and_filter_blocks =
|
||||
index_and_filter_in_cache;
|
||||
// big enough so we don't ever lose cached values.
|
||||
table_options.block_cache = std::shared_ptr<rocksdb::Cache>(
|
||||
new MockCache(16 * 1024 * 1024, 4, false, 0.0));
|
||||
table_options.filter_policy.reset(
|
||||
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
|
||||
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
TableConstructor c(BytewiseComparator());
|
||||
std::string user_key = "k01";
|
||||
std::string key =
|
||||
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
|
||||
c.Add(key, "hello");
|
||||
std::vector<std::string> keys;
|
||||
stl_wrappers::KVMap kvmap;
|
||||
const ImmutableCFOptions ioptions(opt);
|
||||
c.Finish(opt, ioptions, table_options, *ikc, &keys, &kvmap);
|
||||
|
||||
// Doing a read to make index/filter loaded into the cache
|
||||
auto table_reader =
|
||||
dynamic_cast<BlockBasedTable*>(c.GetTableReader());
|
||||
PinnableSlice value;
|
||||
GetContext get_context(opt.comparator, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, user_key, &value,
|
||||
nullptr, nullptr, nullptr, nullptr);
|
||||
InternalKey ikey(user_key, 0, kTypeValue);
|
||||
auto s = table_reader->Get(ReadOptions(), key, &get_context);
|
||||
ASSERT_EQ(get_context.State(), GetContext::kFound);
|
||||
ASSERT_STREQ(value.data(), "hello");
|
||||
|
||||
// Close the table
|
||||
c.ResetTableReader();
|
||||
|
||||
auto usage = table_options.block_cache->GetUsage();
|
||||
auto pinned_usage = table_options.block_cache->GetPinnedUsage();
|
||||
// The only usage must be for marked data blocks
|
||||
ASSERT_EQ(usage, MockCache::marked_size_);
|
||||
// There must be some pinned data since PinnableSlice has not
|
||||
// released them yet
|
||||
ASSERT_GT(pinned_usage, 0);
|
||||
// Release pinnable slice reousrces
|
||||
value.Reset();
|
||||
pinned_usage = table_options.block_cache->GetPinnedUsage();
|
||||
ASSERT_EQ(pinned_usage, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(BlockBasedTableTest, BlockCacheLeak) {
|
||||
// Check that when we reopen a table we don't lose access to blocks already
|
||||
// in the cache. This test checks whether the Table actually makes use of the
|
||||
|
Loading…
x
Reference in New Issue
Block a user