From 512a5e3ef822b4908e003aad12ef2ca600744d3d Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Wed, 28 Nov 2018 17:58:08 -0800 Subject: [PATCH] Fix BlockBasedTable not always using memory allocator if available (#4678) Summary: Fix block based table reader not using memory_allocator when allocating index blocks and compression dictionary blocks. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4678 Differential Revision: D13054594 Pulled By: yiwu-arbug fbshipit-source-id: 379f25bcc665395662511c4f873f4b7b55104ce2 --- HISTORY.md | 4 +- table/block_based_table_reader.cc | 141 ++++++++++++++++++------------ table/block_based_table_reader.h | 6 +- table/block_fetcher.cc | 26 ++++-- table/block_fetcher.h | 18 ++-- table/meta_blocks.cc | 44 ++++++---- table/meta_blocks.h | 19 ++-- table/table_test.cc | 11 +-- 8 files changed, 167 insertions(+), 102 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index fe12cb6fb..1781d64a7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,8 +1,8 @@ # Rocksdb Change Log ## Unreleased ### New Features -* Introduced `Memoryllocator`, which lets the user specify custom allocator for memory in block cache. * Improved `DeleteRange` to prevent read performance degradation. The feature is no longer marked as experimental. + ### Public API Change * `NO_ITERATORS` is divided into two counters `NO_ITERATOR_CREATED` and `NO_ITERATOR_DELETE`. Both of them are only increasing now, just as other counters. ### Bug Fixes @@ -11,6 +11,8 @@ ## 5.18.0 (11/12/2018) ### New Features +* Introduced `MemoryAllocator` interface, which lets the user specify custom allocator for memory in block cache. +* Introduced `JemallocNodumpAllocator` memory allocator. When being used, block cache will be excluded from core dump. * Introduced `PerfContextByLevel` as part of `PerfContext` which allows storing perf context at each level. Also replaced `__thread` with `thread_local` keyword for perf_context. Added per-level perf context for bloom filter and `Get` query. 
* With level_compaction_dynamic_level_bytes = true, level multiplier may be adjusted automatically when Level 0 to 1 compaction is lagged behind. * Introduced DB option `atomic_flush`. If true, RocksDB supports flushing multiple column families and atomically committing the result to MANIFEST. Useful when WAL is disabled. diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index a3192b01b..fbc9af4ba 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -78,13 +78,14 @@ Status ReadBlockFromFile( RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ReadOptions& options, const BlockHandle& handle, std::unique_ptr* result, const ImmutableCFOptions& ioptions, - bool do_uncompress, const Slice& compression_dict, + bool do_uncompress, bool maybe_compressed, const Slice& compression_dict, const PersistentCacheOptions& cache_options, SequenceNumber global_seqno, - size_t read_amp_bytes_per_bit, MemoryAllocator* allocator = nullptr) { + size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) { BlockContents contents; BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle, &contents, ioptions, do_uncompress, - compression_dict, cache_options, allocator); + maybe_compressed, compression_dict, cache_options, + memory_allocator); Status s = block_fetcher.ReadBlockContents(); if (s.ok()) { result->reset(new Block(std::move(contents), global_seqno, @@ -101,6 +102,13 @@ inline MemoryAllocator* GetMemoryAllocator( : nullptr; } +inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock( + const BlockBasedTableOptions& table_options) { + return table_options.block_cache_compressed.get() + ? table_options.block_cache_compressed->memory_allocator() + : nullptr; +} + // Delete the resource that is held by the iterator. 
template void DeleteHeldResource(void* arg, void* /*ignored*/) { @@ -222,13 +230,15 @@ class PartitionIndexReader : public IndexReader, public Cleanable { IndexReader** index_reader, const PersistentCacheOptions& cache_options, const int level, const bool index_key_includes_seq, - const bool index_value_is_full) { + const bool index_value_is_full, + MemoryAllocator* memory_allocator) { std::unique_ptr index_block; auto s = ReadBlockFromFile( file, prefetch_buffer, footer, ReadOptions(), index_handle, &index_block, ioptions, true /* decompress */, - Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); + true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options, + kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, + memory_allocator); if (s.ok()) { *index_reader = new PartitionIndexReader( @@ -401,13 +411,15 @@ class BinarySearchIndexReader : public IndexReader { IndexReader** index_reader, const PersistentCacheOptions& cache_options, const bool index_key_includes_seq, - const bool index_value_is_full) { + const bool index_value_is_full, + MemoryAllocator* memory_allocator) { std::unique_ptr index_block; auto s = ReadBlockFromFile( file, prefetch_buffer, footer, ReadOptions(), index_handle, &index_block, ioptions, true /* decompress */, - Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); + true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options, + kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, + memory_allocator); if (s.ok()) { *index_reader = new BinarySearchIndexReader( @@ -473,13 +485,15 @@ class HashIndexReader : public IndexReader { InternalIterator* meta_index_iter, IndexReader** index_reader, bool /*hash_index_allow_collision*/, const PersistentCacheOptions& cache_options, - const bool index_key_includes_seq, const bool index_value_is_full) { + const bool index_key_includes_seq, const bool 
index_value_is_full, + MemoryAllocator* memory_allocator) { std::unique_ptr index_block; auto s = ReadBlockFromFile( file, prefetch_buffer, footer, ReadOptions(), index_handle, &index_block, ioptions, true /* decompress */, - Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); + true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options, + kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, + memory_allocator); if (!s.ok()) { return s; @@ -517,8 +531,9 @@ class HashIndexReader : public IndexReader { BlockContents prefixes_contents; BlockFetcher prefixes_block_fetcher( file, prefetch_buffer, footer, ReadOptions(), prefixes_handle, - &prefixes_contents, ioptions, true /* decompress */, - dummy_comp_dict /*compression dict*/, cache_options); + &prefixes_contents, ioptions, true /*decompress*/, + true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/, + cache_options, memory_allocator); s = prefixes_block_fetcher.ReadBlockContents(); if (!s.ok()) { return s; @@ -526,8 +541,9 @@ class HashIndexReader : public IndexReader { BlockContents prefixes_meta_contents; BlockFetcher prefixes_meta_block_fetcher( file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle, - &prefixes_meta_contents, ioptions, true /* decompress */, - dummy_comp_dict /*compression dict*/, cache_options); + &prefixes_meta_contents, ioptions, true /*decompress*/, + true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/, + cache_options, memory_allocator); s = prefixes_meta_block_fetcher.ReadBlockContents(); if (!s.ok()) { // TODO: log error @@ -894,7 +910,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, if (s.ok()) { s = ReadProperties(meta_iter->value(), rep->file.get(), prefetch_buffer.get(), rep->footer, rep->ioptions, - &table_properties, false /* compression_type_missing */); + &table_properties, + false /* compression_type_missing */, + nullptr /* memory_allocator */); } 
if (!s.ok()) { @@ -937,9 +955,10 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, ReadOptions read_options; read_options.verify_checksums = false; BlockFetcher compression_block_fetcher( - rep->file.get(), prefetch_buffer.get(), rep->footer, read_options, - compression_dict_handle, compression_dict_cont.get(), rep->ioptions, false /* decompress */, - Slice() /*compression dict*/, cache_options); + rep->file.get(), prefetch_buffer.get(), rep->footer, read_options, + compression_dict_handle, compression_dict_cont.get(), rep->ioptions, + false /* decompress */, false /*maybe_compressed*/, + Slice() /*compression dict*/, cache_options); s = compression_block_fetcher.ReadBlockContents(); if (!s.ok()) { @@ -1166,9 +1185,10 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, Status s = ReadBlockFromFile( rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(), rep->footer.metaindex_handle(), &meta, rep->ioptions, - true /* decompress */, Slice() /*compression dict*/, - rep->persistent_cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep->table_options)); + true /* decompress */, true /*maybe_compressed*/, + Slice() /*compression dict*/, rep->persistent_cache_options, + kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, + GetMemoryAllocator(rep->table_options)); if (!s.ok()) { ROCKS_LOG_ERROR(rep->ioptions.info_log, @@ -1312,8 +1332,8 @@ Status BlockBasedTable::PutDataBlockToCache( CachableEntry* cached_block, BlockContents* raw_block_contents, CompressionType raw_block_comp_type, uint32_t format_version, const Slice& compression_dict, SequenceNumber seq_no, - size_t read_amp_bytes_per_bit, bool is_index, Cache::Priority priority, - GetContext* get_context, MemoryAllocator* allocator) { + size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator, + bool is_index, Cache::Priority priority, GetContext* get_context) { assert(raw_block_comp_type == kNoCompression || block_cache_compressed 
!= nullptr); @@ -1327,7 +1347,7 @@ Status BlockBasedTable::PutDataBlockToCache( s = UncompressBlockContents( uncompression_ctx, raw_block_contents->data.data(), raw_block_contents->data.size(), &uncompressed_block_contents, - format_version, ioptions, allocator); + format_version, ioptions, memory_allocator); } if (!s.ok()) { return s; @@ -1431,11 +1451,11 @@ FilterBlockReader* BlockBasedTable::ReadFilter( Slice dummy_comp_dict; - BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer, - ReadOptions(), filter_handle, &block, - rep->ioptions, false /* decompress */, - dummy_comp_dict, rep->persistent_cache_options, - GetMemoryAllocator(rep->table_options)); + BlockFetcher block_fetcher( + rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(), + filter_handle, &block, rep->ioptions, false /* decompress */, + false /*maybe_compressed*/, dummy_comp_dict, + rep->persistent_cache_options, GetMemoryAllocator(rep->table_options)); Status s = block_fetcher.ReadBlockContents(); if (!s.ok()) { @@ -1736,8 +1756,10 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator( READ_BLOCK_GET_MICROS); s = ReadBlockFromFile( rep->file.get(), prefetch_buffer, rep->footer, ro, handle, - &block_value, rep->ioptions, rep->blocks_maybe_compressed, - compression_dict, rep->persistent_cache_options, + &block_value, rep->ioptions, + rep->blocks_maybe_compressed /*do_decompress*/, + rep->blocks_maybe_compressed, compression_dict, + rep->persistent_cache_options, is_index ? 
kDisableGlobalSequenceNumber : rep->global_seqno, rep->table_options.read_amp_bytes_per_bit, GetMemoryAllocator(rep->table_options)); @@ -1857,13 +1879,13 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( BlockContents raw_block_contents; { StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); - BlockFetcher block_fetcher( rep->file.get(), prefetch_buffer, rep->footer, ro, handle, &raw_block_contents, rep->ioptions, - do_decompress /* do uncompress */, compression_dict, - rep->persistent_cache_options, - GetMemoryAllocator(rep->table_options)); + do_decompress /* do uncompress */, rep->blocks_maybe_compressed, + compression_dict, rep->persistent_cache_options, + GetMemoryAllocator(rep->table_options), + GetMemoryAllocatorForCompressedBlock(rep->table_options)); s = block_fetcher.ReadBlockContents(); raw_block_comp_type = block_fetcher.get_compression_type(); } @@ -1876,12 +1898,13 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions, block_entry, &raw_block_contents, raw_block_comp_type, rep->table_options.format_version, compression_dict, seq_no, - rep->table_options.read_amp_bytes_per_bit, is_index, + rep->table_options.read_amp_bytes_per_bit, + GetMemoryAllocator(rep->table_options), is_index, is_index && rep->table_options .cache_index_and_filter_blocks_with_high_priority ? 
Cache::Priority::HIGH : Cache::Priority::LOW, - get_context, GetMemoryAllocator(rep->table_options)); + get_context); } } } @@ -2608,12 +2631,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks( BlockHandle handle = index_iter->value(); BlockContents contents; Slice dummy_comp_dict; - BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */, - rep_->footer, ReadOptions(), handle, &contents, - rep_->ioptions, false /* decompress */, - dummy_comp_dict /*compression dict*/, - rep_->persistent_cache_options, - GetMemoryAllocator(rep_->table_options)); + BlockFetcher block_fetcher( + rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer, + ReadOptions(), handle, &contents, rep_->ioptions, + false /* decompress */, false /*maybe_compressed*/, + dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { break; @@ -2635,12 +2657,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks( s = handle.DecodeFrom(&input); BlockContents contents; Slice dummy_comp_dict; - BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */, - rep_->footer, ReadOptions(), handle, &contents, - rep_->ioptions, false /* decompress */, - dummy_comp_dict /*compression dict*/, - rep_->persistent_cache_options, - GetMemoryAllocator(rep_->table_options)); + BlockFetcher block_fetcher( + rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer, + ReadOptions(), handle, &contents, rep_->ioptions, + false /* decompress */, false /*maybe_compressed*/, + dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { break; @@ -2729,7 +2750,8 @@ Status BlockBasedTable::CreateIndexReader( rep_->table_properties == nullptr || rep_->table_properties->index_key_is_user_key == 0, rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0); + rep_->table_properties->index_value_is_delta_encoded == 0, + 
GetMemoryAllocator(rep_->table_options)); } case BlockBasedTableOptions::kBinarySearch: { return BinarySearchIndexReader::Create( @@ -2738,7 +2760,8 @@ Status BlockBasedTable::CreateIndexReader( rep_->table_properties == nullptr || rep_->table_properties->index_key_is_user_key == 0, rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0); + rep_->table_properties->index_value_is_delta_encoded == 0, + GetMemoryAllocator(rep_->table_options)); } case BlockBasedTableOptions::kHashSearch: { std::unique_ptr meta_guard; @@ -2760,7 +2783,8 @@ Status BlockBasedTable::CreateIndexReader( rep_->table_properties == nullptr || rep_->table_properties->index_key_is_user_key == 0, rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0); + rep_->table_properties->index_value_is_delta_encoded == 0, + GetMemoryAllocator(rep_->table_options)); } meta_index_iter = meta_iter_guard.get(); } @@ -2773,7 +2797,8 @@ Status BlockBasedTable::CreateIndexReader( rep_->table_properties == nullptr || rep_->table_properties->index_key_is_user_key == 0, rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0); + rep_->table_properties->index_value_is_delta_encoded == 0, + GetMemoryAllocator(rep_->table_options)); } default: { std::string error_message = @@ -2942,9 +2967,9 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file, BlockFetcher block_fetcher( rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer, ReadOptions(), handle, &block, rep_->ioptions, - false /*decompress*/, dummy_comp_dict /*compression dict*/, - rep_->persistent_cache_options, - GetMemoryAllocator(rep_->table_options)); + false /*decompress*/, false /*maybe_compressed*/, + dummy_comp_dict /*compression dict*/, + rep_->persistent_cache_options); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { rep_->filter.reset(new BlockBasedFilterBlockReader( diff --git 
a/table/block_based_table_reader.h b/table/block_based_table_reader.h index bc420b37c..cb6a86566 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -322,9 +322,9 @@ class BlockBasedTable : public TableReader { CachableEntry* block, BlockContents* raw_block_contents, CompressionType raw_block_comp_type, uint32_t format_version, const Slice& compression_dict, SequenceNumber seq_no, - size_t read_amp_bytes_per_bit, bool is_index = false, - Cache::Priority pri = Cache::Priority::LOW, - GetContext* get_context = nullptr, MemoryAllocator* allocator = nullptr); + size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator, + bool is_index = false, Cache::Priority pri = Cache::Priority::LOW, + GetContext* get_context = nullptr); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 14c6e0afb..9ad254a59 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -140,8 +140,13 @@ void BlockFetcher::PrepareBufferForBlockFromFile() { // If we've got a small enough hunk of data, read it in to the // trivially allocated stack buffer instead of needing a full malloc() used_buf_ = &stack_buf_[0]; + } else if (maybe_compressed_ && !do_uncompress_) { + compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, + memory_allocator_compressed_); + used_buf_ = compressed_buf_.get(); } else { - heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_); + heap_buf_ = + AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_); used_buf_ = heap_buf_.get(); } } @@ -168,6 +173,12 @@ void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() { } } +inline void BlockFetcher::CopyBufferToHeap() { + assert(used_buf_ != heap_buf_.get()); + heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_); + memcpy(heap_buf_.get(), used_buf_, 
block_size_ + kBlockTrailerSize); +} + inline void BlockFetcher::GetBlockContents() { if (slice_.data() != used_buf_) { @@ -177,9 +188,14 @@ void BlockFetcher::GetBlockContents() { // page can be either uncompressed or compressed, the buffer either stack // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096 if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) { - assert(used_buf_ != heap_buf_.get()); - heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_); - memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize); + CopyBufferToHeap(); + } else if (used_buf_ == compressed_buf_.get()) { + if (compression_type_ == kNoCompression && + memory_allocator_ != memory_allocator_compressed_) { + CopyBufferToHeap(); + } else { + heap_buf_ = std::move(compressed_buf_); + } } *contents_ = BlockContents(std::move(heap_buf_), block_size_); } @@ -244,7 +260,7 @@ Status BlockFetcher::ReadBlockContents() { compression_dict_); status_ = UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_, contents_, footer_.version(), - ioptions_, allocator_); + ioptions_, memory_allocator_); compression_type_ = kNoCompression; } else { GetBlockContents(); diff --git a/table/block_fetcher.h b/table/block_fetcher.h index 21bbd43e9..aed73a392 100644 --- a/table/block_fetcher.h +++ b/table/block_fetcher.h @@ -10,7 +10,6 @@ #pragma once #include "table/block.h" #include "table/format.h" - #include "util/memory_allocator.h" namespace rocksdb { @@ -26,9 +25,11 @@ class BlockFetcher { FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ReadOptions& read_options, const BlockHandle& handle, BlockContents* contents, const ImmutableCFOptions& ioptions, - bool do_uncompress, const Slice& compression_dict, + bool do_uncompress, bool maybe_compressed, + const Slice& compression_dict, const PersistentCacheOptions& cache_options, - MemoryAllocator* allocator = nullptr) + MemoryAllocator* memory_allocator = nullptr, + 
MemoryAllocator* memory_allocator_compressed = nullptr) : file_(file), prefetch_buffer_(prefetch_buffer), footer_(footer), @@ -37,9 +38,11 @@ class BlockFetcher { contents_(contents), ioptions_(ioptions), do_uncompress_(do_uncompress), + maybe_compressed_(maybe_compressed), compression_dict_(compression_dict), cache_options_(cache_options), - allocator_(allocator) {} + memory_allocator_(memory_allocator), + memory_allocator_compressed_(memory_allocator_compressed) {} Status ReadBlockContents(); CompressionType get_compression_type() const { return compression_type_; } @@ -54,14 +57,17 @@ class BlockFetcher { BlockContents* contents_; const ImmutableCFOptions& ioptions_; bool do_uncompress_; + bool maybe_compressed_; const Slice& compression_dict_; const PersistentCacheOptions& cache_options_; - MemoryAllocator* allocator_; + MemoryAllocator* memory_allocator_; + MemoryAllocator* memory_allocator_compressed_; Status status_; Slice slice_; char* used_buf_ = nullptr; size_t block_size_; CacheAllocationPtr heap_buf_; + CacheAllocationPtr compressed_buf_; char stack_buf_[kDefaultStackBufferSize]; bool got_from_prefetch_buffer_ = false; rocksdb::CompressionType compression_type_; @@ -72,6 +78,8 @@ class BlockFetcher { bool TryGetFromPrefetchBuffer(); bool TryGetCompressedBlockFromPersistentCache(); void PrepareBufferForBlockFromFile(); + // Copy content from used_buf_ to new heap buffer. 
+ void CopyBufferToHeap(); void GetBlockContents(); void InsertCompressedBlockToPersistentCacheIfNeeded(); void InsertUncompressedBlockToPersistentCacheIfNeeded(); diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 6a75a6161..fdf8a5612 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -175,7 +175,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ImmutableCFOptions& ioptions, TableProperties** table_properties, - bool /*compression_type_missing*/) { + bool /*compression_type_missing*/, + MemoryAllocator* memory_allocator) { assert(table_properties); Slice v = handle_value; @@ -191,9 +192,10 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, Slice compression_dict; PersistentCacheOptions cache_options; - BlockFetcher block_fetcher( - file, prefetch_buffer, footer, read_options, handle, &block_contents, - ioptions, false /* decompress */, compression_dict, cache_options); + BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options, + handle, &block_contents, ioptions, + false /* decompress */, false /*maybe_compressed*/, + compression_dict, cache_options, memory_allocator); s = block_fetcher.ReadBlockContents(); // property block is never compressed. Need to add uncompress logic if we are // to compress it.. 
@@ -314,9 +316,10 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, uint64_t table_magic_number, - const ImmutableCFOptions &ioptions, + const ImmutableCFOptions& ioptions, TableProperties** properties, - bool compression_type_missing) { + bool compression_type_missing, + MemoryAllocator* memory_allocator) { // -- Read metaindex block Footer footer; auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, @@ -332,10 +335,11 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, Slice compression_dict; PersistentCacheOptions cache_options; - BlockFetcher block_fetcher( - file, nullptr /* prefetch_buffer */, footer, read_options, - metaindex_handle, &metaindex_contents, ioptions, false /* decompress */, - compression_dict, cache_options); + BlockFetcher block_fetcher(file, nullptr /* prefetch_buffer */, footer, + read_options, metaindex_handle, + &metaindex_contents, ioptions, + false /* decompress */, false /*maybe_compressed*/, + compression_dict, cache_options, memory_allocator); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { return s; @@ -358,7 +362,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, TableProperties table_properties; if (found_properties_block == true) { s = ReadProperties(meta_iter->value(), file, nullptr /* prefetch_buffer */, - footer, ioptions, properties, compression_type_missing); + footer, ioptions, properties, compression_type_missing, + memory_allocator); } else { s = Status::NotFound(); } @@ -384,7 +389,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, const ImmutableCFOptions& ioptions, const std::string& meta_block_name, BlockHandle* block_handle, - bool /*compression_type_missing*/) { + bool /*compression_type_missing*/, + MemoryAllocator* memory_allocator) { Footer footer; auto s = ReadFooterFromFile(file, nullptr /* 
prefetch_buffer */, file_size, &footer, table_magic_number); @@ -401,7 +407,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, BlockFetcher block_fetcher( file, nullptr /* prefetch_buffer */, footer, read_options, metaindex_handle, &metaindex_contents, ioptions, - false /* do decompression */, compression_dict, cache_options); + false /* do decompression */, false /*maybe_compressed*/, + compression_dict, cache_options, memory_allocator); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { return s; @@ -423,8 +430,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t table_magic_number, const ImmutableCFOptions& ioptions, const std::string& meta_block_name, - BlockContents* contents, - bool /*compression_type_missing*/) { + BlockContents* contents, bool /*compression_type_missing*/, + MemoryAllocator* memory_allocator) { Status status; Footer footer; status = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer, @@ -443,8 +450,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file, BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options, metaindex_handle, &metaindex_contents, ioptions, - false /* decompress */, compression_dict, - cache_options); + false /* decompress */, false /*maybe_compressed*/, + compression_dict, cache_options, memory_allocator); status = block_fetcher.ReadBlockContents(); if (!status.ok()) { return status; @@ -470,7 +477,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file, // Reading metablock BlockFetcher block_fetcher2( file, prefetch_buffer, footer, read_options, block_handle, contents, - ioptions, false /* decompress */, compression_dict, cache_options); + ioptions, false /* decompress */, false /*maybe_compressed*/, + compression_dict, cache_options, memory_allocator); return block_fetcher2.ReadBlockContents(); } diff --git a/table/meta_blocks.h b/table/meta_blocks.h index a18c8edc4..1c8fe686c 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -11,12 +11,13 @@ 
#include "db/builder.h" #include "db/table_properties_collector.h" -#include "util/kv_map.h" #include "rocksdb/comparator.h" +#include "rocksdb/memory_allocator.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "table/block_builder.h" #include "table/format.h" +#include "util/kv_map.h" namespace rocksdb { @@ -96,7 +97,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ImmutableCFOptions& ioptions, TableProperties** table_properties, - bool compression_type_missing = false); + bool compression_type_missing = false, + MemoryAllocator* memory_allocator = nullptr); // Directly read the properties from the properties block of a plain table. // @returns a status to indicate if the operation succeeded. On success, @@ -108,9 +110,10 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, // `ReadProperties`, `FindMetaBlock`, and `ReadMetaBlock` Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, uint64_t table_magic_number, - const ImmutableCFOptions &ioptions, + const ImmutableCFOptions& ioptions, TableProperties** properties, - bool compression_type_missing = false); + bool compression_type_missing = false, + MemoryAllocator* memory_allocator = nullptr); // Find the meta block from the meta index block. 
Status FindMetaBlock(InternalIterator* meta_index_iter, @@ -120,10 +123,11 @@ Status FindMetaBlock(InternalIterator* meta_index_iter, // Find the meta block Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, uint64_t table_magic_number, - const ImmutableCFOptions &ioptions, + const ImmutableCFOptions& ioptions, const std::string& meta_block_name, BlockHandle* block_handle, - bool compression_type_missing = false); + bool compression_type_missing = false, + MemoryAllocator* memory_allocator = nullptr); // Read the specified meta block with name meta_block_name // from `file` and initialize `contents` with contents of this block. @@ -134,6 +138,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, const ImmutableCFOptions& ioptions, const std::string& meta_block_name, BlockContents* contents, - bool compression_type_missing = false); + bool compression_type_missing = false, + MemoryAllocator* memory_allocator = nullptr); } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index 192ba5282..5ec613bec 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3579,10 +3579,10 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { Slice compression_dict; PersistentCacheOptions cache_options; - BlockFetcher block_fetcher(file, nullptr /* prefetch_buffer */, footer, - read_options, handle, contents, ioptions, - false /* decompress */, compression_dict, - cache_options); + BlockFetcher block_fetcher( + file, nullptr /* prefetch_buffer */, footer, read_options, handle, + contents, ioptions, false /* decompress */, + false /*maybe_compressed*/, compression_dict, cache_options); ASSERT_OK(block_fetcher.ReadBlockContents()); }; @@ -3668,7 +3668,8 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) { BlockFetcher block_fetcher( table_reader.get(), nullptr /* prefetch_buffer */, footer, ReadOptions(), metaindex_handle, &metaindex_contents, ioptions, false /* decompress */, - compression_dict, pcache_opts); + false 
/*maybe_compressed*/, compression_dict, pcache_opts, + nullptr /*memory_allocator*/); ASSERT_OK(block_fetcher.ReadBlockContents()); Block metaindex_block(std::move(metaindex_contents), kDisableGlobalSequenceNumber);