Fix range deletion tombstone ingestion with global seqno (#6429)

Summary:
Original author: jeffrey-xiao

If we are writing a global seqno for an ingested file, the range
tombstone metablock gets accessed and put into the cache during
ingestion preparation. At the time, the global seqno of the ingested
file has not yet been determined, so the cached block will not have a
global seqno. When the file is ingested and we read its range tombstone
metablock, it will be returned from the cache with no global seqno. In
that case, we use the actual seqnos stored in the range tombstones,
which are all zero, so the tombstones cover nothing.

This commit removes global_seqno_ variable from Block. When iterating
over a block, the global seqno for the block is determined by the
iterator instead of storing this mutable attribute in Block.
Additionally, this commit adds a regression test to check that keys are
deleted when ingesting a file with a global seqno and range deletion
tombstones.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6429

Differential Revision: D19961563

Pulled By: ajkr

fbshipit-source-id: 5cf777397fa3e452401f0bf0364b0750492487b7
This commit is contained in:
Andrew Kryczka 2020-02-25 15:29:17 -08:00 committed by Facebook Github Bot
parent d87c10c6ab
commit 69679e7375
12 changed files with 191 additions and 143 deletions

View File

@ -1,4 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fix a bug where range tombstone blocks in ingested files were cached incorrectly during ingestion. If range tombstones were read from those incorrectly cached blocks, the keys they covered would be exposed.
## 6.8.0 (02/24/2020)
### Java API Changes
* Major breaking changes to Java comparators, toward standardizing on ByteBuffer for performant, locale-neutral operations on keys (#6252).

View File

@ -796,6 +796,45 @@ TEST_F(ExternalSSTFileBasicTest, VerifyChecksumReadahead) {
Destroy(options);
}
TEST_F(ExternalSSTFileBasicTest, IngestRangeDeletionTombstoneWithGlobalSeqno) {
for (int i = 5; i < 25; i++) {
ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), Key(i),
Key(i) + "_val"));
}
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Reopen(options);
SstFileWriter sst_file_writer(EnvOptions(), options);
// file.sst (delete 0 => 30)
std::string file = sst_files_dir_ + "file.sst";
ASSERT_OK(sst_file_writer.Open(file));
ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(30)));
ExternalSstFileInfo file_info;
ASSERT_OK(sst_file_writer.Finish(&file_info));
ASSERT_EQ(file_info.file_path, file);
ASSERT_EQ(file_info.num_entries, 0);
ASSERT_EQ(file_info.smallest_key, "");
ASSERT_EQ(file_info.largest_key, "");
ASSERT_EQ(file_info.num_range_del_entries, 1);
ASSERT_EQ(file_info.smallest_range_del_key, Key(0));
ASSERT_EQ(file_info.largest_range_del_key, Key(30));
IngestExternalFileOptions ifo;
ifo.move_files = true;
ifo.snapshot_consistency = true;
ifo.allow_global_seqno = true;
ifo.write_global_seqno = true;
ifo.verify_checksums_before_ingest = false;
ASSERT_OK(db_->IngestExternalFile({file}, ifo));
for (int i = 5; i < 25; i++) {
std::string res;
ASSERT_TRUE(db_->Get(ReadOptions(), Key(i), &res).IsNotFound());
}
}
TEST_P(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
int kNumLevels = 7;
Options options = CurrentOptions();

View File

@ -865,14 +865,13 @@ Block::~Block() {
// TEST_SYNC_POINT("Block::~Block");
}
Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
size_t read_amp_bytes_per_bit, Statistics* statistics)
Block::Block(BlockContents&& contents, size_t read_amp_bytes_per_bit,
Statistics* statistics)
: contents_(std::move(contents)),
data_(contents_.data.data()),
size_(contents_.data.size()),
restart_offset_(0),
num_restarts_(0),
global_seqno_(_global_seqno) {
num_restarts_(0) {
TEST_SYNC_POINT("Block::Block:0");
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
@ -925,6 +924,7 @@ Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
DataBlockIter* Block::NewDataIterator(const Comparator* cmp,
const Comparator* ucmp,
SequenceNumber global_seqno,
DataBlockIter* iter, Statistics* stats,
bool block_contents_pinned) {
DataBlockIter* ret_iter;
@ -943,7 +943,7 @@ DataBlockIter* Block::NewDataIterator(const Comparator* cmp,
return ret_iter;
} else {
ret_iter->Initialize(
cmp, ucmp, data_, restart_offset_, num_restarts_, global_seqno_,
cmp, ucmp, data_, restart_offset_, num_restarts_, global_seqno,
read_amp_bitmap_.get(), block_contents_pinned,
data_block_hash_index_.Valid() ? &data_block_hash_index_ : nullptr);
if (read_amp_bitmap_) {
@ -958,10 +958,10 @@ DataBlockIter* Block::NewDataIterator(const Comparator* cmp,
}
IndexBlockIter* Block::NewIndexIterator(
const Comparator* cmp, const Comparator* ucmp, IndexBlockIter* iter,
Statistics* /*stats*/, bool total_order_seek, bool have_first_key,
bool key_includes_seq, bool value_is_full, bool block_contents_pinned,
BlockPrefixIndex* prefix_index) {
const Comparator* cmp, const Comparator* ucmp, SequenceNumber global_seqno,
IndexBlockIter* iter, Statistics* /*stats*/, bool total_order_seek,
bool have_first_key, bool key_includes_seq, bool value_is_full,
bool block_contents_pinned, BlockPrefixIndex* prefix_index) {
IndexBlockIter* ret_iter;
if (iter != nullptr) {
ret_iter = iter;
@ -980,7 +980,7 @@ IndexBlockIter* Block::NewIndexIterator(
BlockPrefixIndex* prefix_index_ptr =
total_order_seek ? nullptr : prefix_index;
ret_iter->Initialize(cmp, ucmp, data_, restart_offset_, num_restarts_,
global_seqno_, prefix_index_ptr, have_first_key,
global_seqno, prefix_index_ptr, have_first_key,
key_includes_seq, value_is_full,
block_contents_pinned);
}

View File

@ -151,8 +151,7 @@ class BlockReadAmpBitmap {
class Block {
public:
// Initialize the block with the specified contents.
explicit Block(BlockContents&& contents, SequenceNumber _global_seqno,
size_t read_amp_bytes_per_bit = 0,
explicit Block(BlockContents&& contents, size_t read_amp_bytes_per_bit = 0,
Statistics* statistics = nullptr);
// No copying allowed
Block(const Block&) = delete;
@ -190,6 +189,7 @@ class Block {
// the key that is just pass the target key.
DataBlockIter* NewDataIterator(const Comparator* comparator,
const Comparator* user_comparator,
SequenceNumber global_seqno,
DataBlockIter* iter = nullptr,
Statistics* stats = nullptr,
bool block_contents_pinned = false);
@ -208,6 +208,7 @@ class Block {
// It is determined by IndexType property of the table.
IndexBlockIter* NewIndexIterator(const Comparator* comparator,
const Comparator* user_comparator,
SequenceNumber global_seqno,
IndexBlockIter* iter, Statistics* stats,
bool total_order_seek, bool have_first_key,
bool key_includes_seq, bool value_is_full,
@ -217,8 +218,6 @@ class Block {
// Report an approximation of how much memory has been used.
size_t ApproximateMemoryUsage() const;
SequenceNumber global_seqno() const { return global_seqno_; }
private:
BlockContents contents_;
const char* data_; // contents_.data.data()
@ -226,10 +225,6 @@ class Block {
uint32_t restart_offset_; // Offset in data_ of restart array
uint32_t num_restarts_;
std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_;
// All keys in the block will have seqno = global_seqno_, regardless of
// the encoded value (kDisableGlobalSequenceNumber means disabled)
const SequenceNumber global_seqno_;
DataBlockHashIndex data_block_hash_index_;
};

View File

@ -82,7 +82,6 @@ template <>
class BlocklikeTraits<BlockContents> {
public:
static BlockContents* Create(BlockContents&& contents,
SequenceNumber /* global_seqno */,
size_t /* read_amp_bytes_per_bit */,
Statistics* /* statistics */,
bool /* using_zstd */,
@ -99,7 +98,6 @@ template <>
class BlocklikeTraits<ParsedFullFilterBlock> {
public:
static ParsedFullFilterBlock* Create(BlockContents&& contents,
SequenceNumber /* global_seqno */,
size_t /* read_amp_bytes_per_bit */,
Statistics* /* statistics */,
bool /* using_zstd */,
@ -115,12 +113,10 @@ class BlocklikeTraits<ParsedFullFilterBlock> {
template <>
class BlocklikeTraits<Block> {
public:
static Block* Create(BlockContents&& contents, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, Statistics* statistics,
bool /* using_zstd */,
static Block* Create(BlockContents&& contents, size_t read_amp_bytes_per_bit,
Statistics* statistics, bool /* using_zstd */,
const FilterPolicy* /* filter_policy */) {
return new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
statistics);
return new Block(std::move(contents), read_amp_bytes_per_bit, statistics);
}
static uint32_t GetNumRestarts(const Block& block) {
@ -132,7 +128,6 @@ template <>
class BlocklikeTraits<UncompressionDict> {
public:
static UncompressionDict* Create(BlockContents&& contents,
SequenceNumber /* global_seqno */,
size_t /* read_amp_bytes_per_bit */,
Statistics* /* statistics */,
bool using_zstd,
@ -160,9 +155,9 @@ Status ReadBlockFromFile(
std::unique_ptr<TBlocklike>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, bool maybe_compressed, BlockType block_type,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool for_compaction, bool using_zstd, const FilterPolicy* filter_policy) {
const PersistentCacheOptions& cache_options, size_t read_amp_bytes_per_bit,
MemoryAllocator* memory_allocator, bool for_compaction, bool using_zstd,
const FilterPolicy* filter_policy) {
assert(result);
BlockContents contents;
@ -173,8 +168,8 @@ Status ReadBlockFromFile(
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(BlocklikeTraits<TBlocklike>::Create(
std::move(contents), global_seqno, read_amp_bytes_per_bit,
ioptions.statistics, using_zstd, filter_policy));
std::move(contents), read_amp_bytes_per_bit, ioptions.statistics,
using_zstd, filter_policy));
}
return s;
@ -414,6 +409,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
return NewErrorInternalIterator<IndexValue>(s);
}
const BlockBasedTable::Rep* rep = table()->rep_;
InternalIteratorBase<IndexValue>* it = nullptr;
Statistics* kNullStats = nullptr;
@ -426,8 +422,9 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
&partition_map_),
index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(),
nullptr, kNullStats, true, index_has_first_key(),
index_key_includes_seq(), index_value_is_full()));
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats,
true, index_has_first_key(), index_key_includes_seq(),
index_value_is_full()));
} else {
ReadOptions ro;
ro.fill_cache = read_options.fill_cache;
@ -437,8 +434,9 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
table(), ro, *internal_comparator(),
index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(),
nullptr, kNullStats, true, index_has_first_key(),
index_key_includes_seq(), index_value_is_full()),
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats,
true, index_has_first_key(), index_key_includes_seq(),
index_value_is_full()),
false, true, /* prefix_extractor */ nullptr, BlockType::kIndex,
lookup_context ? lookup_context->caller
: TableReaderCaller::kUncategorized);
@ -477,9 +475,9 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(), &biter,
kNullStats, true, index_has_first_key(), index_key_includes_seq(),
index_value_is_full());
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
index_has_first_key(), index_key_includes_seq(), index_value_is_full());
// Index partitions are assumed to be consecuitive. Prefetch them all.
// Read the first block offset
biter.SeekToFirst();
@ -590,6 +588,7 @@ class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon {
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override {
const BlockBasedTable::Rep* rep = table()->get_rep();
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
@ -607,9 +606,9 @@ class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon {
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
auto it = index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(), iter,
kNullStats, true, index_has_first_key(), index_key_includes_seq(),
index_value_is_full());
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, true,
index_has_first_key(), index_key_includes_seq(), index_value_is_full());
assert(it != nullptr);
index_block.TransferTo(it);
@ -737,6 +736,7 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
const ReadOptions& read_options, bool disable_prefix_seek,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override {
const BlockBasedTable::Rep* rep = table()->get_rep();
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
@ -756,10 +756,11 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
auto it = index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(), iter,
kNullStats, total_order_seek, index_has_first_key(),
index_key_includes_seq(), index_value_is_full(),
false /* block_contents_pinned */, prefix_index_.get());
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), iter, kNullStats,
total_order_seek, index_has_first_key(), index_key_includes_seq(),
index_value_is_full(), false /* block_contents_pinned */,
prefix_index_.get());
assert(it != nullptr);
index_block.TransferTo(it);
@ -1661,9 +1662,9 @@ Status BlockBasedTable::ReadMetaIndexBlock(
rep_->footer.metaindex_handle(), &metaindex, rep_->ioptions,
true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex,
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
GetMemoryAllocator(rep_->table_options), false /* for_compaction */,
rep_->blocks_definitely_zstd_compressed, nullptr /* filter_policy */);
0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep_->table_options),
false /* for_compaction */, rep_->blocks_definitely_zstd_compressed,
nullptr /* filter_policy */);
if (!s.ok()) {
ROCKS_LOG_ERROR(rep_->ioptions.info_log,
@ -1675,8 +1676,9 @@ Status BlockBasedTable::ReadMetaIndexBlock(
*metaindex_block = std::move(metaindex);
// meta block uses bytewise comparator.
iter->reset(metaindex_block->get()->NewDataIterator(BytewiseComparator(),
BytewiseComparator()));
iter->reset(metaindex_block->get()->NewDataIterator(
BytewiseComparator(), BytewiseComparator(),
kDisableGlobalSequenceNumber));
return Status::OK();
}
@ -1750,8 +1752,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
if (s.ok()) {
std::unique_ptr<TBlocklike> block_holder(
BlocklikeTraits<TBlocklike>::Create(
std::move(contents), rep_->get_global_seqno(block_type),
read_amp_bytes_per_bit, statistics,
std::move(contents), read_amp_bytes_per_bit, statistics,
rep_->blocks_definitely_zstd_compressed,
rep_->table_options.filter_policy.get())); // uncompressed block
@ -1786,7 +1787,7 @@ Status BlockBasedTable::PutDataBlockToCache(
Cache* block_cache, Cache* block_cache_compressed,
CachableEntry<TBlocklike>* cached_block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type,
const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
const UncompressionDict& uncompression_dict,
MemoryAllocator* memory_allocator, BlockType block_type,
GetContext* get_context) const {
const ImmutableCFOptions& ioptions = rep_->ioptions;
@ -1823,13 +1824,13 @@ Status BlockBasedTable::PutDataBlockToCache(
}
block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
std::move(uncompressed_block_contents), seq_no, read_amp_bytes_per_bit,
std::move(uncompressed_block_contents), read_amp_bytes_per_bit,
statistics, rep_->blocks_definitely_zstd_compressed,
rep_->table_options.filter_policy.get()));
} else {
block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
std::move(*raw_block_contents), seq_no, read_amp_bytes_per_bit,
statistics, rep_->blocks_definitely_zstd_compressed,
std::move(*raw_block_contents), read_amp_bytes_per_bit, statistics,
rep_->blocks_definitely_zstd_compressed,
rep_->table_options.filter_policy.get()));
}
@ -1987,7 +1988,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
const bool block_contents_pinned =
block.IsCached() ||
(!block.GetValue()->own_bytes() && rep_->immortal_table);
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), iter,
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
block_contents_pinned);
if (!block.IsCached()) {
@ -2033,22 +2034,24 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
template <>
DataBlockIter* BlockBasedTable::InitBlockIterator<DataBlockIter>(
const Rep* rep, Block* block, DataBlockIter* input_iter,
bool block_contents_pinned) {
const Rep* rep, Block* block, BlockType block_type,
DataBlockIter* input_iter, bool block_contents_pinned) {
return block->NewDataIterator(
&rep->internal_comparator, rep->internal_comparator.user_comparator(),
input_iter, rep->ioptions.statistics, block_contents_pinned);
rep->get_global_seqno(block_type), input_iter, rep->ioptions.statistics,
block_contents_pinned);
}
template <>
IndexBlockIter* BlockBasedTable::InitBlockIterator<IndexBlockIter>(
const Rep* rep, Block* block, IndexBlockIter* input_iter,
bool block_contents_pinned) {
const Rep* rep, Block* block, BlockType block_type,
IndexBlockIter* input_iter, bool block_contents_pinned) {
return block->NewIndexIterator(
&rep->internal_comparator, rep->internal_comparator.user_comparator(),
input_iter, rep->ioptions.statistics, /* total_order_seek */ true,
rep->index_has_first_key, rep->index_key_includes_seq,
rep->index_value_is_full, block_contents_pinned);
rep->get_global_seqno(block_type), input_iter, rep->ioptions.statistics,
/* total_order_seek */ true, rep->index_has_first_key,
rep->index_key_includes_seq, rep->index_value_is_full,
block_contents_pinned);
}
// Convert an uncompressed data block (i.e CachableEntry<Block>)
@ -2079,8 +2082,8 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
const bool block_contents_pinned =
block.IsCached() ||
(!block.GetValue()->own_bytes() && rep_->immortal_table);
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), iter,
block_contents_pinned);
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
iter, block_contents_pinned);
if (!block.IsCached()) {
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
@ -2202,12 +2205,11 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
}
if (s.ok()) {
SequenceNumber seq_no = rep_->get_global_seqno(block_type);
// If filling cache is allowed and a cache is configured, try to put the
// block to the cache.
s = PutDataBlockToCache(
key, ckey, block_cache, block_cache_compressed, block_entry,
contents, raw_block_comp_type, uncompression_dict, seq_no,
contents, raw_block_comp_type, uncompression_dict,
GetMemoryAllocator(rep_->table_options), block_type, get_context);
}
}
@ -2305,7 +2307,6 @@ void BlockBasedTable::RetrieveMultipleBlocks(
RandomAccessFileReader* file = rep_->file.get();
const Footer& footer = rep_->footer;
const ImmutableCFOptions& ioptions = rep_->ioptions;
SequenceNumber global_seqno = rep_->get_global_seqno(BlockType::kData);
size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
@ -2523,9 +2524,8 @@ void BlockBasedTable::RetrieveMultipleBlocks(
contents = std::move(raw_block_contents);
}
if (s.ok()) {
(*results)[idx_in_batch].SetOwnedValue(
new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
ioptions.statistics));
(*results)[idx_in_batch].SetOwnedValue(new Block(
std::move(contents), read_amp_bytes_per_bit, ioptions.statistics));
}
}
(*statuses)[idx_in_batch] = s;
@ -2580,7 +2580,6 @@ Status BlockBasedTable::RetrieveBlock(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
rep_->ioptions, do_uncompress, maybe_compressed, block_type,
uncompression_dict, rep_->persistent_cache_options,
rep_->get_global_seqno(block_type),
block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit
: 0,
@ -2650,8 +2649,9 @@ BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
// to set `block_contents_pinned`.
return block->second.GetValue()->NewIndexIterator(
&rep->internal_comparator, rep->internal_comparator.user_comparator(),
nullptr, kNullStats, true, rep->index_has_first_key,
rep->index_key_includes_seq, rep->index_value_is_full);
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
rep->index_has_first_key, rep->index_key_includes_seq,
rep->index_value_is_full);
}
// Create an empty iterator
return new IndexBlockIter();

