From b6b72687c626ab438c5225b5209fe7f0d18fdf3f Mon Sep 17 00:00:00 2001
From: Fosco Marotto
Date: Thu, 4 Oct 2018 15:17:30 -0700
Subject: [PATCH] Revert "Introduce CacheAllocator, a custom allocator for
 cache blocks (#4437)"

This reverts commit 1cf5deb8fdecb7f63ce5ce1a0e942222a95f881e.
---
 HISTORY.md                         |  1 -
 cache/lru_cache.cc                 | 19 +++----
 cache/lru_cache.h                  |  3 +-
 cache/sharded_cache.cc             |  9 +---
 cache/sharded_cache.h              |  3 +-
 include/rocksdb/cache.h            | 28 +++-------
 include/rocksdb/cache_allocator.h  | 29 -----------
 table/block_based_table_builder.cc |  4 +-
 table/block_based_table_reader.cc  | 69 +++++++++---------------
 table/block_based_table_reader.h   |  5 +-
 table/block_fetcher.cc             | 17 +++---
 table/block_fetcher.h              |  9 +---
 table/format.cc                    | 41 ++++++---------
 table/format.h                     | 21 ++------
 table/plain_table_reader.h         |  4 +-
 table/table_test.cc                | 72 -------------------------
 tools/db_bench_tool.cc             | 23 ++++----
 util/cache_allocator.h             | 38 --------------
 util/compression.h                 | 84 +++++++++++++++---------------
 19 files changed, 129 insertions(+), 350 deletions(-)
 delete mode 100644 include/rocksdb/cache_allocator.h
 delete mode 100644 util/cache_allocator.h

diff --git a/HISTORY.md b/HISTORY.md
index d0503aced..b4baa7162 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,7 +7,6 @@
 ### New Features
 * TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
-* Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
 
 ### Bug Fixes
 * Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
 
diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc
index 9f3acd16a..d4cbb9a45 100644
--- a/cache/lru_cache.cc
+++ b/cache/lru_cache.cc
@@ -461,10 +461,8 @@ std::string LRUCacheShard::GetPrintableOptions() const {
 }
 
 LRUCache::LRUCache(size_t capacity, int num_shard_bits,
-                   bool strict_capacity_limit, double high_pri_pool_ratio,
-                   std::shared_ptr<CacheAllocator> allocator)
-    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
-                   std::move(allocator)) {
+                   bool strict_capacity_limit, double high_pri_pool_ratio)
+    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
   num_shards_ = 1 << num_shard_bits;
   shards_ = reinterpret_cast<LRUCacheShard*>(
       port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
@@ -539,14 +537,12 @@ double LRUCache::GetHighPriPoolRatio() {
 std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
   return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
                      cache_opts.strict_capacity_limit,
-                     cache_opts.high_pri_pool_ratio,
-                     cache_opts.cache_allocator);
+                     cache_opts.high_pri_pool_ratio);
 }
 
-std::shared_ptr<Cache> NewLRUCache(
-    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-    double high_pri_pool_ratio,
-    std::shared_ptr<CacheAllocator> cache_allocator) {
+std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
+                                   bool strict_capacity_limit,
+                                   double high_pri_pool_ratio) {
   if (num_shard_bits >= 20) {
     return nullptr;  // the cache cannot be sharded into too many fine pieces
   }
@@ -558,8 +554,7 @@ std::shared_ptr<Cache> NewLRUCache(
     num_shard_bits = GetDefaultCacheShardBits(capacity);
   }
   return std::make_shared<LRUCache>(capacity, num_shard_bits,
-                                    strict_capacity_limit, high_pri_pool_ratio,
-                                    std::move(cache_allocator));
+                                    strict_capacity_limit, high_pri_pool_ratio);
 }
 
 }  // namespace rocksdb
diff --git a/cache/lru_cache.h b/cache/lru_cache.h
index 0b925a166..3c067f0c1 100644
--- a/cache/lru_cache.h
+++ b/cache/lru_cache.h
@@ -279,8 +279,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
 class LRUCache : public ShardedCache {
  public:
   LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-           double high_pri_pool_ratio,
-           std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
+           double high_pri_pool_ratio);
   virtual ~LRUCache();
   virtual const char* Name() const override { return "LRUCache"; }
   virtual CacheShard* GetShard(int shard) override;
diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc
index 3fef82a79..6a0a22282 100644
--- a/cache/sharded_cache.cc
+++ b/cache/sharded_cache.cc
@@ -20,10 +20,8 @@ namespace rocksdb {
 
 ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
-                           bool strict_capacity_limit,
-                           std::shared_ptr<CacheAllocator> allocator)
-    : Cache(std::move(allocator)),
-      num_shard_bits_(num_shard_bits),
+                           bool strict_capacity_limit)
+    : num_shard_bits_(num_shard_bits),
       capacity_(capacity),
       strict_capacity_limit_(strict_capacity_limit),
       last_id_(1) {}
@@ -144,9 +142,6 @@ std::string ShardedCache::GetPrintableOptions() const {
              strict_capacity_limit_);
     ret.append(buffer);
   }
-  snprintf(buffer, kBufferSize, "    cache_allocator : %s\n",
-           cache_allocator() ? cache_allocator()->Name() : "None");
-  ret.append(buffer);
   ret.append(GetShard(0)->GetPrintableOptions());
   return ret;
 }
diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h
index 9876a882b..4f9dea2ad 100644
--- a/cache/sharded_cache.h
+++ b/cache/sharded_cache.h
@@ -47,8 +47,7 @@ class CacheShard {
 // Keys are sharded by the highest num_shard_bits bits of hash value.
 class ShardedCache : public Cache {
  public:
-  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-               std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
+  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
   virtual ~ShardedCache() = default;
   virtual const char* Name() const override = 0;
   virtual CacheShard* GetShard(int shard) = 0;
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index d4d2b6510..da3b934d8 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -25,7 +25,6 @@
 #include <stdint.h>
 #include <memory>
 #include <string>
-#include "rocksdb/cache_allocator.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
@@ -59,20 +58,13 @@ struct LRUCacheOptions {
   // BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
   double high_pri_pool_ratio = 0.0;
 
-  // If non-nullptr will use this allocator instead of system allocator when
-  // allocating memory for cache blocks. Call this method before you start using
-  // the cache!
-  std::shared_ptr<CacheAllocator> cache_allocator;
-
   LRUCacheOptions() {}
   LRUCacheOptions(size_t _capacity, int _num_shard_bits,
-                  bool _strict_capacity_limit, double _high_pri_pool_ratio,
-                  std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
+                  bool _strict_capacity_limit, double _high_pri_pool_ratio)
       : capacity(_capacity),
         num_shard_bits(_num_shard_bits),
         strict_capacity_limit(_strict_capacity_limit),
-        high_pri_pool_ratio(_high_pri_pool_ratio),
-        cache_allocator(std::move(_cache_allocator)) {}
+        high_pri_pool_ratio(_high_pri_pool_ratio) {}
 };
 
 // Create a new cache with a fixed size capacity. The cache is sharded
@@ -83,10 +75,10 @@ struct LRUCacheOptions {
 // high_pri_pool_pct.
 // num_shard_bits = -1 means it is automatically determined: every shard
 // will be at least 512KB and number of shard bits will not exceed 6.
-extern std::shared_ptr<Cache> NewLRUCache(
-    size_t capacity, int num_shard_bits = -1,
-    bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
-    std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
+extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
+                                          int num_shard_bits = -1,
+                                          bool strict_capacity_limit = false,
+                                          double high_pri_pool_ratio = 0.0);
 
 extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
 
@@ -99,15 +91,13 @@ extern std::shared_ptr<Cache> NewClockCache(size_t capacity,
                                             int num_shard_bits = -1,
                                             bool strict_capacity_limit = false);
-
 class Cache {
  public:
   // Depending on implementation, cache entries with high priority could be less
   // likely to get evicted than low priority entries.
   enum class Priority { HIGH, LOW };
 
-  Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
-      : cache_allocator_(std::move(allocator)) {}
+  Cache() {}
 
   // Destroys all existing entries by calling the "deleter"
   // function that was passed via the Insert() function.
@@ -238,14 +228,10 @@ class Cache {
   virtual void TEST_mark_as_data_block(const Slice& /*key*/,
                                        size_t /*charge*/) {}
 
-  CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
-
  private:
   // No copying allowed
   Cache(const Cache&);
   Cache& operator=(const Cache&);
-
-  std::shared_ptr<CacheAllocator> cache_allocator_;
 };
 
 }  // namespace rocksdb
diff --git a/include/rocksdb/cache_allocator.h b/include/rocksdb/cache_allocator.h
deleted file mode 100644
index 5bbec0e8a..000000000
--- a/include/rocksdb/cache_allocator.h
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under both the GPLv2 (found in the
-// COPYING file in the root directory) and Apache 2.0 License
-// (found in the LICENSE.Apache file in the root directory).
-
-#pragma once
-
-// CacheAllocator is an interface that a client can implement to supply custom
-// cache allocation and deallocation methods. See rocksdb/cache.h for more
-// information.
-// All methods should be thread-safe.
-class CacheAllocator {
- public:
-  virtual ~CacheAllocator() = default;
-
-  // Name of the cache allocator, printed in the log
-  virtual const char* Name() const = 0;
-
-  // Allocate a block of at least size size
-  virtual void* Allocate(size_t size) = 0;
-  // Deallocate previously allocated block
-  virtual void Deallocate(void* p) = 0;
-  // Returns the memory size of the block allocated at p. The default
-  // implementation that just returns the original allocation_size is fine.
-  virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
-    // default implementation just returns the allocation size
-    return allocation_size;
-  }
-};
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index 26c14d21d..59c385d65 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -39,7 +39,6 @@
 #include "table/full_filter_block.h"
 #include "table/table_builder.h"
 
-#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -655,8 +654,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
 
     size_t size = block_contents.size();
 
-    auto ubuf =
-        AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
+    std::unique_ptr<char[]> ubuf(new char[size + 1]);
     memcpy(ubuf.get(), block_contents.data(), size);
     ubuf[size] = type;
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index 7e7cec1b1..9f2e02d68 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -80,12 +80,11 @@ Status ReadBlockFromFile(
     std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
     bool do_uncompress, const Slice& compression_dict,
     const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
-    size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
-    const bool immortal_file = false) {
+    size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
   BlockContents contents;
-  BlockFetcher block_fetcher(
-      file, prefetch_buffer, footer, options, handle, &contents, ioptions,
-      do_uncompress, compression_dict, cache_options, allocator, immortal_file);
+  BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
+                             &contents, ioptions, do_uncompress,
+                             compression_dict, cache_options, immortal_file);
   Status s = block_fetcher.ReadBlockContents();
   if (s.ok()) {
     result->reset(new Block(std::move(contents), global_seqno,
@@ -95,13 +94,6 @@ Status ReadBlockFromFile(
   return s;
 }
 
-inline CacheAllocator* GetCacheAllocator(
-    const BlockBasedTableOptions& table_options) {
-  return table_options.block_cache.get()
-             ? table_options.block_cache->cache_allocator()
-             : nullptr;
-}
-
 // Delete the resource that is held by the iterator.
 template <class ResourceType>
 void DeleteHeldResource(void* arg, void* /*ignored*/) {
@@ -1158,8 +1150,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
       rep->footer.metaindex_handle(), &meta, rep->ioptions,
       true /* decompress */, Slice() /*compression dict*/,
       rep->persistent_cache_options, kDisableGlobalSequenceNumber,
-      0 /* read_amp_bytes_per_bit */,
-      GetCacheAllocator(rep->table_options));
+      0 /* read_amp_bytes_per_bit */);
 
   if (!s.ok()) {
     ROCKS_LOG_ERROR(rep->ioptions.info_log,
@@ -1182,7 +1173,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
     const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
     BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    GetContext* get_context, CacheAllocator* allocator) {
+    GetContext* get_context) {
   Status s;
   Block* compressed_block = nullptr;
   Cache::Handle* block_cache_compressed_handle = nullptr;
@@ -1239,7 +1230,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
                                           compression_dict);
     s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
                                 compressed_block->size(), &contents,
-                                format_version, ioptions, allocator);
+                                format_version, ioptions);
 
     // Insert uncompressed block into block cache
     if (s.ok()) {
@@ -1301,8 +1292,7 @@ Status BlockBasedTable::PutDataBlockToCache(
     const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
     CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    Cache::Priority priority, GetContext* get_context,
-    CacheAllocator* allocator) {
+    Cache::Priority priority, GetContext* get_context) {
   assert(raw_block->compression_type() == kNoCompression ||
          block_cache_compressed != nullptr);
 
@@ -1315,7 +1305,7 @@ Status BlockBasedTable::PutDataBlockToCache(
                                          compression_dict);
     s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
                                 raw_block->size(), &contents, format_version,
-                                ioptions, allocator);
+                                ioptions);
   }
   if (!s.ok()) {
     delete raw_block;
@@ -1412,8 +1402,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
   BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
                              ReadOptions(), filter_handle, &block,
                              rep->ioptions, false /* decompress */,
-                             dummy_comp_dict, rep->persistent_cache_options,
-                             GetCacheAllocator(rep->table_options));
+                             dummy_comp_dict, rep->persistent_cache_options);
   Status s = block_fetcher.ReadBlockContents();
 
   if (!s.ok()) {
@@ -1711,9 +1700,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
           &block_value, rep->ioptions, rep->blocks_maybe_compressed,
           compression_dict, rep->persistent_cache_options,
           is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-          rep->table_options.read_amp_bytes_per_bit,
-          GetCacheAllocator(rep->table_options),
-          rep->immortal_table);
+          rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
     }
     if (s.ok()) {
       block.value = block_value.release();
@@ -1805,8 +1792,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
     s = GetDataBlockFromCache(
         key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
         block_entry, rep->table_options.format_version, compression_dict,
-        rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
-        GetCacheAllocator(rep->table_options));
+        rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
 
     if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
       std::unique_ptr<Block> raw_block;
@@ -1818,9 +1804,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
             block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
             compression_dict, rep->persistent_cache_options,
             is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-            rep->table_options.read_amp_bytes_per_bit,
-            GetCacheAllocator(rep->table_options),
-            rep->immortal_table);
+            rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
       }
 
       if (s.ok()) {
@@ -1833,7 +1817,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
                     .cache_index_and_filter_blocks_with_high_priority
                 ? Cache::Priority::HIGH
                 : Cache::Priority::LOW,
-            get_context, GetCacheAllocator(rep->table_options));
+            get_context);
       }
     }
   }
@@ -2540,12 +2524,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     BlockHandle handle = index_iter->value();
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(
-        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
-        ReadOptions(), handle, &contents, rep_->ioptions,
-        false /* decompress */, dummy_comp_dict /*compression dict*/,
-        rep_->persistent_cache_options,
-        GetCacheAllocator(rep_->table_options));
+    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
+                               rep_->footer, ReadOptions(), handle, &contents,
+                               rep_->ioptions, false /* decompress */,
+                               dummy_comp_dict /*compression dict*/,
+                               rep_->persistent_cache_options);
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
    }
@@ -2567,12 +2550,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     s = handle.DecodeFrom(&input);
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(
-        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
-        ReadOptions(), handle, &contents, rep_->ioptions,
-        false /* decompress */, dummy_comp_dict /*compression dict*/,
-        rep_->persistent_cache_options,
-        GetCacheAllocator(rep_->table_options));
+    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
+                               rep_->footer, ReadOptions(), handle, &contents,
+                               rep_->ioptions, false /* decompress */,
+                               dummy_comp_dict /*compression dict*/,
+                               rep_->persistent_cache_options);
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
    }
@@ -2876,8 +2858,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
           rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
           ReadOptions(), handle, &block, rep_->ioptions, false /*decompress*/,
           dummy_comp_dict /*compression dict*/,
-          rep_->persistent_cache_options,
-          GetCacheAllocator(rep_->table_options));
+          rep_->persistent_cache_options);
       s = block_fetcher.ReadBlockContents();
       if (!s.ok()) {
         rep_->filter.reset(new BlockBasedFilterBlockReader(
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index 59a0f36b5..3cada0c2c 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -303,8 +303,7 @@ class BlockBasedTable : public TableReader {
       const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
       BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
-      bool is_index = false, GetContext* get_context = nullptr,
-      CacheAllocator* allocator = nullptr);
+      bool is_index = false, GetContext* get_context = nullptr);
 
   // Put a raw block (maybe compressed) to the corresponding block caches.
   // This method will perform decompression against raw_block if needed and then
@@ -323,7 +322,7 @@ class BlockBasedTable : public TableReader {
       CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
       bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
-      GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
+      GetContext* get_context = nullptr);
 
   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index 489705758..ea97066ec 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -17,9 +17,8 @@
 #include "rocksdb/env.h"
 #include "table/block.h"
 #include "table/block_based_table_reader.h"
-#include "table/format.h"
 #include "table/persistent_cache_helper.h"
-#include "util/cache_allocator.h"
+#include "table/format.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -108,11 +107,9 @@ bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
   if (cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
    // lookup uncompressed cache mode p-cache
-    std::unique_ptr<char[]> raw_data;
    status_ = PersistentCacheHelper::LookupRawPage(
-        cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
+        cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
    if (status_.ok()) {
-      heap_buf_ = CacheAllocationPtr(raw_data.release());
      used_buf_ = heap_buf_.get();
      slice_ = Slice(heap_buf_.get(), block_size_);
      return true;
@@ -135,7 +132,7 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
    // trivially allocated stack buffer instead of needing a full malloc()
    used_buf_ = &stack_buf_[0];
  } else {
-    heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
+    heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
    used_buf_ = heap_buf_.get();
  }
 }
@@ -173,7 +170,7 @@ void BlockFetcher::GetBlockContents() {
    // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
    if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
      assert(used_buf_ != heap_buf_.get());
-      heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
+      heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
      memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
    }
    *contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
@@ -231,9 +228,9 @@ Status BlockFetcher::ReadBlockContents() {
  if (do_uncompress_ && compression_type != kNoCompression) {
    // compressed page, uncompress, update cache
    UncompressionContext uncompression_ctx(compression_type, compression_dict_);
-    status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
-                                      block_size_, contents_, footer_.version(),
-                                      ioptions_, allocator_);
+    status_ =
+        UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
+                                contents_, footer_.version(), ioptions_);
  } else {
    GetBlockContents();
  }
diff --git a/table/block_fetcher.h b/table/block_fetcher.h
index a8d9d6572..9e0d2448d 100644
--- a/table/block_fetcher.h
+++ b/table/block_fetcher.h
@@ -11,8 +11,6 @@
 #include "table/block.h"
 #include "table/format.h"
 
-#include "util/cache_allocator.h"
-
 namespace rocksdb {
 class BlockFetcher {
  public:
@@ -28,7 +26,6 @@ class BlockFetcher {
               BlockContents* contents, const ImmutableCFOptions& ioptions,
               bool do_uncompress, const Slice& compression_dict,
               const PersistentCacheOptions& cache_options,
-              CacheAllocator* allocator = nullptr,
               const bool immortal_source = false)
       : file_(file),
         prefetch_buffer_(prefetch_buffer),
@@ -40,8 +37,7 @@ class BlockFetcher {
         do_uncompress_(do_uncompress),
         immortal_source_(immortal_source),
         compression_dict_(compression_dict),
-        cache_options_(cache_options),
-        allocator_(allocator) {}
+        cache_options_(cache_options) {}
   Status ReadBlockContents();
 
  private:
@@ -58,12 +54,11 @@ class BlockFetcher {
   const bool immortal_source_;
   const Slice& compression_dict_;
   const PersistentCacheOptions& cache_options_;
-  CacheAllocator* allocator_;
   Status status_;
   Slice slice_;
   char* used_buf_ = nullptr;
   size_t block_size_;
-  CacheAllocationPtr heap_buf_;
+  std::unique_ptr<char[]> heap_buf_;
   char stack_buf_[kDefaultStackBufferSize];
   bool got_from_prefetch_buffer_ = false;
   rocksdb::CompressionType compression_type;
diff --git a/table/format.cc b/table/format.cc
index f565fb14a..16d959c3d 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -19,7 +19,6 @@
 #include "table/block_based_table_reader.h"
 #include "table/block_fetcher.h"
 #include "table/persistent_cache_helper.h"
-#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -280,9 +279,8 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
 Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t format_version,
-    const ImmutableCFOptions& ioptions,
-    CacheAllocator* allocator) {
-  CacheAllocationPtr ubuf;
+    const ImmutableCFOptions& ioptions) {
+  std::unique_ptr<char[]> ubuf;
 
   assert(uncompression_ctx.type() != kNoCompression &&
          "Invalid compression type");
@@ -298,7 +296,7 @@ Status UncompressBlockContentsForCompressionType(
       if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
         return Status::Corruption(snappy_corrupt_msg);
       }
-      ubuf = AllocateBlock(ulength, allocator);
+      ubuf.reset(new char[ulength]);
       if (!Snappy_Uncompress(data, n, ubuf.get())) {
         return Status::Corruption(snappy_corrupt_msg);
       }
@@ -306,10 +304,9 @@ Status UncompressBlockContentsForCompressionType(
       break;
     }
     case kZlibCompression:
-      ubuf = Zlib_Uncompress(
+      ubuf.reset(Zlib_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kZlibCompression, format_version),
-          allocator);
+          GetCompressFormatForVersion(kZlibCompression, format_version)));
       if (!ubuf) {
         static char zlib_corrupt_msg[] =
             "Zlib not supported or corrupted Zlib compressed block contents";
@@ -319,10 +316,9 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kBZip2Compression:
-      ubuf = BZip2_Uncompress(
+      ubuf.reset(BZip2_Uncompress(
           data, n, &decompress_size,
-          GetCompressFormatForVersion(kBZip2Compression, format_version),
-          allocator);
+          GetCompressFormatForVersion(kBZip2Compression, format_version)));
       if (!ubuf) {
         static char bzip2_corrupt_msg[] =
             "Bzip2 not supported or corrupted Bzip2 compressed block contents";
@@ -332,10 +328,9 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kLZ4Compression:
-      ubuf = LZ4_Uncompress(
+      ubuf.reset(LZ4_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4Compression, format_version),
-          allocator);
+          GetCompressFormatForVersion(kLZ4Compression, format_version)));
       if (!ubuf) {
         static char lz4_corrupt_msg[] =
             "LZ4 not supported or corrupted LZ4 compressed block contents";
@@ -345,10 +340,9 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kLZ4HCCompression:
-      ubuf = LZ4_Uncompress(
+      ubuf.reset(LZ4_Uncompress(
          uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
-          allocator);
+          GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
       if (!ubuf) {
         static char lz4hc_corrupt_msg[] =
             "LZ4HC not supported or corrupted LZ4HC compressed block contents";
@@ -358,8 +352,6 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kXpressCompression:
-      // XPRESS allocates memory internally, thus no support for custom
-      // allocator.
       ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
       if (!ubuf) {
         static char xpress_corrupt_msg[] =
@@ -371,8 +363,7 @@ Status UncompressBlockContentsForCompressionType(
       break;
     case kZSTD:
     case kZSTDNotFinalCompression:
-      ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
-                             allocator);
+      ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
       if (!ubuf) {
         static char zstd_corrupt_msg[] =
             "ZSTD not supported or corrupted ZSTD compressed block contents";
@@ -405,13 +396,11 @@ Status UncompressBlockContentsForCompressionType(
 Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
                                const char* data, size_t n,
                                BlockContents* contents, uint32_t format_version,
-                               const ImmutableCFOptions& ioptions,
-                               CacheAllocator* allocator) {
+                               const ImmutableCFOptions& ioptions) {
   assert(data[n] != kNoCompression);
   assert(data[n] == uncompression_ctx.type());
-  return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
-                                                   contents, format_version,
-                                                   ioptions, allocator);
+  return UncompressBlockContentsForCompressionType(
+      uncompression_ctx, data, n, contents, format_version, ioptions);
 }
 
 }  // namespace rocksdb
diff --git a/table/format.h b/table/format.h
index 441d7107e..6e0e99c1c 100644
--- a/table/format.h
+++ b/table/format.h
@@ -26,7 +26,6 @@
 #include "port/port.h"  // noexcept
 #include "table/persistent_cache_options.h"
 #include "util/file_reader_writer.h"
-#include "util/cache_allocator.h"
 
 namespace rocksdb {
 
@@ -193,7 +192,7 @@ struct BlockContents {
   Slice data;     // Actual contents of data
   bool cachable;  // True iff data can be cached
   CompressionType compression_type;
-  CacheAllocationPtr allocation;
+  std::unique_ptr<char[]> allocation;
 
   BlockContents() : cachable(false), compression_type(kNoCompression) {}
 
@@ -201,28 +200,16 @@ struct BlockContents {
                 CompressionType _compression_type)
       : data(_data), cachable(_cachable), compression_type(_compression_type) {}
 
-  BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
+  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
                 CompressionType _compression_type)
       : data(_data.get(), _size),
         cachable(_cachable),
         compression_type(_compression_type),
         allocation(std::move(_data)) {}
 
-  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
-                CompressionType _compression_type)
-      : data(_data.get(), _size),
-        cachable(_cachable),
-        compression_type(_compression_type) {
-    allocation.reset(_data.release());
-  }
-
   // The additional memory space taken by the block data.
   size_t usable_size() const {
     if (allocation.get() != nullptr) {
-      auto allocator = allocation.get_deleter().allocator;
-      if (allocator) {
-        return allocator->UsableSize(allocation.get(), data.size());
-      }
 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
       return malloc_usable_size(allocation.get());
 #else
@@ -265,7 +252,7 @@ extern Status ReadBlockContents(
 extern Status UncompressBlockContents(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
+    const ImmutableCFOptions& ioptions);
 
 // This is an extension to UncompressBlockContents that accepts
 // a specific compression type. This is used by un-wrapped blocks
@@ -273,7 +260,7 @@ extern Status UncompressBlockContents(
 extern Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
+    const ImmutableCFOptions& ioptions);
 
 // Implementation details follow.  Clients should ignore,
diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h
index d7a8ed4aa..df08a98fa 100644
--- a/table/plain_table_reader.h
+++ b/table/plain_table_reader.h
@@ -153,8 +153,8 @@ class PlainTableReader: public TableReader {
   DynamicBloom bloom_;
   PlainTableReaderFileInfo file_info_;
   Arena arena_;
-  CacheAllocationPtr index_block_alloc_;
-  CacheAllocationPtr bloom_block_alloc_;
+  std::unique_ptr<char[]> index_block_alloc_;
+  std::unique_ptr<char[]> bloom_block_alloc_;
 
   const ImmutableCFOptions& ioptions_;
   uint64_t file_size_;
diff --git a/table/table_test.cc b/table/table_test.cc
index a525c5866..26383fa81 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -2477,78 +2477,6 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
   c.ResetTableReader();
 }
 
-namespace {
-class CustomCacheAllocator : public CacheAllocator {
- public:
-  virtual const char* Name() const override { return "CustomCacheAllocator"; }
-
-  void* Allocate(size_t size) override {
-    ++numAllocations;
-    auto ptr = new char[size + 16];
-    memcpy(ptr, "cache_allocator_", 16);  // mangle first 16 bytes
-    return reinterpret_cast<void*>(ptr + 16);
-  }
-  void Deallocate(void* p) override {
-    ++numDeallocations;
-    char* ptr = reinterpret_cast<char*>(p) - 16;
-    delete[] ptr;
-  }
-
-  std::atomic<int> numAllocations;
-  std::atomic<int> numDeallocations;
-};
-}  // namespace
-
-TEST_P(BlockBasedTableTest, CacheAllocator) {
-  auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
-  {
-    Options opt;
-    unique_ptr<InternalKeyComparator> ikc;
-    ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
-    opt.compression = kNoCompression;
-    BlockBasedTableOptions table_options;
-    table_options.block_size = 1024;
-    LRUCacheOptions lruOptions;
-    lruOptions.cache_allocator = custom_cache_allocator;
-    lruOptions.capacity = 16 * 1024 * 1024;
-    lruOptions.num_shard_bits = 4;
-    table_options.block_cache = NewLRUCache(std::move(lruOptions));
-    opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
-
-    TableConstructor c(BytewiseComparator(),
-                       true /* convert_to_internal_key_ */);
-    c.Add("k01", "hello");
-    c.Add("k02", "hello2");
-    c.Add("k03", std::string(10000, 'x'));
-    c.Add("k04", std::string(200000, 'x'));
-    c.Add("k05", std::string(300000, 'x'));
-    c.Add("k06", "hello3");
-    c.Add("k07", std::string(100000, 'x'));
-    std::vector<std::string> keys;
-    stl_wrappers::KVMap kvmap;
-    const ImmutableCFOptions ioptions(opt);
-    const MutableCFOptions moptions(opt);
-    c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
-
-    unique_ptr<InternalIterator> iter(
-        c.NewIterator(moptions.prefix_extractor.get()));
-    iter->SeekToFirst();
-    while (iter->Valid()) {
-      iter->key();
-      iter->value();
-      iter->Next();
-    }
-    ASSERT_OK(iter->status());
-  }
-
-  // out of scope, block cache should have been deleted, all allocations
-  // deallocated
-  EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
-            custom_cache_allocator->numDeallocations.load());
-  // make sure that allocations actually happened through the cache allocator
-  EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
-}
-
 TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
   // A regression test to avoid data race described in
   // https://github.com/facebook/rocksdb/issues/1267
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 9b72f3a91..e3560d6fa 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -3040,7 +3040,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t bytes = 0;
     int decompress_size;
     while (ok && bytes < 1024 * 1048576) {
-      CacheAllocationPtr uncompressed;
+      char *uncompressed = nullptr;
       switch (FLAGS_compression_type_e) {
         case rocksdb::kSnappyCompression: {
           // get size and allocate here to make comparison fair
@@ -3050,44 +3050,45 @@ void VerifyDBFromDB(std::string& truth_db_name) {
            ok = false;
            break;
          }
-          uncompressed = AllocateBlock(ulength, nullptr);
+          uncompressed = new char[ulength];
          ok = Snappy_Uncompress(compressed.data(), compressed.size(),
-                                 uncompressed.get());
+                                 uncompressed);
          break;
        }
        case rocksdb::kZlibCompression:
          uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
                                         compressed.size(), &decompress_size, 2);
-          ok = uncompressed.get() != nullptr;
+          ok = uncompressed != nullptr;
          break;
        case rocksdb::kBZip2Compression:
          uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                          &decompress_size, 2);
-          ok = uncompressed.get() != nullptr;
+          ok = uncompressed != nullptr;
          break;
        case rocksdb::kLZ4Compression:
          uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                        compressed.size(), &decompress_size, 2);
-          ok = uncompressed.get() != nullptr;
+          ok = uncompressed != nullptr;
          break;
        case rocksdb::kLZ4HCCompression:
          uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                        compressed.size(), &decompress_size, 2);
-          ok = uncompressed.get() != nullptr;
+          ok = uncompressed != nullptr;
          break;
        case rocksdb::kXpressCompression:
-          uncompressed.reset(XPRESS_Uncompress(
-              compressed.data(), compressed.size(), &decompress_size));
-          ok = uncompressed.get() != nullptr;
+          uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
+                                           &decompress_size);
+          ok = uncompressed != nullptr;
          break;
        case rocksdb::kZSTD:
          uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
                                         compressed.size(), &decompress_size);
-          ok = uncompressed.get() != nullptr;
+          ok = uncompressed != nullptr;
          break;
        default:
          ok = false;
      }
+      delete[] uncompressed;
      bytes += input.size();
      thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
    }
diff --git a/util/cache_allocator.h b/util/cache_allocator.h
deleted file mode 100644
index 68dd3dcea..000000000
--- a/util/cache_allocator.h
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under both the GPLv2 (found in the
-// COPYING file in the root directory) and Apache 2.0 License
-// (found in the LICENSE.Apache file in the root directory).
-//
-
-#pragma once
-
-#include "rocksdb/cache_allocator.h"
-
-namespace rocksdb {
-
-struct CustomDeleter {
-  CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
-
-  void operator()(char* ptr) const {
-    if (allocator) {
-      allocator->Deallocate(reinterpret_cast<void*>(ptr));
-    } else {
-      delete[] ptr;
-    }
-  }
-
-  CacheAllocator* allocator;
-};
-
-using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
-
-inline CacheAllocationPtr AllocateBlock(size_t size,
-                                        CacheAllocator* allocator) {
-  if (allocator) {
-    auto block = reinterpret_cast<char*>(allocator->Allocate(size));
-    return CacheAllocationPtr(block, allocator);
-  }
-  return CacheAllocationPtr(new char[size]);
-}
-
-}  // namespace rocksdb
diff --git a/util/compression.h b/util/compression.h
index d7bab05ed..e918e14fb 100644
--- a/util/compression.h
+++ b/util/compression.h
@@ -14,8 +14,6 @@
 #include <string>
 
 #include "rocksdb/options.h"
-#include "rocksdb/table.h"
-#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression_context_cache.h"
 
@@ -497,10 +495,11 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline CacheAllocationPtr Zlib_Uncompress(
-    const UncompressionContext& ctx, const char* input_data,
-    size_t input_length, int* decompress_size, uint32_t compress_format_version,
-    CacheAllocator* allocator = nullptr, int windowBits = -14) {
+inline char* Zlib_Uncompress(const UncompressionContext& ctx,
+                             const char* input_data, size_t input_length,
+                             int* decompress_size,
+                             uint32_t compress_format_version,
+                             int windowBits = -14) {
 #ifdef ZLIB
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -542,9 +541,9 @@ inline CacheAllocationPtr Zlib_Uncompress(
   _stream.next_in = (Bytef*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  auto output = AllocateBlock(output_len, allocator);
+  char* output = new char[output_len];
 
-  _stream.next_out = (Bytef*)output.get();
+  _stream.next_out = (Bytef*)output;
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -562,17 +561,19 @@ inline CacheAllocationPtr Zlib_Uncompress(
         size_t old_sz = output_len;
         uint32_t output_len_delta = output_len / 5;
         output_len += output_len_delta < 10 ? 10 : output_len_delta;
-        auto tmp = AllocateBlock(output_len, allocator);
-        memcpy(tmp.get(), output.get(), old_sz);
-        output = std::move(tmp);
+        char* tmp = new char[output_len];
+        memcpy(tmp, output, old_sz);
+        delete[] output;
+        output = tmp;
 
         // Set more output.
-        _stream.next_out = (Bytef*)(output.get() + old_sz);
+        _stream.next_out = (Bytef*)(output + old_sz);
         _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
         break;
       }
       case Z_BUF_ERROR:
      default:
+        delete[] output;
        inflateEnd(&_stream);
        return nullptr;
    }
@@ -659,9 +660,9 @@ inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
 // block header
 // compress_format_version == 2 -- decompressed size is included in the block
 // header in varint32 format
-inline CacheAllocationPtr BZip2_Uncompress(
-    const char* input_data, size_t input_length, int* decompress_size,
-    uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
+inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
+                              int* decompress_size,
+                              uint32_t compress_format_version) {
 #ifdef BZIP2
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -689,9 +690,9 @@ inline CacheAllocationPtr BZip2_Uncompress(
   _stream.next_in = (char*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  auto output = AllocateBlock(output_len, allocator);
+  char* output = new char[output_len];
 
-  _stream.next_out = (char*)output.get();
+  _stream.next_out = (char*)output;
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -708,16 +709,18 @@ inline CacheAllocationPtr BZip2_Uncompress(
        assert(compress_format_version != 2);
        uint32_t old_sz = output_len;
        output_len = output_len * 1.2;
-        auto tmp = AllocateBlock(output_len, allocator);
-        memcpy(tmp.get(), output.get(), old_sz);
-        output = std::move(tmp);
+        char* tmp = new char[output_len];
+        memcpy(tmp, output, old_sz);
+        delete[] output;
+        output = tmp;
 
        // Set more output.
-        _stream.next_out = (char*)(output.get() + old_sz);
+        _stream.next_out = (char*)(output + old_sz);
        _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
        break;
      }
      default:
+        delete[] output;
        BZ2_bzDecompressEnd(&_stream);
        return nullptr;
    }
@@ -811,12 +814,10 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
-                                         const char* input_data,
-                                         size_t input_length,
-                                         int* decompress_size,
-                                         uint32_t compress_format_version,
-                                         CacheAllocator* allocator = nullptr) {
+inline char* LZ4_Uncompress(const UncompressionContext& ctx,
+                            const char* input_data, size_t input_length,
+                            int* decompress_size,
+                            uint32_t compress_format_version) {
 #ifdef LZ4
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -836,7 +837,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
     input_data += 8;
   }
 
-  auto output = AllocateBlock(output_len, allocator);
+  char* output = new char[output_len];
 #if LZ4_VERSION_NUMBER >= 10400  // r124+
   LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
   if (ctx.dict().size()) {
@@ -844,16 +845,17 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
                         static_cast<int>(ctx.dict().size()));
   }
   *decompress_size = LZ4_decompress_safe_continue(
-      stream, input_data, output.get(), static_cast<int>(input_length),
+      stream, input_data, output, static_cast<int>(input_length),
       static_cast<int>(output_len));
   LZ4_freeStreamDecode(stream);
 #else  // up to r123
-  *decompress_size = LZ4_decompress_safe(input_data, output.get(),
-                                         static_cast<int>(input_length),
-                                         static_cast<int>(output_len));
+  *decompress_size =
+      LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
+                          static_cast<int>(output_len));
 #endif  // LZ4_VERSION_NUMBER >= 10400
 
   if (*decompress_size < 0) {
+    delete[] output;
     return nullptr;
   }
   assert(*decompress_size == static_cast<int>(output_len));
@@ -864,7 +866,6 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
   (void)input_length;
   (void)decompress_size;
   (void)compress_format_version;
-  (void)allocator;
   return nullptr;
 #endif
 }
@@ -1027,11 +1028,9 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
-                                          const char* input_data,
-                                          size_t input_length,
-                                          int* decompress_size,
-                                          CacheAllocator* allocator = nullptr) {
+inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
+                             const char* input_data, size_t input_length,
+                             int* decompress_size) {
 #ifdef ZSTD
   uint32_t output_len = 0;
   if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
                                             &output_len)) {
@@ -1039,17 +1038,17 @@ inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
     return nullptr;
   }
 
-  auto output = AllocateBlock(output_len, allocator);
+  char* output = new char[output_len];
   size_t actual_output_length;
 #if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
   ZSTD_DCtx* context = ctx.GetZSTDContext();
   assert(context != nullptr);
   actual_output_length = ZSTD_decompress_usingDict(
-      context, output.get(), output_len, input_data, input_length,
-      ctx.dict().data(), ctx.dict().size());
+      context, output, output_len, input_data, input_length, ctx.dict().data(),
+      ctx.dict().size());
 #else  // up to v0.4.x
   actual_output_length =
-      ZSTD_decompress(output.get(), output_len, input_data, input_length);
+      ZSTD_decompress(output, output_len, input_data, input_length);
 #endif  // ZSTD_VERSION_NUMBER >= 500
   assert(actual_output_length == output_len);
   *decompress_size = static_cast<int>(actual_output_length);
@@ -1059,7 +1058,6 @@ inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
   (void)input_data;
   (void)input_length;
   (void)decompress_size;
-  (void)allocator;
   return nullptr;
 #endif
 }
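
---
Note for readers tracking what this revert removes (text below the diff is
ignored by git am): before the revert, a caller could plug a custom allocator
into the block cache. The sketch below is reconstructed from the deleted
include/rocksdb/cache_allocator.h and the deleted table_test.cc test in this
patch; the counting allocator itself is a hypothetical example, not code from
the tree.

    #include <atomic>
    #include <cstddef>

    #include "rocksdb/cache.h"  // pre-revert version, which pulled in CacheAllocator

    // Minimal CacheAllocator implementation. Note that the deleted header
    // declared the interface at global scope, not inside namespace rocksdb.
    // This one only counts calls, so a test can assert that the cache
    // actually routed block memory through it.
    class CountingCacheAllocator : public CacheAllocator {
     public:
      const char* Name() const override { return "CountingCacheAllocator"; }

      void* Allocate(size_t size) override {
        ++num_allocations_;
        return ::operator new(size);  // "a block of at least size size"
      }

      void Deallocate(void* p) override {
        ++num_deallocations_;
        ::operator delete(p);
      }

      // UsableSize() keeps its default implementation, which just returns
      // the original allocation_size.

      std::atomic<size_t> num_allocations_{0};
      std::atomic<size_t> num_deallocations_{0};
    };

    // Wiring it into an LRU block cache via the pre-revert LRUCacheOptions:
    //   rocksdb::LRUCacheOptions opts;
    //   opts.capacity = 16 * 1024 * 1024;
    //   opts.num_shard_bits = 4;
    //   opts.cache_allocator = std::make_shared<CountingCacheAllocator>();
    //   std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(std::move(opts));

After this revert none of the above compiles: LRUCacheOptions loses its
cache_allocator field, and block buffers go back to plain new char[] held in
std::unique_ptr<char[]>.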