fix DeleteRange memory leak for mmap and block cache (#4810)

Summary:
Previously we cleaned up the range tombstone meta-block by calling `ReleaseCachedEntry`, which doesn't work when `value != nullptr && cache_handle == nullptr`. This happened at least in the case where mmap reads and block cache are both enabled. I noticed `NewDataBlockIterator` is intended to handle all these cases, so migrated to that instead of `NewUnfragmentedRangeTombstoneIterator`.
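
For context, here is a minimal, self-contained sketch of the ownership distinction the old cleanup missed. This is not the RocksDB implementation; all type names are illustrative. The point is that a cached entry may hold a block either through a block cache handle or directly (e.g. loaded via mmap with no handle attached), and a cleanup that only releases the cache handle leaks the second case.

```cpp
#include <cstdio>

struct Block {
  int payload = 0;
};

struct CacheHandle {
  Block* block = nullptr;
};

// Stand-in for a block cache: it only knows how to release handles it issued.
struct BlockCache {
  void Release(CacheHandle* handle) {
    delete handle->block;
    delete handle;
  }
};

struct CachedEntry {
  Block* value = nullptr;               // set once the block is loaded
  CacheHandle* cache_handle = nullptr;  // set only if the cache owns the block

  // Correct cleanup: release the cache handle when present, otherwise free
  // the block the entry owns directly. A handle-only cleanup (analogous to
  // the old ReleaseCachedEntry path) would leak the second case.
  void Release(BlockCache* cache) {
    if (cache_handle != nullptr) {
      cache->Release(cache_handle);
    } else {
      delete value;  // entry owns the block when there is no cache handle
    }
    value = nullptr;
    cache_handle = nullptr;
  }
};

int main() {
  BlockCache cache;

  // Case 1: the block cache owns the block; cleanup goes through the handle.
  CachedEntry from_cache;
  from_cache.cache_handle = new CacheHandle;
  from_cache.cache_handle->block = new Block;
  from_cache.value = from_cache.cache_handle->block;
  from_cache.Release(&cache);

  // Case 2: value != nullptr && cache_handle == nullptr (the mmap + block
  // cache combination in this fix); the block must be freed directly.
  CachedEntry direct;
  direct.value = new Block;
  direct.Release(&cache);

  std::printf("both entries cleaned up\n");
  return 0;
}
```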

Also changed the table-opening logic to fail on `ReadRangeDelBlock` failure, since ignoring that failure can cause data corruption. Added a test case to verify this behavior. Note the test case does not fail on `TryReopen` because failure to preload table handlers is not considered critical. However, it does fail on any read involving that file, since the file cannot return correct data.
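
The caller-visible contract the new test exercises can be sketched with the public RocksDB API roughly as follows. This is a hedged sketch, not the test itself: the DB path and keys are illustrative, and the on-disk corruption of the range tombstone meta-block is assumed to happen out of band. After the fix, such a corruption surfaces as `Status::Corruption` on reads touching the damaged file instead of silently resurrecting deleted keys.

```cpp
#include <cassert>
#include <string>

#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/rangedel_corruption_demo", &db);
  assert(s.ok());

  // Write a range tombstone and flush it into an SST file.
  s = db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                      "a", "b");
  assert(s.ok());
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  // ...assume the SST's range tombstone meta-block gets corrupted on disk...
  // Reopening the DB still succeeds (table preloading failures are not
  // treated as critical), but a read that touches the damaged file reports
  // corruption rather than returning possibly wrong data:
  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "a", &value);
  if (s.IsCorruption()) {
    // surface the error to the operator / trigger repair
  }

  delete db;
  return 0;
}
```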
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4810

Differential Revision: D13534296

Pulled By: ajkr

fbshipit-source-id: 55dde1111717cea6ec4bf38418daab81ccef3599
Andrew Kryczka 2018-12-20 21:57:18 -08:00 committed by Facebook Github Bot
parent da1c64b6e7
commit e0be1bc4f1
4 changed files with 60 additions and 56 deletions

HISTORY.md

@@ -9,6 +9,8 @@
 ### Bug Fixes
 * Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls.
+* Fix a memory leak when files with range tombstones are read in mmap mode and block cache is enabled
+* Fix handling of corrupt range tombstone blocks such that corruptions cannot cause deleted keys to reappear
 ## 5.18.0 (11/30/2018)
 ### New Features

db/corruption_test.cc

@@ -24,6 +24,8 @@
 #include "rocksdb/env.h"
 #include "rocksdb/table.h"
 #include "rocksdb/write_batch.h"
+#include "table/block_based_table_builder.h"
+#include "table/meta_blocks.h"
 #include "util/filename.h"
 #include "util/string_util.h"
 #include "util/testharness.h"
@@ -467,6 +469,39 @@ TEST_F(CorruptionTest, UnrelatedKeys) {
   ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
 }
 
+TEST_F(CorruptionTest, RangeDeletionCorrupted) {
+  ASSERT_OK(
+      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b"));
+  ASSERT_OK(db_->Flush(FlushOptions()));
+  std::vector<LiveFileMetaData> metadata;
+  db_->GetLiveFilesMetaData(&metadata);
+  ASSERT_EQ(static_cast<size_t>(1), metadata.size());
+  std::string filename = dbname_ + metadata[0].name;
+  std::unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(options_.env->NewRandomAccessFile(filename, &file, EnvOptions()));
+  std::unique_ptr<RandomAccessFileReader> file_reader(
+      new RandomAccessFileReader(std::move(file), filename));
+  uint64_t file_size;
+  ASSERT_OK(options_.env->GetFileSize(filename, &file_size));
+  BlockHandle range_del_handle;
+  ASSERT_OK(FindMetaBlock(
+      file_reader.get(), file_size, kBlockBasedTableMagicNumber,
+      ImmutableCFOptions(options_), kRangeDelBlock, &range_del_handle));
+  ASSERT_OK(TryReopen());
+  CorruptFile(filename, static_cast<int>(range_del_handle.offset()), 1);
+  // The test case does not fail on TryReopen because failure to preload table
+  // handlers is not considered critical.
+  ASSERT_OK(TryReopen());
+  std::string val;
+  // However, it does fail on any read involving that file since that file
+  // cannot be opened with a corrupt range deletion meta-block.
+  ASSERT_TRUE(db_->Get(ReadOptions(), "a", &val).IsCorruption());
+}
+
 TEST_F(CorruptionTest, FileSystemStateCorrupted) {
   for (int iter = 0; iter < 2; ++iter) {
     Options options;

table/block_based_table_reader.cc

@@ -848,9 +848,12 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
   if (!s.ok()) {
     return s;
   }
-  // Disregard return status of ReadRangeDelBlock and ReadCompressionDictBlock.
   s = ReadRangeDelBlock(rep, prefetch_buffer.get(), meta_iter.get(),
-                        new_table.get(), internal_comparator);
+                        internal_comparator);
+  if (!s.ok()) {
+    return s;
+  }
+  // Disregard return status of ReadCompressionDictBlock.
   s = ReadCompressionDictBlock(rep, prefetch_buffer.get(), meta_iter.get());
   s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(),
@@ -980,33 +983,35 @@ Status BlockBasedTable::ReadPropertiesBlock(
 Status BlockBasedTable::ReadRangeDelBlock(
     Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
-    BlockBasedTable* new_table,
     const InternalKeyComparator& internal_comparator) {
   Status s;
   bool found_range_del_block;
-  s = SeekToRangeDelBlock(meta_iter, &found_range_del_block,
-                          &rep->range_del_handle);
+  BlockHandle range_del_handle;
+  s = SeekToRangeDelBlock(meta_iter, &found_range_del_block, &range_del_handle);
   if (!s.ok()) {
     ROCKS_LOG_WARN(
         rep->ioptions.info_log,
         "Error when seeking to range delete tombstones block from file: %s",
         s.ToString().c_str());
-  } else if (found_range_del_block && !rep->range_del_handle.IsNull()) {
+  } else if (found_range_del_block && !range_del_handle.IsNull()) {
     ReadOptions read_options;
-    s = MaybeReadBlockAndLoadToCache(
-        prefetch_buffer, rep, read_options, rep->range_del_handle,
-        Slice() /* compression_dict */, &rep->range_del_entry,
-        false /* is_index */, nullptr /* get_context */);
+    std::unique_ptr<InternalIterator> iter(NewDataBlockIterator<DataBlockIter>(
+        rep, read_options, range_del_handle, nullptr /* input_iter */,
+        false /* is_index */, true /* key_includes_seq */,
+        true /* index_key_is_full */, nullptr /* get_context */, Status(),
+        prefetch_buffer));
+    assert(iter != nullptr);
+    s = iter->status();
     if (!s.ok()) {
       ROCKS_LOG_WARN(
           rep->ioptions.info_log,
           "Encountered error while reading data from range del block %s",
           s.ToString().c_str());
+    } else {
+      rep->fragmented_range_dels =
+          std::make_shared<FragmentedRangeTombstoneList>(std::move(iter),
+                                                         internal_comparator);
     }
-    auto iter = std::unique_ptr<InternalIterator>(
-        new_table->NewUnfragmentedRangeTombstoneIterator(read_options));
-    rep->fragmented_range_dels = std::make_shared<FragmentedRangeTombstoneList>(
-        std::move(iter), internal_comparator);
   }
   return s;
 }
@@ -1837,11 +1842,11 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
     assert(block.value != nullptr);
     const bool kTotalOrderSeek = true;
     // Block contents are pinned and it is still pinned after the iterator
-    // is destoryed as long as cleanup functions are moved to another object,
+    // is destroyed as long as cleanup functions are moved to another object,
     // when:
     // 1. block cache handle is set to be released in cleanup function, or
-    // 2. it's pointing to immortable source. If own_bytes is true then we are
-    // not reading data from the original source, weather immortal or not.
+    // 2. it's pointing to immortal source. If own_bytes is true then we are
+    // not reading data from the original source, whether immortal or not.
     // Otherwise, the block is pinned iff the source is immortal.
     bool block_contents_pinned =
         (block.cache_handle != nullptr ||
@@ -2415,35 +2420,6 @@ FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
       rep_->fragmented_range_dels, rep_->internal_comparator, snapshot);
 }
 
-InternalIterator* BlockBasedTable::NewUnfragmentedRangeTombstoneIterator(
-    const ReadOptions& read_options) {
-  if (rep_->range_del_handle.IsNull()) {
-    // The block didn't exist, nullptr indicates no range tombstones.
-    return nullptr;
-  }
-  if (rep_->range_del_entry.cache_handle != nullptr) {
-    // We have a handle to an uncompressed block cache entry that's held for
-    // this table's lifetime. Increment its refcount before returning an
-    // iterator based on it since the returned iterator may outlive this table
-    // reader.
-    assert(rep_->range_del_entry.value != nullptr);
-    Cache* block_cache = rep_->table_options.block_cache.get();
-    assert(block_cache != nullptr);
-    if (block_cache->Ref(rep_->range_del_entry.cache_handle)) {
-      auto iter = rep_->range_del_entry.value->NewIterator<DataBlockIter>(
-          &rep_->internal_comparator,
-          rep_->internal_comparator.user_comparator());
-      iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
-                            rep_->range_del_entry.cache_handle);
-      return iter;
-    }
-  }
-  // The meta-block exists but isn't in uncompressed block cache (maybe
-  // because it is disabled), so go through the full lookup process.
-  return NewDataBlockIterator<DataBlockIter>(rep_, read_options,
-                                             rep_->range_del_handle);
-}
-
 bool BlockBasedTable::FullFilterKeyMayMatch(
     const ReadOptions& read_options, FilterBlockReader* filter,
     const Slice& internal_key, const bool no_io,
@@ -3102,7 +3078,6 @@ void BlockBasedTable::Close() {
   }
   rep_->filter_entry.Release(rep_->table_options.block_cache.get());
   rep_->index_entry.Release(rep_->table_options.block_cache.get());
-  rep_->range_del_entry.Release(rep_->table_options.block_cache.get());
   // cleanup index and filter blocks to avoid accessing dangling pointer
   if (!rep_->table_options.no_block_cache) {
     char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];

table/block_based_table_reader.h

@@ -365,7 +365,7 @@ class BlockBasedTable : public TableReader {
       const SequenceNumber largest_seqno);
   static Status ReadRangeDelBlock(
       Rep* rep, FilePrefetchBuffer* prefetch_buffer,
-      InternalIterator* meta_iter, BlockBasedTable* new_table,
+      InternalIterator* meta_iter,
       const InternalKeyComparator& internal_comparator);
   static Status ReadCompressionDictBlock(Rep* rep,
                                          FilePrefetchBuffer* prefetch_buffer,
@@ -406,9 +406,6 @@
   friend class PartitionedFilterBlockReader;
   friend class PartitionedFilterBlockTest;
-  InternalIterator* NewUnfragmentedRangeTombstoneIterator(
-      const ReadOptions& read_options);
 };
 
 // Maitaning state of a two-level iteration on a partitioned index structure
@@ -468,7 +465,6 @@ struct BlockBasedTable::Rep {
         hash_index_allow_collision(false),
         whole_key_filtering(_table_opt.whole_key_filtering),
         prefix_filtering(true),
-        range_del_handle(BlockHandle::NullBlockHandle()),
         global_seqno(kDisableGlobalSequenceNumber),
         level(_level),
         immortal_table(_immortal_table) {}
@@ -532,10 +528,6 @@ struct BlockBasedTable::Rep {
   // push flush them out, hence they're pinned
   CachableEntry<FilterBlockReader> filter_entry;
   CachableEntry<IndexReader> index_entry;
-  // range deletion meta-block is pinned through reader's lifetime when LRU
-  // cache is enabled.
-  CachableEntry<Block> range_del_entry;
-  BlockHandle range_del_handle;
   std::shared_ptr<const FragmentedRangeTombstoneList> fragmented_range_dels;
   // If global_seqno is used, all Keys in this file will have the same