Revert "Introduce CacheAllocator, a custom allocator for cache blocks (#4437)"

This reverts commit 1cf5deb8fdecb7f63ce5ce1a0e942222a95f881e.
Fosco Marotto 2018-10-04 15:17:30 -07:00
parent a1f6142f38
commit b6b72687c6
19 changed files with 129 additions and 350 deletions


@ -7,7 +7,6 @@
### New Features
* TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
* Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
### Bug Fixes
* Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.

cache/lru_cache.cc

@ -461,10 +461,8 @@ std::string LRUCacheShard::GetPrintableOptions() const {
}
LRUCache::LRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> allocator)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
std::move(allocator)) {
bool strict_capacity_limit, double high_pri_pool_ratio)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
num_shards_ = 1 << num_shard_bits;
shards_ = reinterpret_cast<LRUCacheShard*>(
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
@ -539,14 +537,12 @@ double LRUCache::GetHighPriPoolRatio() {
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit,
cache_opts.high_pri_pool_ratio,
cache_opts.cache_allocator);
cache_opts.high_pri_pool_ratio);
}
std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> cache_allocator) {
std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit,
double high_pri_pool_ratio) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
@ -558,8 +554,7 @@ std::shared_ptr<Cache> NewLRUCache(
num_shard_bits = GetDefaultCacheShardBits(capacity);
}
return std::make_shared<LRUCache>(capacity, num_shard_bits,
strict_capacity_limit, high_pri_pool_ratio,
std::move(cache_allocator));
strict_capacity_limit, high_pri_pool_ratio);
}
} // namespace rocksdb

cache/lru_cache.h

@ -279,8 +279,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
class LRUCache : public ShardedCache {
public:
LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
double high_pri_pool_ratio);
virtual ~LRUCache();
virtual const char* Name() const override { return "LRUCache"; }
virtual CacheShard* GetShard(int shard) override;


@ -20,10 +20,8 @@
namespace rocksdb {
ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit,
std::shared_ptr<CacheAllocator> allocator)
: Cache(std::move(allocator)),
num_shard_bits_(num_shard_bits),
bool strict_capacity_limit)
: num_shard_bits_(num_shard_bits),
capacity_(capacity),
strict_capacity_limit_(strict_capacity_limit),
last_id_(1) {}
@ -144,9 +142,6 @@ std::string ShardedCache::GetPrintableOptions() const {
strict_capacity_limit_);
ret.append(buffer);
}
snprintf(buffer, kBufferSize, " cache_allocator : %s\n",
cache_allocator() ? cache_allocator()->Name() : "None");
ret.append(buffer);
ret.append(GetShard(0)->GetPrintableOptions());
return ret;
}


@ -47,8 +47,7 @@ class CacheShard {
// Keys are sharded by the highest num_shard_bits bits of hash value.
class ShardedCache : public Cache {
public:
ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
virtual ~ShardedCache() = default;
virtual const char* Name() const override = 0;
virtual CacheShard* GetShard(int shard) = 0;


@ -25,7 +25,6 @@
#include <stdint.h>
#include <memory>
#include <string>
#include "rocksdb/cache_allocator.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
@ -59,20 +58,13 @@ struct LRUCacheOptions {
// BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
double high_pri_pool_ratio = 0.0;
// If non-nullptr will use this allocator instead of system allocator when
// allocating memory for cache blocks. Call this method before you start using
// the cache!
std::shared_ptr<CacheAllocator> cache_allocator;
LRUCacheOptions() {}
LRUCacheOptions(size_t _capacity, int _num_shard_bits,
bool _strict_capacity_limit, double _high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
bool _strict_capacity_limit, double _high_pri_pool_ratio)
: capacity(_capacity),
num_shard_bits(_num_shard_bits),
strict_capacity_limit(_strict_capacity_limit),
high_pri_pool_ratio(_high_pri_pool_ratio),
cache_allocator(std::move(_cache_allocator)) {}
high_pri_pool_ratio(_high_pri_pool_ratio) {}
};
// Create a new cache with a fixed size capacity. The cache is sharded
@ -83,10 +75,10 @@ struct LRUCacheOptions {
// high_pri_pool_pct.
// num_shard_bits = -1 means it is automatically determined: every shard
// will be at least 512KB and number of shard bits will not exceed 6.
extern std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
int num_shard_bits = -1,
bool strict_capacity_limit = false,
double high_pri_pool_ratio = 0.0);
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
@ -99,15 +91,13 @@ extern std::shared_ptr<Cache> NewClockCache(size_t capacity,
int num_shard_bits = -1,
bool strict_capacity_limit = false);
class Cache {
public:
// Depending on implementation, cache entries with high priority could be less
// likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW };
Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
: cache_allocator_(std::move(allocator)) {}
Cache() {}
// Destroys all existing entries by calling the "deleter"
// function that was passed via the Insert() function.
@ -238,14 +228,10 @@ class Cache {
virtual void TEST_mark_as_data_block(const Slice& /*key*/,
size_t /*charge*/) {}
CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
private:
// No copying allowed
Cache(const Cache&);
Cache& operator=(const Cache&);
std::shared_ptr<CacheAllocator> cache_allocator_;
};
} // namespace rocksdb
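For orientation, here is a minimal sketch (not part of the diff) of how a caller would have routed block-cache memory through a custom allocator using the LRUCacheOptions::cache_allocator field and the NewLRUCache(const LRUCacheOptions&) overload touched above; the helper name MakeCacheWithCustomAllocator and the my_allocator argument are hypothetical:

#include <memory>
#include <utility>
#include "rocksdb/cache.h"

// Hedged sketch of the pre-revert API shown in the hunk above.
std::shared_ptr<rocksdb::Cache> MakeCacheWithCustomAllocator(
    std::shared_ptr<rocksdb::CacheAllocator> my_allocator) {
  rocksdb::LRUCacheOptions opts;
  opts.capacity = 16 * 1024 * 1024;                // 16 MB block cache
  opts.num_shard_bits = 4;
  opts.cache_allocator = std::move(my_allocator);  // field removed by this revert
  return rocksdb::NewLRUCache(opts);
}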


@ -1,29 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
// CacheAllocator is an interface that a client can implement to supply custom
// cache allocation and deallocation methods. See rocksdb/cache.h for more
// information.
// All methods should be thread-safe.
class CacheAllocator {
public:
virtual ~CacheAllocator() = default;
// Name of the cache allocator, printed in the log
virtual const char* Name() const = 0;
// Allocate a block of at least size size
virtual void* Allocate(size_t size) = 0;
// Deallocate previously allocated block
virtual void Deallocate(void* p) = 0;
// Returns the memory size of the block allocated at p. The default
// implementation that just returns the original allocation_size is fine.
virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
// default implementation just returns the allocation size
return allocation_size;
}
};
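As an illustration of the interface removed above, a minimal implementation could simply count calls and defer to new[]/delete[], much like the CustomCacheAllocator exercised in the table_test change later in this diff; the class name CountingCacheAllocator is hypothetical, and the sketch assumes CacheAllocator lives in the rocksdb namespace:

#include <atomic>
#include <cstddef>
#include "rocksdb/cache_allocator.h"

// Hedged sketch: overrides only the pure-virtual methods shown above and
// inherits the default UsableSize(), which returns the allocation size.
class CountingCacheAllocator : public rocksdb::CacheAllocator {
 public:
  const char* Name() const override { return "CountingCacheAllocator"; }
  void* Allocate(size_t size) override {
    allocations_.fetch_add(1, std::memory_order_relaxed);
    return new char[size];
  }
  void Deallocate(void* p) override {
    deallocations_.fetch_add(1, std::memory_order_relaxed);
    delete[] static_cast<char*>(p);
  }
 private:
  std::atomic<int> allocations_{0};
  std::atomic<int> deallocations_{0};
};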


@ -39,7 +39,6 @@
#include "table/full_filter_block.h"
#include "table/table_builder.h"
#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -655,8 +654,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
size_t size = block_contents.size();
auto ubuf =
AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
std::unique_ptr<char[]> ubuf(new char[size + 1]);
memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type;


@ -80,12 +80,11 @@ Status ReadBlockFromFile(
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
const bool immortal_file = false) {
size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
BlockContents contents;
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, options, handle, &contents, ioptions,
do_uncompress, compression_dict, cache_options, allocator, immortal_file);
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress,
compression_dict, cache_options, immortal_file);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
@ -95,13 +94,6 @@ Status ReadBlockFromFile(
return s;
}
inline CacheAllocator* GetCacheAllocator(
const BlockBasedTableOptions& table_options) {
return table_options.block_cache.get()
? table_options.block_cache->cache_allocator()
: nullptr;
}
// Delete the resource that is held by the iterator.
template <class ResourceType>
void DeleteHeldResource(void* arg, void* /*ignored*/) {
@ -1158,8 +1150,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */,
GetCacheAllocator(rep->table_options));
0 /* read_amp_bytes_per_bit */);
if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log,
@ -1182,7 +1173,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
GetContext* get_context, CacheAllocator* allocator) {
GetContext* get_context) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
@ -1239,7 +1230,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
compressed_block->size(), &contents,
format_version, ioptions, allocator);
format_version, ioptions);
// Insert uncompressed block into block cache
if (s.ok()) {
@ -1301,8 +1292,7 @@ Status BlockBasedTable::PutDataBlockToCache(
const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
Cache::Priority priority, GetContext* get_context,
CacheAllocator* allocator) {
Cache::Priority priority, GetContext* get_context) {
assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr);
@ -1315,7 +1305,7 @@ Status BlockBasedTable::PutDataBlockToCache(
compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents, format_version,
ioptions, allocator);
ioptions);
}
if (!s.ok()) {
delete raw_block;
@ -1412,8 +1402,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
ReadOptions(), filter_handle, &block,
rep->ioptions, false /* decompress */,
dummy_comp_dict, rep->persistent_cache_options,
GetCacheAllocator(rep->table_options));
dummy_comp_dict, rep->persistent_cache_options);
Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -1711,9 +1700,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
&block_value, rep->ioptions, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit,
GetCacheAllocator(rep->table_options),
rep->immortal_table);
rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
}
if (s.ok()) {
block.value = block_value.release();
@ -1805,8 +1792,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict,
rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
GetCacheAllocator(rep->table_options));
rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block;
@ -1818,9 +1804,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit,
GetCacheAllocator(rep->table_options),
rep->immortal_table);
rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
}
if (s.ok()) {
@ -1833,7 +1817,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW,
get_context, GetCacheAllocator(rep->table_options));
get_context);
}
}
}
@ -2540,12 +2524,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
BlockHandle handle = index_iter->value();
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetCacheAllocator(rep_->table_options));
BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->footer, ReadOptions(), handle, &contents,
rep_->ioptions, false /* decompress */,
dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2567,12 +2550,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
s = handle.DecodeFrom(&input);
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetCacheAllocator(rep_->table_options));
BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->footer, ReadOptions(), handle, &contents,
rep_->ioptions, false /* decompress */,
dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2876,8 +2858,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetCacheAllocator(rep_->table_options));
rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader(


@ -303,8 +303,7 @@ class BlockBasedTable : public TableReader {
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, GetContext* get_context = nullptr,
CacheAllocator* allocator = nullptr);
bool is_index = false, GetContext* get_context = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
@ -323,7 +322,7 @@ class BlockBasedTable : public TableReader {
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
GetContext* get_context = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.


@ -17,9 +17,8 @@
#include "rocksdb/env.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
#include "util/cache_allocator.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -108,11 +107,9 @@ bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
if (cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) {
// lookup uncompressed cache mode p-cache
std::unique_ptr<char[]> raw_data;
status_ = PersistentCacheHelper::LookupRawPage(
cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
if (status_.ok()) {
heap_buf_ = CacheAllocationPtr(raw_data.release());
used_buf_ = heap_buf_.get();
slice_ = Slice(heap_buf_.get(), block_size_);
return true;
@ -135,7 +132,7 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
// trivially allocated stack buffer instead of needing a full malloc()
used_buf_ = &stack_buf_[0];
} else {
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
used_buf_ = heap_buf_.get();
}
}
@ -173,7 +170,7 @@ void BlockFetcher::GetBlockContents() {
// or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
assert(used_buf_ != heap_buf_.get());
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
}
*contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
@ -231,9 +228,9 @@ Status BlockFetcher::ReadBlockContents() {
if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
block_size_, contents_, footer_.version(),
ioptions_, allocator_);
status_ =
UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
contents_, footer_.version(), ioptions_);
} else {
GetBlockContents();
}


@ -11,8 +11,6 @@
#include "table/block.h"
#include "table/format.h"
#include "util/cache_allocator.h"
namespace rocksdb {
class BlockFetcher {
public:
@ -28,7 +26,6 @@ class BlockFetcher {
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
CacheAllocator* allocator = nullptr,
const bool immortal_source = false)
: file_(file),
prefetch_buffer_(prefetch_buffer),
@ -40,8 +37,7 @@ class BlockFetcher {
do_uncompress_(do_uncompress),
immortal_source_(immortal_source),
compression_dict_(compression_dict),
cache_options_(cache_options),
allocator_(allocator) {}
cache_options_(cache_options) {}
Status ReadBlockContents();
private:
@ -58,12 +54,11 @@ class BlockFetcher {
const bool immortal_source_;
const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_;
CacheAllocator* allocator_;
Status status_;
Slice slice_;
char* used_buf_ = nullptr;
size_t block_size_;
CacheAllocationPtr heap_buf_;
std::unique_ptr<char[]> heap_buf_;
char stack_buf_[kDefaultStackBufferSize];
bool got_from_prefetch_buffer_ = false;
rocksdb::CompressionType compression_type;


@ -19,7 +19,6 @@
#include "table/block_based_table_reader.h"
#include "table/block_fetcher.h"
#include "table/persistent_cache_helper.h"
#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -280,9 +279,8 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions,
CacheAllocator* allocator) {
CacheAllocationPtr ubuf;
const ImmutableCFOptions& ioptions) {
std::unique_ptr<char[]> ubuf;
assert(uncompression_ctx.type() != kNoCompression &&
"Invalid compression type");
@ -298,7 +296,7 @@ Status UncompressBlockContentsForCompressionType(
if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg);
}
ubuf = AllocateBlock(ulength, allocator);
ubuf.reset(new char[ulength]);
if (!Snappy_Uncompress(data, n, ubuf.get())) {
return Status::Corruption(snappy_corrupt_msg);
}
@ -306,10 +304,9 @@ Status UncompressBlockContentsForCompressionType(
break;
}
case kZlibCompression:
ubuf = Zlib_Uncompress(
ubuf.reset(Zlib_Uncompress(
uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version),
allocator);
GetCompressFormatForVersion(kZlibCompression, format_version)));
if (!ubuf) {
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
@ -319,10 +316,9 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kBZip2Compression:
ubuf = BZip2_Uncompress(
ubuf.reset(BZip2_Uncompress(
data, n, &decompress_size,
GetCompressFormatForVersion(kBZip2Compression, format_version),
allocator);
GetCompressFormatForVersion(kBZip2Compression, format_version)));
if (!ubuf) {
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
@ -332,10 +328,9 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4Compression:
ubuf = LZ4_Uncompress(
ubuf.reset(LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version),
allocator);
GetCompressFormatForVersion(kLZ4Compression, format_version)));
if (!ubuf) {
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
@ -345,10 +340,9 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4HCCompression:
ubuf = LZ4_Uncompress(
ubuf.reset(LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
allocator);
GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
if (!ubuf) {
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
@ -358,8 +352,6 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kXpressCompression:
// XPRESS allocates memory internally, thus no support for custom
// allocator.
ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
if (!ubuf) {
static char xpress_corrupt_msg[] =
@ -371,8 +363,7 @@ Status UncompressBlockContentsForCompressionType(
break;
case kZSTD:
case kZSTDNotFinalCompression:
ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
allocator);
ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
if (!ubuf) {
static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents";
@ -405,13 +396,11 @@ Status UncompressBlockContentsForCompressionType(
Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions,
CacheAllocator* allocator) {
const ImmutableCFOptions& ioptions) {
assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type());
return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
contents, format_version,
ioptions, allocator);
return UncompressBlockContentsForCompressionType(
uncompression_ctx, data, n, contents, format_version, ioptions);
}
} // namespace rocksdb


@ -26,7 +26,6 @@
#include "port/port.h" // noexcept
#include "table/persistent_cache_options.h"
#include "util/file_reader_writer.h"
#include "util/cache_allocator.h"
namespace rocksdb {
@ -193,7 +192,7 @@ struct BlockContents {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
CompressionType compression_type;
CacheAllocationPtr allocation;
std::unique_ptr<char[]> allocation;
BlockContents() : cachable(false), compression_type(kNoCompression) {}
@ -201,28 +200,16 @@ struct BlockContents {
CompressionType _compression_type)
: data(_data), cachable(_cachable), compression_type(_compression_type) {}
BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type),
allocation(std::move(_data)) {}
BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type) {
allocation.reset(_data.release());
}
// The additional memory space taken by the block data.
size_t usable_size() const {
if (allocation.get() != nullptr) {
auto allocator = allocation.get_deleter().allocator;
if (allocator) {
return allocator->UsableSize(allocation.get(), data.size());
}
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
return malloc_usable_size(allocation.get());
#else
@ -265,7 +252,7 @@ extern Status ReadBlockContents(
extern Status UncompressBlockContents(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
const ImmutableCFOptions& ioptions);
// This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks
@ -273,7 +260,7 @@ extern Status UncompressBlockContents(
extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
const ImmutableCFOptions& ioptions);
// Implementation details follow. Clients should ignore,


@ -153,8 +153,8 @@ class PlainTableReader: public TableReader {
DynamicBloom bloom_;
PlainTableReaderFileInfo file_info_;
Arena arena_;
CacheAllocationPtr index_block_alloc_;
CacheAllocationPtr bloom_block_alloc_;
std::unique_ptr<char[]> index_block_alloc_;
std::unique_ptr<char[]> bloom_block_alloc_;
const ImmutableCFOptions& ioptions_;
uint64_t file_size_;


@ -2477,78 +2477,6 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
c.ResetTableReader();
}
namespace {
class CustomCacheAllocator : public CacheAllocator {
public:
virtual const char* Name() const override { return "CustomCacheAllocator"; }
void* Allocate(size_t size) override {
++numAllocations;
auto ptr = new char[size + 16];
memcpy(ptr, "cache_allocator_", 16); // mangle first 16 bytes
return reinterpret_cast<void*>(ptr + 16);
}
void Deallocate(void* p) override {
++numDeallocations;
char* ptr = reinterpret_cast<char*>(p) - 16;
delete[] ptr;
}
std::atomic<int> numAllocations;
std::atomic<int> numDeallocations;
};
} // namespace
TEST_P(BlockBasedTableTest, CacheAllocator) {
auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
{
Options opt;
unique_ptr<InternalKeyComparator> ikc;
ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
opt.compression = kNoCompression;
BlockBasedTableOptions table_options;
table_options.block_size = 1024;
LRUCacheOptions lruOptions;
lruOptions.cache_allocator = custom_cache_allocator;
lruOptions.capacity = 16 * 1024 * 1024;
lruOptions.num_shard_bits = 4;
table_options.block_cache = NewLRUCache(std::move(lruOptions));
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
TableConstructor c(BytewiseComparator(),
true /* convert_to_internal_key_ */);
c.Add("k01", "hello");
c.Add("k02", "hello2");
c.Add("k03", std::string(10000, 'x'));
c.Add("k04", std::string(200000, 'x'));
c.Add("k05", std::string(300000, 'x'));
c.Add("k06", "hello3");
c.Add("k07", std::string(100000, 'x'));
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
const ImmutableCFOptions ioptions(opt);
const MutableCFOptions moptions(opt);
c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
unique_ptr<InternalIterator> iter(
c.NewIterator(moptions.prefix_extractor.get()));
iter->SeekToFirst();
while (iter->Valid()) {
iter->key();
iter->value();
iter->Next();
}
ASSERT_OK(iter->status());
}
// out of scope, block cache should have been deleted, all allocations
// deallocated
EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
custom_cache_allocator->numDeallocations.load());
// make sure that allocations actually happened through the cache allocator
EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
}
TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
// A regression test to avoid data race described in
// https://github.com/facebook/rocksdb/issues/1267


@ -3040,7 +3040,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t bytes = 0;
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
CacheAllocationPtr uncompressed;
char *uncompressed = nullptr;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: {
// get size and allocate here to make comparison fair
@ -3050,44 +3050,45 @@ void VerifyDBFromDB(std::string& truth_db_name) {
ok = false;
break;
}
uncompressed = AllocateBlock(ulength, nullptr);
uncompressed = new char[ulength];
ok = Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed.get());
uncompressed);
break;
}
case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
ok = uncompressed != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size, 2);
ok = uncompressed.get() != nullptr;
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
ok = uncompressed != nullptr;
break;
case rocksdb::kXpressCompression:
uncompressed.reset(XPRESS_Uncompress(
compressed.data(), compressed.size(), &decompress_size));
ok = uncompressed.get() != nullptr;
uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
&decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size);
ok = uncompressed.get() != nullptr;
ok = uncompressed != nullptr;
break;
default:
ok = false;
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
}


@ -1,38 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include "rocksdb/cache_allocator.h"
namespace rocksdb {
struct CustomDeleter {
CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
void operator()(char* ptr) const {
if (allocator) {
allocator->Deallocate(reinterpret_cast<void*>(ptr));
} else {
delete[] ptr;
}
}
CacheAllocator* allocator;
};
using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
inline CacheAllocationPtr AllocateBlock(size_t size,
CacheAllocator* allocator) {
if (allocator) {
auto block = reinterpret_cast<char*>(allocator->Allocate(size));
return CacheAllocationPtr(block, allocator);
}
return CacheAllocationPtr(new char[size]);
}
} // namespace rocksdb
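A brief usage sketch for the helper deleted above: AllocateBlock() hands back a CacheAllocationPtr whose CustomDeleter routes the free through the allocator when one was supplied and falls back to delete[] otherwise; the FillScratchBlock name is hypothetical:

#include <cstring>
#include "util/cache_allocator.h"

// Hedged sketch against the helper shown above.
void FillScratchBlock(rocksdb::CacheAllocator* allocator) {
  rocksdb::CacheAllocationPtr buf = rocksdb::AllocateBlock(4096, allocator);
  std::memset(buf.get(), 0, 4096);
  // On scope exit the CustomDeleter calls allocator->Deallocate() if an
  // allocator was given, otherwise delete[].
}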


@ -14,8 +14,6 @@
#include <string>
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression_context_cache.h"
@ -497,10 +495,11 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
inline CacheAllocationPtr Zlib_Uncompress(
const UncompressionContext& ctx, const char* input_data,
size_t input_length, int* decompress_size, uint32_t compress_format_version,
CacheAllocator* allocator = nullptr, int windowBits = -14) {
inline char* Zlib_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length,
int* decompress_size,
uint32_t compress_format_version,
int windowBits = -14) {
#ifdef ZLIB
uint32_t output_len = 0;
if (compress_format_version == 2) {
@ -542,9 +541,9 @@ inline CacheAllocationPtr Zlib_Uncompress(
_stream.next_in = (Bytef*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
auto output = AllocateBlock(output_len, allocator);
char* output = new char[output_len];
_stream.next_out = (Bytef*)output.get();
_stream.next_out = (Bytef*)output;
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
@ -562,17 +561,19 @@ inline CacheAllocationPtr Zlib_Uncompress(
size_t old_sz = output_len;
uint32_t output_len_delta = output_len / 5;
output_len += output_len_delta < 10 ? 10 : output_len_delta;
auto tmp = AllocateBlock(output_len, allocator);
memcpy(tmp.get(), output.get(), old_sz);
output = std::move(tmp);
char* tmp = new char[output_len];
memcpy(tmp, output, old_sz);
delete[] output;
output = tmp;
// Set more output.
_stream.next_out = (Bytef*)(output.get() + old_sz);
_stream.next_out = (Bytef*)(output + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
case Z_BUF_ERROR:
default:
delete[] output;
inflateEnd(&_stream);
return nullptr;
}
@ -659,9 +660,9 @@ inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline CacheAllocationPtr BZip2_Uncompress(
const char* input_data, size_t input_length, int* decompress_size,
uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size,
uint32_t compress_format_version) {
#ifdef BZIP2
uint32_t output_len = 0;
if (compress_format_version == 2) {
@ -689,9 +690,9 @@ inline CacheAllocationPtr BZip2_Uncompress(
_stream.next_in = (char*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
auto output = AllocateBlock(output_len, allocator);
char* output = new char[output_len];
_stream.next_out = (char*)output.get();
_stream.next_out = (char*)output;
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
@ -708,16 +709,18 @@ inline CacheAllocationPtr BZip2_Uncompress(
assert(compress_format_version != 2);
uint32_t old_sz = output_len;
output_len = output_len * 1.2;
auto tmp = AllocateBlock(output_len, allocator);
memcpy(tmp.get(), output.get(), old_sz);
output = std::move(tmp);
char* tmp = new char[output_len];
memcpy(tmp, output, old_sz);
delete[] output;
output = tmp;
// Set more output.
_stream.next_out = (char*)(output.get() + old_sz);
_stream.next_out = (char*)(output + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
default:
delete[] output;
BZ2_bzDecompressEnd(&_stream);
return nullptr;
}
@ -811,12 +814,10 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
const char* input_data,
size_t input_length,
int* decompress_size,
uint32_t compress_format_version,
CacheAllocator* allocator = nullptr) {
inline char* LZ4_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length,
int* decompress_size,
uint32_t compress_format_version) {
#ifdef LZ4
uint32_t output_len = 0;
if (compress_format_version == 2) {
@ -836,7 +837,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
input_data += 8;
}
auto output = AllocateBlock(output_len, allocator);
char* output = new char[output_len];
#if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
if (ctx.dict().size()) {
@ -844,16 +845,17 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
static_cast<int>(ctx.dict().size()));
}
*decompress_size = LZ4_decompress_safe_continue(
stream, input_data, output.get(), static_cast<int>(input_length),
stream, input_data, output, static_cast<int>(input_length),
static_cast<int>(output_len));
LZ4_freeStreamDecode(stream);
#else // up to r123
*decompress_size = LZ4_decompress_safe(input_data, output.get(),
static_cast<int>(input_length),
static_cast<int>(output_len));
*decompress_size =
LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
static_cast<int>(output_len));
#endif // LZ4_VERSION_NUMBER >= 10400
if (*decompress_size < 0) {
delete[] output;
return nullptr;
}
assert(*decompress_size == static_cast<int>(output_len));
@ -864,7 +866,6 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
(void)input_length;
(void)decompress_size;
(void)compress_format_version;
(void)allocator;
return nullptr;
#endif
}
@ -1027,11 +1028,9 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
// @param compression_dict Data for presetting the compression library's
// dictionary.
inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
const char* input_data,
size_t input_length,
int* decompress_size,
CacheAllocator* allocator = nullptr) {
inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef ZSTD
uint32_t output_len = 0;
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
@ -1039,17 +1038,17 @@ inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
return nullptr;
}
auto output = AllocateBlock(output_len, allocator);
char* output = new char[output_len];
size_t actual_output_length;
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_DCtx* context = ctx.GetZSTDContext();
assert(context != nullptr);
actual_output_length = ZSTD_decompress_usingDict(
context, output.get(), output_len, input_data, input_length,
ctx.dict().data(), ctx.dict().size());
context, output, output_len, input_data, input_length, ctx.dict().data(),
ctx.dict().size());
#else // up to v0.4.x
actual_output_length =
ZSTD_decompress(output.get(), output_len, input_data, input_length);
ZSTD_decompress(output, output_len, input_data, input_length);
#endif // ZSTD_VERSION_NUMBER >= 500
assert(actual_output_length == output_len);
*decompress_size = static_cast<int>(actual_output_length);
@ -1059,7 +1058,6 @@ inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
(void)input_data;
(void)input_length;
(void)decompress_size;
(void)allocator;
return nullptr;
#endif
}