View File

@ -287,6 +287,7 @@ class BlockBasedTable : public TableReader {
// Either Block::NewDataIterator() or Block::NewIndexIterator().
template <typename TBlockIter>
static TBlockIter* InitBlockIterator(const Rep* rep, Block* block,
BlockType block_type,
TBlockIter* input_iter,
bool block_contents_pinned);
@ -370,14 +371,16 @@ class BlockBasedTable : public TableReader {
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
template <typename TBlocklike>
Status PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
CachableEntry<TBlocklike>* cached_block,
BlockContents* raw_block_contents, CompressionType raw_block_comp_type,
const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
MemoryAllocator* memory_allocator, BlockType block_type,
GetContext* get_context) const;
Status PutDataBlockToCache(const Slice& block_cache_key,
const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
CachableEntry<TBlocklike>* cached_block,
BlockContents* raw_block_contents,
CompressionType raw_block_comp_type,
const UncompressionDict& uncompression_dict,
MemoryAllocator* memory_allocator,
BlockType block_type,
GetContext* get_context) const;
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.

View File

@ -93,12 +93,12 @@ TEST_F(BlockTest, SimpleTest) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
// read contents of block sequentially
int count = 0;
InternalIterator *iter =
reader.NewDataIterator(options.comparator, options.comparator);
InternalIterator *iter = reader.NewDataIterator(
options.comparator, options.comparator, kDisableGlobalSequenceNumber);
for (iter->SeekToFirst(); iter->Valid(); count++, iter->Next()) {
// read kv from block
Slice k = iter->key();
@ -111,7 +111,8 @@ TEST_F(BlockTest, SimpleTest) {
delete iter;
// read block contents randomly
iter = reader.NewDataIterator(options.comparator, options.comparator);
iter = reader.NewDataIterator(options.comparator, options.comparator,
kDisableGlobalSequenceNumber);
for (int i = 0; i < num_records; i++) {
// find a random key in the lookaside array
int index = rnd.Uniform(num_records);
@ -151,14 +152,15 @@ void CheckBlockContents(BlockContents contents, const int max_key,
const size_t prefix_size = 6;
// create block reader
BlockContents contents_ref(contents.data);
Block reader1(std::move(contents), kDisableGlobalSequenceNumber);
Block reader2(std::move(contents_ref), kDisableGlobalSequenceNumber);
Block reader1(std::move(contents));
Block reader2(std::move(contents_ref));
std::unique_ptr<const SliceTransform> prefix_extractor(
NewFixedPrefixTransform(prefix_size));
std::unique_ptr<InternalIterator> regular_iter(
reader2.NewDataIterator(BytewiseComparator(), BytewiseComparator()));
reader2.NewDataIterator(BytewiseComparator(), BytewiseComparator(),
kDisableGlobalSequenceNumber));
// Seek existent keys
for (size_t i = 0; i < keys.size(); i++) {
@ -375,13 +377,13 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
Block reader(std::move(contents), kBytesPerBit, stats.get());
// read contents of block sequentially
size_t read_bytes = 0;
DataBlockIter *iter = reader.NewDataIterator(
options.comparator, options.comparator, nullptr, stats.get());
options.comparator, options.comparator, kDisableGlobalSequenceNumber,
nullptr, stats.get());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
iter->value();
read_bytes += iter->TEST_CurrentEntrySize();
@ -408,12 +410,12 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
Block reader(std::move(contents), kBytesPerBit, stats.get());
size_t read_bytes = 0;
DataBlockIter *iter = reader.NewDataIterator(
options.comparator, options.comparator, nullptr, stats.get());
options.comparator, options.comparator, kDisableGlobalSequenceNumber,
nullptr, stats.get());
for (int i = 0; i < num_records; i++) {
Slice k(keys[i]);
@ -443,12 +445,12 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
Block reader(std::move(contents), kBytesPerBit, stats.get());
size_t read_bytes = 0;
DataBlockIter *iter = reader.NewDataIterator(
options.comparator, options.comparator, nullptr, stats.get());
options.comparator, options.comparator, kDisableGlobalSequenceNumber,
nullptr, stats.get());
std::unordered_set<int> read_keys;
for (int i = 0; i < num_records; i++) {
int index = rnd.Uniform(num_records);
@ -563,7 +565,7 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
const bool kTotalOrderSeek = true;
const bool kIncludesSeq = true;
@ -572,8 +574,9 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
Statistics *kNullStats = nullptr;
// read contents of block sequentially
InternalIteratorBase<IndexValue> *iter = reader.NewIndexIterator(
options.comparator, options.comparator, kNullIter, kNullStats,
kTotalOrderSeek, includeFirstKey(), kIncludesSeq, kValueIsFull);
options.comparator, options.comparator, kDisableGlobalSequenceNumber,
kNullIter, kNullStats, kTotalOrderSeek, includeFirstKey(), kIncludesSeq,
kValueIsFull);
iter->SeekToFirst();
for (int index = 0; index < num_records; ++index) {
ASSERT_TRUE(iter->Valid());
@ -593,8 +596,9 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
// read block contents randomly
iter = reader.NewIndexIterator(options.comparator, options.comparator,
kNullIter, kNullStats, kTotalOrderSeek,
includeFirstKey(), kIncludesSeq, kValueIsFull);
kDisableGlobalSequenceNumber, kNullIter,
kNullStats, kTotalOrderSeek, includeFirstKey(),
kIncludesSeq, kValueIsFull);
for (int i = 0; i < num_records * 2; i++) {
// find a random key in the lookaside array
int index = rnd.Uniform(num_records);

View File

@ -284,7 +284,7 @@ TEST(DataBlockHashIndex, BlockRestartIndexExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
ASSERT_EQ(reader.IndexType(),
BlockBasedTableOptions::kDataBlockBinaryAndHash);
@ -306,7 +306,7 @@ TEST(DataBlockHashIndex, BlockRestartIndexExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
ASSERT_EQ(reader.IndexType(),
BlockBasedTableOptions::kDataBlockBinarySearch);
@ -337,7 +337,7 @@ TEST(DataBlockHashIndex, BlockSizeExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
ASSERT_EQ(reader.IndexType(),
BlockBasedTableOptions::kDataBlockBinaryAndHash);
@ -361,7 +361,7 @@ TEST(DataBlockHashIndex, BlockSizeExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
// the index type have fallen back to binary when build finish.
ASSERT_EQ(reader.IndexType(),
@ -388,10 +388,11 @@ TEST(DataBlockHashIndex, BlockTestSingleKey) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
const InternalKeyComparator icmp(BytewiseComparator());
auto iter = reader.NewDataIterator(&icmp, icmp.user_comparator());
auto iter = reader.NewDataIterator(&icmp, icmp.user_comparator(),
kDisableGlobalSequenceNumber);
bool may_exist;
// search in block for the key just inserted
{
@ -469,12 +470,13 @@ TEST(DataBlockHashIndex, BlockTestLarge) {
// create block reader
BlockContents contents;
contents.data = rawblock;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
Block reader(std::move(contents));
const InternalKeyComparator icmp(BytewiseComparator());
// random seek existent keys
for (int i = 0; i < num_records; i++) {
auto iter = reader.NewDataIterator(&icmp, icmp.user_comparator());
auto iter = reader.NewDataIterator(&icmp, icmp.user_comparator(),
kDisableGlobalSequenceNumber);
// find a random key in the lookaside array
int index = rnd.Uniform(num_records);
std::string ukey(keys[index] + "1" /* existing key marker */);
@ -511,7 +513,8 @@ TEST(DataBlockHashIndex, BlockTestLarge) {
// C true false
for (int i = 0; i < num_records; i++) {
auto iter = reader.NewDataIterator(&icmp, icmp.user_comparator());
auto iter = reader.NewDataIterator(&icmp, icmp.user_comparator(),
kDisableGlobalSequenceNumber);
// find a random key in the lookaside array
int index = rnd.Uniform(num_records);
std::string ukey(keys[index] + "0" /* non-existing key marker */);

View File

@ -194,8 +194,9 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
const InternalKeyComparator* const comparator = internal_comparator();
Statistics* kNullStats = nullptr;
filter_block.GetValue()->NewIndexIterator(
comparator, comparator->user_comparator(), &iter, kNullStats,
true /* total_order_seek */, false /* have_first_key */,
comparator, comparator->user_comparator(),
table()->get_rep()->get_global_seqno(BlockType::kFilter), &iter,
kNullStats, true /* total_order_seek */, false /* have_first_key */,
index_key_includes_seq(), index_value_is_full());
iter.Seek(entry);
if (UNLIKELY(!iter.Valid())) {
@ -319,7 +320,8 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
const InternalKeyComparator* const comparator = internal_comparator();
Statistics* kNullStats = nullptr;
filter_block.GetValue()->NewIndexIterator(
comparator, comparator->user_comparator(), &biter, kNullStats,
comparator, comparator->user_comparator(),
rep->get_global_seqno(BlockType::kFilter), &biter, kNullStats,
true /* total_order_seek */, false /* have_first_key */,
index_key_includes_seq(), index_value_is_full());
// Index partitions are assumed to be consecuitive. Prefetch them all.

View File

@ -151,8 +151,7 @@ class PartitionedFilterBlockTest
pib));
BlockContents contents(slice);
CachableEntry<Block> block(
new Block(std::move(contents), kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, nullptr),
new Block(std::move(contents), 0 /* read_amp_bytes_per_bit */, nullptr),
nullptr /* cache */, nullptr /* cache_handle */, true /* own_value */);
auto reader =
new MyPartitionedFilterBlockReader(table_.get(), std::move(block));

View File

@ -226,11 +226,10 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
return s;
}
Block properties_block(std::move(block_contents),
kDisableGlobalSequenceNumber);
Block properties_block(std::move(block_contents));
DataBlockIter iter;
properties_block.NewDataIterator(BytewiseComparator(), BytewiseComparator(),
&iter);
kDisableGlobalSequenceNumber, &iter);
auto new_table_properties = new TableProperties();
// All pre-defined properties of type uint64_t
@ -385,10 +384,10 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
}
// property blocks are never compressed. Need to add uncompress logic if we
// are to compress it.
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<InternalIterator> meta_iter(metaindex_block.NewDataIterator(
BytewiseComparator(), BytewiseComparator()));
BytewiseComparator(), BytewiseComparator(),
kDisableGlobalSequenceNumber));
// -- Read property block
bool found_properties_block = true;
@ -455,12 +454,12 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
}
// meta blocks are never compressed. Need to add uncompress logic if we are to
// compress it.
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<InternalIterator> meta_iter;
meta_iter.reset(metaindex_block.NewDataIterator(BytewiseComparator(),
BytewiseComparator()));
meta_iter.reset(metaindex_block.NewDataIterator(
BytewiseComparator(), BytewiseComparator(),
kDisableGlobalSequenceNumber));
return FindMetaBlock(meta_iter.get(), meta_block_name, block_handle);
}
@ -500,12 +499,12 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
// compress it.
// Finding metablock
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<InternalIterator> meta_iter;
meta_iter.reset(metaindex_block.NewDataIterator(BytewiseComparator(),
BytewiseComparator()));
meta_iter.reset(metaindex_block.NewDataIterator(
BytewiseComparator(), BytewiseComparator(),
kDisableGlobalSequenceNumber));
BlockHandle block_handle;
status = FindMetaBlock(meta_iter.get(), meta_block_name, &block_handle);

View File

@ -235,12 +235,13 @@ class BlockConstructor: public Constructor {
data_ = builder.Finish().ToString();
BlockContents contents;
contents.data = data_;
block_ = new Block(std::move(contents), kDisableGlobalSequenceNumber);
block_ = new Block(std::move(contents));
return Status::OK();
}
InternalIterator* NewIterator(
const SliceTransform* /*prefix_extractor*/) const override {
return block_->NewDataIterator(comparator_, comparator_);
return block_->NewDataIterator(comparator_, comparator_,
kDisableGlobalSequenceNumber);
}
private:
@ -4297,11 +4298,11 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
BlockFetchHelper(metaindex_handle, BlockType::kMetaIndex,
&metaindex_contents);
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<InternalIterator> meta_iter(metaindex_block.NewDataIterator(
BytewiseComparator(), BytewiseComparator()));
BytewiseComparator(), BytewiseComparator(),
kDisableGlobalSequenceNumber));
bool found_properties_block = true;
ASSERT_OK(SeekToPropertiesBlock(meta_iter.get(), &found_properties_block));
ASSERT_TRUE(found_properties_block);
@ -4314,8 +4315,7 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
BlockFetchHelper(properties_handle, BlockType::kProperties,
&properties_contents);
Block properties_block(std::move(properties_contents),
kDisableGlobalSequenceNumber);
Block properties_block(std::move(properties_contents));
ASSERT_EQ(properties_block.NumRestarts(), 1u);
}
@ -4376,12 +4376,12 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
UncompressionDict::GetEmptyDict(), pcache_opts,
nullptr /*memory_allocator*/);
ASSERT_OK(block_fetcher.ReadBlockContents());
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
Block metaindex_block(std::move(metaindex_contents));
// verify properties block comes last
std::unique_ptr<InternalIterator> metaindex_iter{
metaindex_block.NewDataIterator(options.comparator, options.comparator)};
metaindex_block.NewDataIterator(options.comparator, options.comparator,
kDisableGlobalSequenceNumber)};
uint64_t max_offset = 0;
std::string key_at_max_offset;
for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid();