Fix the memory leak with pinned partitioned filters
Summary: The existing unit test did not set the level so the check for pinned partitioned filter/index being properly released from the block cache was not properly exercised as they only take effect in level 0. As a result a memory leak in pinned partitioned filters was hidden. The patch fix the test as well as the bug. Closes https://github.com/facebook/rocksdb/pull/3692 Differential Revision: D7559763 Pulled By: maysamyabandeh fbshipit-source-id: 55eff274945838af983c764a7d71e8daff092e4a
This commit is contained in:
parent
65fe8d6cd6
commit
d2bcd7611f
@ -10,6 +10,7 @@
|
|||||||
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
|
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
|
||||||
* Fix WAL corruption caused by race condition between user write thread and FlushWAL when two_write_queue is not set.
|
* Fix WAL corruption caused by race condition between user write thread and FlushWAL when two_write_queue is not set.
|
||||||
* Fix `BackupableDBOptions::max_valid_backups_to_open` to not delete backup files when refcount cannot be accurately determined.
|
* Fix `BackupableDBOptions::max_valid_backups_to_open` to not delete backup files when refcount cannot be accurately determined.
|
||||||
|
* Fix memory leak when pin_l0_filter_and_index_blocks_in_cache is used with partitioned filters
|
||||||
|
|
||||||
### Java API Changes
|
### Java API Changes
|
||||||
* Add `BlockBasedTableConfig.setBlockCache` to allow sharing a block cache across DB instances.
|
* Add `BlockBasedTableConfig.setBlockCache` to allow sharing a block cache across DB instances.
|
||||||
|
@ -246,6 +246,13 @@ size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
|
|||||||
return idx_on_fltr_blk_->size();
|
return idx_on_fltr_blk_->size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Release the cached entry and decrement its ref count.
|
||||||
|
void ReleaseFilterCachedEntry(void* arg, void* h) {
|
||||||
|
Cache* cache = reinterpret_cast<Cache*>(arg);
|
||||||
|
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
|
||||||
|
cache->Release(handle);
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(myabandeh): merge this with the same function in IndexReader
|
// TODO(myabandeh): merge this with the same function in IndexReader
|
||||||
void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
|
void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
|
||||||
// Before read partitions, prefetch them to avoid lots of IOs
|
// Before read partitions, prefetch them to avoid lots of IOs
|
||||||
@ -304,6 +311,8 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
|
|||||||
if (LIKELY(filter.IsSet())) {
|
if (LIKELY(filter.IsSet())) {
|
||||||
if (pin) {
|
if (pin) {
|
||||||
filter_map_[handle.offset()] = std::move(filter);
|
filter_map_[handle.offset()] = std::move(filter);
|
||||||
|
RegisterCleanup(&ReleaseFilterCachedEntry, block_cache,
|
||||||
|
filter.cache_handle);
|
||||||
} else {
|
} else {
|
||||||
block_cache->Release(filter.cache_handle);
|
block_cache->Release(filter.cache_handle);
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,8 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
|
|||||||
size_t num_added_;
|
size_t num_added_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class PartitionedFilterBlockReader : public FilterBlockReader {
|
class PartitionedFilterBlockReader : public FilterBlockReader,
|
||||||
|
public Cleanable {
|
||||||
public:
|
public:
|
||||||
explicit PartitionedFilterBlockReader(const SliceTransform* prefix_extractor,
|
explicit PartitionedFilterBlockReader(const SliceTransform* prefix_extractor,
|
||||||
bool whole_key_filtering,
|
bool whole_key_filtering,
|
||||||
|
@ -302,9 +302,11 @@ class KeyConvertingIterator : public InternalIterator {
|
|||||||
class TableConstructor: public Constructor {
|
class TableConstructor: public Constructor {
|
||||||
public:
|
public:
|
||||||
explicit TableConstructor(const Comparator* cmp,
|
explicit TableConstructor(const Comparator* cmp,
|
||||||
bool convert_to_internal_key = false)
|
bool convert_to_internal_key = false,
|
||||||
|
int level = -1)
|
||||||
: Constructor(cmp),
|
: Constructor(cmp),
|
||||||
convert_to_internal_key_(convert_to_internal_key) {}
|
convert_to_internal_key_(convert_to_internal_key),
|
||||||
|
level_(level) {}
|
||||||
~TableConstructor() { Reset(); }
|
~TableConstructor() { Reset(); }
|
||||||
|
|
||||||
virtual Status FinishImpl(const Options& options,
|
virtual Status FinishImpl(const Options& options,
|
||||||
@ -319,14 +321,12 @@ class TableConstructor: public Constructor {
|
|||||||
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
|
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
|
||||||
int_tbl_prop_collector_factories;
|
int_tbl_prop_collector_factories;
|
||||||
std::string column_family_name;
|
std::string column_family_name;
|
||||||
int unknown_level = -1;
|
|
||||||
builder.reset(ioptions.table_factory->NewTableBuilder(
|
builder.reset(ioptions.table_factory->NewTableBuilder(
|
||||||
TableBuilderOptions(ioptions, internal_comparator,
|
TableBuilderOptions(
|
||||||
&int_tbl_prop_collector_factories,
|
ioptions, internal_comparator, &int_tbl_prop_collector_factories,
|
||||||
options.compression, CompressionOptions(),
|
options.compression, CompressionOptions(),
|
||||||
nullptr /* compression_dict */,
|
nullptr /* compression_dict */, false /* skip_filters */,
|
||||||
false /* skip_filters */, column_family_name,
|
column_family_name, level_),
|
||||||
unknown_level),
|
|
||||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
|
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
|
||||||
file_writer_.get()));
|
file_writer_.get()));
|
||||||
|
|
||||||
@ -351,8 +351,10 @@ class TableConstructor: public Constructor {
|
|||||||
uniq_id_ = cur_uniq_id_++;
|
uniq_id_ = cur_uniq_id_++;
|
||||||
file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource(
|
file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource(
|
||||||
GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)));
|
GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)));
|
||||||
|
const bool skip_filters = false;
|
||||||
return ioptions.table_factory->NewTableReader(
|
return ioptions.table_factory->NewTableReader(
|
||||||
TableReaderOptions(ioptions, soptions, internal_comparator),
|
TableReaderOptions(ioptions, soptions, internal_comparator,
|
||||||
|
skip_filters, level_),
|
||||||
std::move(file_reader_), GetSink()->contents().size(), &table_reader_);
|
std::move(file_reader_), GetSink()->contents().size(), &table_reader_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -412,6 +414,7 @@ class TableConstructor: public Constructor {
|
|||||||
unique_ptr<RandomAccessFileReader> file_reader_;
|
unique_ptr<RandomAccessFileReader> file_reader_;
|
||||||
unique_ptr<TableReader> table_reader_;
|
unique_ptr<TableReader> table_reader_;
|
||||||
bool convert_to_internal_key_;
|
bool convert_to_internal_key_;
|
||||||
|
int level_;
|
||||||
|
|
||||||
TableConstructor();
|
TableConstructor();
|
||||||
|
|
||||||
@ -2249,6 +2252,7 @@ std::map<std::string, size_t> MockCache::marked_data_in_cache_;
|
|||||||
// table is closed. This test makes sure that the only items remains in the
|
// table is closed. This test makes sure that the only items remains in the
|
||||||
// cache after the table is closed are raw data blocks.
|
// cache after the table is closed are raw data blocks.
|
||||||
TEST_F(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
|
TEST_F(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
|
||||||
|
for (int level: {-1, 0, 1, 10}) {
|
||||||
for (auto index_type :
|
for (auto index_type :
|
||||||
{BlockBasedTableOptions::IndexType::kBinarySearch,
|
{BlockBasedTableOptions::IndexType::kBinarySearch,
|
||||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}) {
|
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}) {
|
||||||
@ -2285,7 +2289,9 @@ TEST_F(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
|
|||||||
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
|
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
|
||||||
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||||
|
|
||||||
TableConstructor c(BytewiseComparator());
|
bool convert_to_internal_key = false;
|
||||||
|
TableConstructor c(BytewiseComparator(), convert_to_internal_key,
|
||||||
|
level);
|
||||||
std::string user_key = "k01";
|
std::string user_key = "k01";
|
||||||
std::string key =
|
std::string key =
|
||||||
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
|
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
|
||||||
@ -2326,6 +2332,7 @@ TEST_F(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} // level
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(BlockBasedTableTest, BlockCacheLeak) {
|
TEST_F(BlockBasedTableTest, BlockCacheLeak) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user