Insert range deletion meta-block into block cache
Summary: This handles two issues: (1) a range deletion iterator sometimes outlives the table reader that created it, in which case the block must not be destroyed during table reader destruction; and (2) we prefer to read these range tombstone meta-blocks from file fewer times.

- Extracted the cache-populating logic from NewDataBlockIterator() into a separate function: MaybeLoadDataBlockToCache().
- Use MaybeLoadDataBlockToCache() to load the range deletion meta-block and pin it through the reader's lifetime. This code reuse works since the range deletion meta-block has the same format as data blocks.
- Use NewDataBlockIterator() to create range deletion iterators, which uses the block cache if enabled, and otherwise reads the block from file. Either way, the underlying block won't disappear until after the iterator is destroyed.

Closes https://github.com/facebook/rocksdb/pull/1459

Differential Revision: D4123175
Pulled By: ajkr
fbshipit-source-id: 8f64281
This commit is contained in:
parent 9e7cf3469b
commit 815f54afad
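The pinning scheme the summary describes can be illustrated with a small refcount sketch before reading the diff. Everything below (ToyCache, ToyReader, ToyIterator) is invented for illustration and is not RocksDB's API; it only shows why giving the iterator its own reference on the cached block lets it outlive the reader that created it, which is exactly the lifetime bug item (1) above fixes.

#include <cassert>
#include <map>
#include <memory>
#include <string>

struct ToyCache {
  struct Entry {
    std::string block;
    int refs = 0;
  };
  std::map<std::string, Entry> entries;

  Entry* Lookup(const std::string& key) {
    auto it = entries.find(key);
    if (it == entries.end()) return nullptr;
    ++it->second.refs;  // pin: caller must Release() this reference
    return &it->second;
  }
  void Release(Entry* e) {
    assert(e != nullptr && e->refs > 0);
    --e->refs;  // refs == 0 means the cache is free to evict the block
  }
};

struct ToyIterator {
  ToyCache* cache;
  ToyCache::Entry* pinned;  // the iterator's own reference
  ~ToyIterator() { cache->Release(pinned); }  // released on destruction
};

struct ToyReader {
  ToyCache* cache;
  ToyCache::Entry* pinned;  // pinned through the reader's lifetime
  explicit ToyReader(ToyCache* c) : cache(c), pinned(c->Lookup("range_del")) {}
  ~ToyReader() { cache->Release(pinned); }  // analogous to Close() below
  // Each iterator takes an extra reference instead of borrowing the reader's.
  std::unique_ptr<ToyIterator> NewRangeTombstoneIterator() {
    return std::unique_ptr<ToyIterator>(
        new ToyIterator{cache, cache->Lookup("range_del")});
  }
};

int main() {
  ToyCache cache;
  cache.entries["range_del"].block = "tombstone block bytes";
  auto reader = std::unique_ptr<ToyReader>(new ToyReader(&cache));
  auto iter = reader->NewRangeTombstoneIterator();
  reader.reset();  // destroy the reader first...
  assert(cache.entries["range_del"].refs == 1);  // ...block is still pinned
  iter.reset();    // the iterator's cleanup drops the last reference
  assert(cache.entries["range_del"].refs == 0);
  return 0;
}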
table/block_based_table_reader.cc

@@ -371,7 +371,7 @@ struct BlockBasedTable::Rep {
         filter_type(FilterType::kNoFilter),
         whole_key_filtering(_table_opt.whole_key_filtering),
         prefix_filtering(true),
-        range_del_block(nullptr),
+        range_del_handle(BlockHandle::NullBlockHandle()),
         global_seqno(kDisableGlobalSequenceNumber) {}

   const ImmutableCFOptions& ioptions;
@@ -430,7 +430,10 @@ struct BlockBasedTable::Rep {
   // the LRU cache will never push flush them out, hence they're pinned
   CachableEntry<FilterBlockReader> filter_entry;
   CachableEntry<IndexReader> index_entry;
-  unique_ptr<Block> range_del_block;
+  // range deletion meta-block is pinned through reader's lifetime when LRU
+  // cache is enabled.
+  CachableEntry<Block> range_del_entry;
+  BlockHandle range_del_handle;

   // If global_seqno is used, all Keys in this file will have the same
   // seqno with value `global_seqno`.
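The hunk above swaps the owned unique_ptr<Block> for a CachableEntry<Block> plus the block's handle. Judging from how the entry is used elsewhere in this diff (Close() calls range_del_entry.Release(...)), CachableEntry pairs a block pointer with the cache handle that pins it. The following is an inferred sketch with stub types, not the actual RocksDB definition:

// Minimal stand-ins so this sketch compiles on its own; RocksDB's real
// Cache interface is much richer.
struct Cache {
  struct Handle;                       // opaque token for a pinned cache entry
  void Release(Handle* /*handle*/) {}  // stub: the real one drops a refcount
};

// Inferred shape of CachableEntry<T> from how this diff uses it: the block
// pointer travels together with the cache handle that pins it.
template <class TValue>
struct CachableEntry {
  TValue* value = nullptr;                // uncompressed block, if loaded
  Cache::Handle* cache_handle = nullptr;  // non-null iff pinned in the cache
  void Release(Cache* cache) {
    if (cache_handle != nullptr) {
      cache->Release(cache_handle);  // unpin; the entry becomes evictable
    }
    value = nullptr;
    cache_handle = nullptr;
  }
};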
@@ -702,29 +705,23 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
     }

     // Read the range del meta block
-    // TODO(wanning&andrewkr): cache range delete tombstone block
     bool found_range_del_block;
-    BlockHandle range_del_handle;
     s = SeekToRangeDelBlock(meta_iter.get(), &found_range_del_block,
-                            &range_del_handle);
+                            &rep->range_del_handle);
     if (!s.ok()) {
       Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log,
           "Error when seeking to range delete tombstones block from file: %s",
           s.ToString().c_str());
     } else {
-      if (found_range_del_block && !range_del_handle.IsNull()) {
-        BlockContents range_del_block_contents;
+      if (found_range_del_block && !rep->range_del_handle.IsNull()) {
         ReadOptions read_options;
-        s = ReadBlockContents(rep->file.get(), rep->footer, read_options,
-                              range_del_handle, &range_del_block_contents,
-                              rep->ioptions, false /* decompressed */);
+        s = MaybeLoadDataBlockToCache(rep, read_options, rep->range_del_handle,
+                                      Slice() /* compression_dict */,
+                                      &rep->range_del_entry);
         if (!s.ok()) {
           Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log,
               "Encountered error while reading data from range del block %s",
               s.ToString().c_str());
-        } else {
-          rep->range_del_block.reset(new Block(
-              std::move(range_del_block_contents), kDisableGlobalSequenceNumber));
         }
       }
     }
@@ -1241,72 +1238,18 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(

   const bool no_io = (ro.read_tier == kBlockCacheTier);
   Cache* block_cache = rep->table_options.block_cache.get();
-  Cache* block_cache_compressed =
-      rep->table_options.block_cache_compressed.get();
   CachableEntry<Block> block;
-
   BlockHandle handle;
   Slice input = index_value;
   // We intentionally allow extra stuff in index_value so that we
   // can add more features in the future.
   Status s = handle.DecodeFrom(&input);
-
-  if (!s.ok()) {
-    if (input_iter != nullptr) {
-      input_iter->SetStatus(s);
-      return input_iter;
-    } else {
-      return NewErrorInternalIterator(s);
-    }
-  }
-
   Slice compression_dict;
-  if (rep->compression_dict_block) {
-    compression_dict = rep->compression_dict_block->data;
-  }
-  // If either block cache is enabled, we'll try to read from it.
-  if (block_cache != nullptr || block_cache_compressed != nullptr) {
-    Statistics* statistics = rep->ioptions.statistics;
-    char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
-    char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
-    Slice key, /* key to the block cache */
-        ckey /* key to the compressed block cache */;
-
-    // create key for block cache
-    if (block_cache != nullptr) {
-      key = GetCacheKey(rep->cache_key_prefix, rep->cache_key_prefix_size,
-                        handle, cache_key);
-    }
-
-    if (block_cache_compressed != nullptr) {
-      ckey = GetCacheKey(rep->compressed_cache_key_prefix,
-                         rep->compressed_cache_key_prefix_size, handle,
-                         compressed_cache_key);
-    }
-
-    s = GetDataBlockFromCache(
-        key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
-        &block, rep->table_options.format_version, compression_dict,
-        rep->table_options.read_amp_bytes_per_bit);
-
-    if (block.value == nullptr && !no_io && ro.fill_cache) {
-      std::unique_ptr<Block> raw_block;
-      {
-        StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
-        s = ReadBlockFromFile(
-            rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions,
-            block_cache_compressed == nullptr, compression_dict,
-            rep->persistent_cache_options, rep->global_seqno,
-            rep->table_options.read_amp_bytes_per_bit);
-      }
-
-      if (s.ok()) {
-        s = PutDataBlockToCache(
-            key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
-            &block, raw_block.release(), rep->table_options.format_version,
-            compression_dict, rep->table_options.read_amp_bytes_per_bit);
-      }
-    }
+  if (s.ok()) {
+    if (rep->compression_dict_block) {
+      compression_dict = rep->compression_dict_block->data;
+    }
+    s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block);
   }

   // Didn't get any data from block caches.
@@ -1337,7 +1280,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
                                     rep->ioptions.statistics);
     if (block.cache_handle != nullptr) {
       iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
                             block.cache_handle);
     } else {
       iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr);
     }
@@ -1353,6 +1296,62 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
   return iter;
 }

+Status BlockBasedTable::MaybeLoadDataBlockToCache(
+    Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
+    Slice compression_dict, CachableEntry<Block>* block_entry) {
+  const bool no_io = (ro.read_tier == kBlockCacheTier);
+  Cache* block_cache = rep->table_options.block_cache.get();
+  Cache* block_cache_compressed =
+      rep->table_options.block_cache_compressed.get();
+
+  // If either block cache is enabled, we'll try to read from it.
+  Status s;
+  if (block_cache != nullptr || block_cache_compressed != nullptr) {
+    Statistics* statistics = rep->ioptions.statistics;
+    char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+    char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+    Slice key, /* key to the block cache */
+        ckey /* key to the compressed block cache */;
+
+    // create key for block cache
+    if (block_cache != nullptr) {
+      key = GetCacheKey(rep->cache_key_prefix, rep->cache_key_prefix_size,
+                        handle, cache_key);
+    }
+
+    if (block_cache_compressed != nullptr) {
+      ckey = GetCacheKey(rep->compressed_cache_key_prefix,
+                         rep->compressed_cache_key_prefix_size, handle,
+                         compressed_cache_key);
+    }
+
+    s = GetDataBlockFromCache(
+        key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
+        block_entry, rep->table_options.format_version, compression_dict,
+        rep->table_options.read_amp_bytes_per_bit);
+
+    if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
+      std::unique_ptr<Block> raw_block;
+      {
+        StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
+        s = ReadBlockFromFile(
+            rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions,
+            block_cache_compressed == nullptr, compression_dict,
+            rep->persistent_cache_options, rep->global_seqno,
+            rep->table_options.read_amp_bytes_per_bit);
+      }
+
+      if (s.ok()) {
+        s = PutDataBlockToCache(
+            key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
+            block_entry, raw_block.release(), rep->table_options.format_version,
+            compression_dict, rep->table_options.read_amp_bytes_per_bit);
+      }
+    }
+  }
+  return s;
+}
+
 class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
  public:
   BlockEntryIteratorState(BlockBasedTable* table,
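The control flow of the new function reduces to: look the handle up in the cache(s); on a miss, if IO is allowed and fill_cache is set, read the block from the file and insert it for future readers. The sketch below restates that ordering over a toy single-level cache (ToyBlockCache and LoadOrReadBlock are invented names; the real function additionally handles the compressed cache, statistics, and Status plumbing):

#include <cstdint>
#include <map>
#include <memory>
#include <string>

// A toy single-level block cache keyed by file offset.
struct ToyBlockCache {
  std::map<uint64_t, std::shared_ptr<std::string>> blocks;
  std::shared_ptr<std::string> Lookup(uint64_t offset) {
    auto it = blocks.find(offset);
    return it == blocks.end() ? nullptr : it->second;
  }
  void Insert(uint64_t offset, std::shared_ptr<std::string> block) {
    blocks[offset] = std::move(block);
  }
};

// Mirrors the lookup -> read-from-file -> back-fill ordering of
// MaybeLoadDataBlockToCache. Returns nullptr on a cache-only miss, the
// analogue of an Incomplete status under kBlockCacheTier.
std::shared_ptr<std::string> LoadOrReadBlock(ToyBlockCache* cache,
                                             const std::string& file,
                                             uint64_t offset, uint64_t size,
                                             bool no_io, bool fill_cache) {
  if (cache != nullptr) {
    if (auto block = cache->Lookup(offset)) {
      return block;  // cache hit: shared ownership acts as the pin
    }
  }
  if (no_io) {
    return nullptr;  // not cached, and IO is forbidden
  }
  auto block = std::make_shared<std::string>(
      file.substr(offset, size));  // "read the block from the file"
  if (cache != nullptr && fill_cache) {
    cache->Insert(offset, block);  // populate the cache for future readers
  }
  return block;
}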
@@ -1489,12 +1488,15 @@ InternalIterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,

 InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
     const ReadOptions& read_options) {
-  if (rep_->range_del_block.get() != nullptr) {
-    auto iter =
-        rep_->range_del_block->NewIterator(&(rep_->internal_comparator));
-    return iter;
+  if (rep_->range_del_handle.IsNull()) {
+    return NewEmptyInternalIterator();
   }
-  return NewEmptyInternalIterator();
+  std::string str;
+  rep_->range_del_handle.EncodeTo(&str);
+  // Even though range_del_entry already references the meta-block when block
+  // cache is enabled, we still call the below function to get another reference
+  // since the caller may need the iterator beyond this table reader's lifetime.
+  return NewDataBlockIterator(rep_, read_options, Slice(str));
 }

 bool BlockBasedTable::FullFilterKeyMayMatch(const ReadOptions& read_options,
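NewRangeTombstoneIterator re-encodes the handle into a string because NewDataBlockIterator expects an encoded index value: a varint64 offset followed by a varint64 size, matching the GetVarint64 calls visible in the BlockHandle::DecodeFrom hunk below. Here is a self-contained round trip of that encoding, with hand-rolled helpers standing in for RocksDB's PutVarint64/GetVarint64:

#include <cassert>
#include <cstdint>
#include <string>

// LEB128-style varint64 encoder, equivalent in spirit to PutVarint64.
void PutVarint64(std::string* dst, uint64_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Decoder; returns false on truncated input, like GetVarint64.
bool GetVarint64(const char** p, const char* limit, uint64_t* v) {
  uint64_t result = 0;
  for (int shift = 0; shift <= 63 && *p < limit; shift += 7) {
    uint64_t byte = static_cast<unsigned char>(*(*p)++);
    result |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      *v = result;
      return true;
    }
  }
  return false;
}

int main() {
  const uint64_t offset = 1234567, size = 4096;
  std::string str;
  PutVarint64(&str, offset);  // what BlockHandle::EncodeTo appends
  PutVarint64(&str, size);

  const char* p = str.data();
  const char* limit = str.data() + str.size();
  uint64_t off2 = 0, sz2 = 0;  // what BlockHandle::DecodeFrom recovers
  assert(GetVarint64(&p, limit, &off2) && GetVarint64(&p, limit, &sz2));
  assert(off2 == offset && sz2 == size);
  return 0;
}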
@@ -1968,6 +1970,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
 void BlockBasedTable::Close() {
   rep_->filter_entry.Release(rep_->table_options.block_cache.get());
   rep_->index_entry.Release(rep_->table_options.block_cache.get());
+  rep_->range_del_entry.Release(rep_->table_options.block_cache.get());
   // cleanup index and filter blocks to avoid accessing dangling pointer
   if (!rep_->table_options.no_block_cache) {
     char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
table/block_based_table_reader.h

@@ -162,6 +162,18 @@ class BlockBasedTable : public TableReader {
   static InternalIterator* NewDataBlockIterator(
       Rep* rep, const ReadOptions& ro, const Slice& index_value,
       BlockIter* input_iter = nullptr);
+  // If block cache enabled (compressed or uncompressed), looks for the block
+  // identified by handle in (1) uncompressed cache, (2) compressed cache, and
+  // then (3) file. If found, inserts into the cache(s) that were searched
+  // unsuccessfully (e.g., if found in file, will add to both uncompressed and
+  // compressed caches if they're enabled).
+  //
+  // @param block_entry value is set to the uncompressed block if found. If
+  //    in uncompressed block cache, also sets cache_handle to reference that
+  //    block.
+  static Status MaybeLoadDataBlockToCache(
+      Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
+      Slice compression_dict, CachableEntry<Block>* block_entry);

   // For the following two functions:
   // if `no_io == true`, we will not try to read filter/index from sst file
table/format.cc

@@ -59,6 +59,9 @@ Status BlockHandle::DecodeFrom(Slice* input) {
       GetVarint64(input, &size_)) {
     return Status::OK();
   } else {
+    // reset in case failure after partially decoding
+    offset_ = 0;
+    size_ = 0;
     return Status::Corruption("bad block handle");
   }
 }
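The reset matters because decoding can fail after offset_ has already been consumed, leaving the handle half-written; zeroing both fields makes a failed decode indistinguishable from the null handle, so checks like rep->range_del_handle.IsNull() stay trustworthy. A toy demonstration (ToyHandle is illustrative; it assumes, as the format.h TODO below states, that the null handle is all zeros):

#include <cassert>
#include <cstdint>

struct ToyHandle {
  uint64_t offset_ = 0;
  uint64_t size_ = 0;
  bool IsNull() const { return offset_ == 0 && size_ == 0; }
  // Decodes an offset and a size from an array of n values.
  bool DecodeFrom(const uint64_t* vals, int n) {
    if (n >= 1) offset_ = vals[0];  // partial decode can happen here
    if (n >= 2) {
      size_ = vals[1];
      return true;
    }
    offset_ = 0;  // reset in case of failure after partially decoding
    size_ = 0;
    return false;
  }
};

int main() {
  ToyHandle h;
  const uint64_t partial[] = {42};  // only an offset, no size: corrupt input
  assert(!h.DecodeFrom(partial, 1));
  assert(h.IsNull());  // without the reset, offset_ would still be 42
  return 0;
}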
table/format.h

@@ -65,8 +65,8 @@ class BlockHandle {
   enum { kMaxEncodedLength = 10 + 10 };

  private:
-  uint64_t offset_ = 0;
-  uint64_t size_ = 0;
+  uint64_t offset_;
+  uint64_t size_;

   static const BlockHandle kNullBlockHandle;
 };
@@ -242,6 +242,9 @@ extern Status UncompressBlockContentsForCompressionType(

 // Implementation details follow.  Clients should ignore,

+// TODO(andrewkr): we should prefer one way of representing a null/uninitialized
+// BlockHandle.  Currently we use zeros for null and use negation-of-zeros for
+// uninitialized.
 inline BlockHandle::BlockHandle()
     : BlockHandle(~static_cast<uint64_t>(0),
                   ~static_cast<uint64_t>(0)) {
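The two sentinel encodings the TODO contrasts, side by side in a toy reimplementation (the constants mirror what is visible in this diff: the default constructor uses negated zeros and NullBlockHandle() uses zeros; ToyHandle itself is invented for illustration):

#include <cassert>
#include <cstdint>

constexpr uint64_t kUninit = ~static_cast<uint64_t>(0);  // default-constructed
constexpr uint64_t kNull = 0;                            // NullBlockHandle()

struct ToyHandle {
  uint64_t offset_ = kUninit;  // mirrors BlockHandle() : BlockHandle(~0, ~0)
  uint64_t size_ = kUninit;
  static ToyHandle NullHandle() { return ToyHandle{kNull, kNull}; }
  bool IsNull() const { return offset_ == kNull && size_ == kNull; }
};

int main() {
  ToyHandle fresh;                          // uninitialized sentinel (~0, ~0)
  ToyHandle null = ToyHandle::NullHandle();  // null sentinel (0, 0)
  assert(!fresh.IsNull() && null.IsNull());
  return 0;
}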
table/meta_blocks.cc

@@ -47,6 +47,9 @@ namespace {
 Status SeekToMetaBlock(InternalIterator* meta_iter,
                        const std::string& block_name, bool* is_found,
                        BlockHandle* block_handle = nullptr) {
+  if (block_handle != nullptr) {
+    *block_handle = BlockHandle::NullBlockHandle();
+  }
   *is_found = true;
   meta_iter->Seek(block_name);
   if (meta_iter->status().ok()) {
table/table_test.cc

@@ -1177,23 +1177,29 @@ TEST_F(BlockBasedTableTest, RangeDelBlock) {
   c.Finish(options, ioptions, table_options, *internal_cmp, &sorted_keys,
            &kvmap);

-  std::unique_ptr<InternalIterator> iter(
-      c.GetTableReader()->NewRangeTombstoneIterator(ReadOptions()));
-  ASSERT_EQ(false, iter->Valid());
-  iter->SeekToFirst();
-  ASSERT_EQ(true, iter->Valid());
-  for (int i = 0; i < 2; i++) {
-    ASSERT_TRUE(iter->Valid());
-    ParsedInternalKey parsed_key;
-    ASSERT_TRUE(ParseInternalKey(iter->key(), &parsed_key));
-    RangeTombstone t(parsed_key, iter->value());
-    ASSERT_EQ(t.start_key_, keys[i]);
-    ASSERT_EQ(t.end_key_, vals[i]);
-    ASSERT_EQ(t.seq_, i);
-    iter->Next();
+  for (int j = 0; j < 2; ++j) {
+    std::unique_ptr<InternalIterator> iter(
+        c.GetTableReader()->NewRangeTombstoneIterator(ReadOptions()));
+    if (j > 0) {
+      // For second iteration, delete the table reader object and verify the
+      // iterator can still access its metablock's range tombstones.
+      c.ResetTableReader();
+    }
+    ASSERT_EQ(false, iter->Valid());
+    iter->SeekToFirst();
+    ASSERT_EQ(true, iter->Valid());
+    for (int i = 0; i < 2; i++) {
+      ASSERT_TRUE(iter->Valid());
+      ParsedInternalKey parsed_key;
+      ASSERT_TRUE(ParseInternalKey(iter->key(), &parsed_key));
+      RangeTombstone t(parsed_key, iter->value());
+      ASSERT_EQ(t.start_key_, keys[i]);
+      ASSERT_EQ(t.end_key_, vals[i]);
+      ASSERT_EQ(t.seq_, i);
+      iter->Next();
+    }
+    ASSERT_TRUE(!iter->Valid());
   }
-  ASSERT_TRUE(!iter->Valid());
-  c.ResetTableReader();
 }

 TEST_F(BlockBasedTableTest, FilterPolicyNameProperties) {