Evict the uncompression dictionary from the block cache upon table close (#5150)
Summary: The uncompression dictionary object has a Statistics pointer that might dangle if the database closed. This patch evicts the dictionary from the block cache when a table is closed, similarly to how index and filter readers are handled. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5150 Differential Revision: D14782422 Pulled By: ltamasi fbshipit-source-id: 0cec9336c742c479aa92206e04521767f1aa9622
This commit is contained in:
parent
306b9adfd8
commit
59ef2ba559
@ -3275,21 +3275,36 @@ void BlockBasedTable::Close() {
|
||||
if (rep_->closed) {
|
||||
return;
|
||||
}
|
||||
rep_->filter_entry.Release(rep_->table_options.block_cache.get());
|
||||
rep_->index_entry.Release(rep_->table_options.block_cache.get());
|
||||
// cleanup index and filter blocks to avoid accessing dangling pointer
|
||||
|
||||
Cache* const cache = rep_->table_options.block_cache.get();
|
||||
|
||||
rep_->filter_entry.Release(cache);
|
||||
rep_->index_entry.Release(cache);
|
||||
|
||||
// cleanup index, filter, and compression dictionary blocks
|
||||
// to avoid accessing dangling pointers
|
||||
if (!rep_->table_options.no_block_cache) {
|
||||
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
|
||||
|
||||
// Get the filter block key
|
||||
auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
|
||||
rep_->filter_handle, cache_key);
|
||||
rep_->table_options.block_cache.get()->Erase(key);
|
||||
cache->Erase(key);
|
||||
|
||||
// Get the index block key
|
||||
key = GetCacheKeyFromOffset(rep_->cache_key_prefix,
|
||||
rep_->cache_key_prefix_size,
|
||||
rep_->dummy_index_reader_offset, cache_key);
|
||||
rep_->table_options.block_cache.get()->Erase(key);
|
||||
cache->Erase(key);
|
||||
|
||||
if (!rep_->compression_dict_handle.IsNull()) {
|
||||
// Get the compression dictionary block key
|
||||
key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
|
||||
rep_->compression_dict_handle, cache_key);
|
||||
cache->Erase(key);
|
||||
}
|
||||
}
|
||||
|
||||
rep_->closed = true;
|
||||
}
|
||||
|
||||
|
@ -329,7 +329,7 @@ class TableConstructor: public Constructor {
|
||||
TableBuilderOptions(ioptions, moptions, internal_comparator,
|
||||
&int_tbl_prop_collector_factories,
|
||||
options.compression, options.sample_for_compression,
|
||||
CompressionOptions(), false /* skip_filters */,
|
||||
options.compression_opts, false /* skip_filters */,
|
||||
column_family_name, level_),
|
||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
|
||||
file_writer_.get()));
|
||||
@ -2317,93 +2317,122 @@ std::map<std::string, size_t> MockCache::marked_data_in_cache_;
|
||||
// table is closed. This test makes sure that the only items remains in the
|
||||
// cache after the table is closed are raw data blocks.
|
||||
TEST_P(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
|
||||
std::vector<CompressionType> compression_types{kNoCompression};
|
||||
|
||||
// The following are the compression library versions supporting compression
|
||||
// dictionaries. See the test case CacheCompressionDict in the
|
||||
// DBBlockCacheTest suite.
|
||||
#ifdef ZLIB
|
||||
compression_types.push_back(kZlibCompression);
|
||||
#endif // ZLIB
|
||||
#if LZ4_VERSION_NUMBER >= 10400
|
||||
compression_types.push_back(kLZ4Compression);
|
||||
compression_types.push_back(kLZ4HCCompression);
|
||||
#endif // LZ4_VERSION_NUMBER >= 10400
|
||||
#if ZSTD_VERSION_NUMBER >= 500
|
||||
compression_types.push_back(kZSTD);
|
||||
#endif // ZSTD_VERSION_NUMBER >= 500
|
||||
|
||||
for (int level: {-1, 0, 1, 10}) {
|
||||
for (auto index_type :
|
||||
{BlockBasedTableOptions::IndexType::kBinarySearch,
|
||||
for (auto index_type :
|
||||
{BlockBasedTableOptions::IndexType::kBinarySearch,
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}) {
|
||||
for (bool block_based_filter : {true, false}) {
|
||||
for (bool partition_filter : {true, false}) {
|
||||
if (partition_filter &&
|
||||
(block_based_filter ||
|
||||
index_type !=
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch)) {
|
||||
continue;
|
||||
}
|
||||
for (bool index_and_filter_in_cache : {true, false}) {
|
||||
for (bool pin_l0 : {true, false}) {
|
||||
for (bool pin_top_level : {true, false}) {
|
||||
if (pin_l0 && !index_and_filter_in_cache) {
|
||||
continue;
|
||||
}
|
||||
// Create a table
|
||||
Options opt;
|
||||
std::unique_ptr<InternalKeyComparator> ikc;
|
||||
ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
|
||||
opt.compression = kNoCompression;
|
||||
BlockBasedTableOptions table_options =
|
||||
GetBlockBasedTableOptions();
|
||||
table_options.block_size = 1024;
|
||||
table_options.index_type =
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache = pin_l0;
|
||||
table_options.pin_top_level_index_and_filter = pin_top_level;
|
||||
table_options.partition_filters = partition_filter;
|
||||
table_options.cache_index_and_filter_blocks =
|
||||
index_and_filter_in_cache;
|
||||
// big enough so we don't ever lose cached values.
|
||||
table_options.block_cache = std::shared_ptr<rocksdb::Cache>(
|
||||
new MockCache(16 * 1024 * 1024, 4, false, 0.0));
|
||||
table_options.filter_policy.reset(
|
||||
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
|
||||
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
bool convert_to_internal_key = false;
|
||||
TableConstructor c(BytewiseComparator(), convert_to_internal_key,
|
||||
level);
|
||||
std::string user_key = "k01";
|
||||
std::string key =
|
||||
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
|
||||
c.Add(key, "hello");
|
||||
std::vector<std::string> keys;
|
||||
stl_wrappers::KVMap kvmap;
|
||||
const ImmutableCFOptions ioptions(opt);
|
||||
const MutableCFOptions moptions(opt);
|
||||
c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys,
|
||||
&kvmap);
|
||||
|
||||
// Doing a read to make index/filter loaded into the cache
|
||||
auto table_reader =
|
||||
dynamic_cast<BlockBasedTable*>(c.GetTableReader());
|
||||
PinnableSlice value;
|
||||
GetContext get_context(opt.comparator, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, user_key, &value,
|
||||
nullptr, nullptr, nullptr, nullptr);
|
||||
InternalKey ikey(user_key, 0, kTypeValue);
|
||||
auto s = table_reader->Get(ReadOptions(), key, &get_context,
|
||||
moptions.prefix_extractor.get());
|
||||
ASSERT_EQ(get_context.State(), GetContext::kFound);
|
||||
ASSERT_STREQ(value.data(), "hello");
|
||||
|
||||
// Close the table
|
||||
c.ResetTableReader();
|
||||
|
||||
auto usage = table_options.block_cache->GetUsage();
|
||||
auto pinned_usage = table_options.block_cache->GetPinnedUsage();
|
||||
// The only usage must be for marked data blocks
|
||||
ASSERT_EQ(usage, MockCache::marked_size_);
|
||||
// There must be some pinned data since PinnableSlice has not
|
||||
// released them yet
|
||||
ASSERT_GT(pinned_usage, 0);
|
||||
// Release pinnable slice reousrces
|
||||
value.Reset();
|
||||
pinned_usage = table_options.block_cache->GetPinnedUsage();
|
||||
ASSERT_EQ(pinned_usage, 0);
|
||||
for (bool block_based_filter : {true, false}) {
|
||||
for (bool partition_filter : {true, false}) {
|
||||
if (partition_filter &&
|
||||
(block_based_filter ||
|
||||
index_type !=
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch)) {
|
||||
continue;
|
||||
}
|
||||
for (bool index_and_filter_in_cache : {true, false}) {
|
||||
for (bool pin_l0 : {true, false}) {
|
||||
for (bool pin_top_level : {true, false}) {
|
||||
if (pin_l0 && !index_and_filter_in_cache) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto compression_type : compression_types) {
|
||||
for (uint32_t max_dict_bytes : {0, 1 << 14}) {
|
||||
if (compression_type == kNoCompression && max_dict_bytes)
|
||||
continue;
|
||||
|
||||
// Create a table
|
||||
Options opt;
|
||||
std::unique_ptr<InternalKeyComparator> ikc;
|
||||
ikc.reset(new test::PlainInternalKeyComparator(
|
||||
opt.comparator));
|
||||
opt.compression = compression_type;
|
||||
opt.compression_opts.max_dict_bytes = max_dict_bytes;
|
||||
BlockBasedTableOptions table_options =
|
||||
GetBlockBasedTableOptions();
|
||||
table_options.block_size = 1024;
|
||||
table_options.index_type = index_type;
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache =
|
||||
pin_l0;
|
||||
table_options.pin_top_level_index_and_filter =
|
||||
pin_top_level;
|
||||
table_options.partition_filters = partition_filter;
|
||||
table_options.cache_index_and_filter_blocks =
|
||||
index_and_filter_in_cache;
|
||||
// big enough so we don't ever lose cached values.
|
||||
table_options.block_cache = std::make_shared<MockCache>(
|
||||
16 * 1024 * 1024, 4, false, 0.0);
|
||||
table_options.filter_policy.reset(
|
||||
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
|
||||
opt.table_factory.reset(NewBlockBasedTableFactory(
|
||||
table_options));
|
||||
|
||||
bool convert_to_internal_key = false;
|
||||
TableConstructor c(BytewiseComparator(),
|
||||
convert_to_internal_key, level);
|
||||
std::string user_key = "k01";
|
||||
std::string key =
|
||||
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
|
||||
c.Add(key, "hello");
|
||||
std::vector<std::string> keys;
|
||||
stl_wrappers::KVMap kvmap;
|
||||
const ImmutableCFOptions ioptions(opt);
|
||||
const MutableCFOptions moptions(opt);
|
||||
c.Finish(opt, ioptions, moptions, table_options, *ikc,
|
||||
&keys, &kvmap);
|
||||
|
||||
// Doing a read to make index/filter loaded into the cache
|
||||
auto table_reader =
|
||||
dynamic_cast<BlockBasedTable*>(c.GetTableReader());
|
||||
PinnableSlice value;
|
||||
GetContext get_context(opt.comparator, nullptr, nullptr,
|
||||
nullptr, GetContext::kNotFound, user_key, &value,
|
||||
nullptr, nullptr, nullptr, nullptr);
|
||||
InternalKey ikey(user_key, 0, kTypeValue);
|
||||
auto s = table_reader->Get(ReadOptions(), key, &get_context,
|
||||
moptions.prefix_extractor.get());
|
||||
ASSERT_EQ(get_context.State(), GetContext::kFound);
|
||||
ASSERT_STREQ(value.data(), "hello");
|
||||
|
||||
// Close the table
|
||||
c.ResetTableReader();
|
||||
|
||||
auto usage = table_options.block_cache->GetUsage();
|
||||
auto pinned_usage =
|
||||
table_options.block_cache->GetPinnedUsage();
|
||||
// The only usage must be for marked data blocks
|
||||
ASSERT_EQ(usage, MockCache::marked_size_);
|
||||
// There must be some pinned data since PinnableSlice has
|
||||
// not released them yet
|
||||
ASSERT_GT(pinned_usage, 0);
|
||||
// Release pinnable slice reousrces
|
||||
value.Reset();
|
||||
pinned_usage = table_options.block_cache->GetPinnedUsage();
|
||||
ASSERT_EQ(pinned_usage, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} // level
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user