Add NewMetaDataIterator method (#8692)
Summary: Fixes a problem where the iterator for metadata was being treated as a non-user key when in fact it was a user key. This led to a problem where the property keys could not be searched for correctly. The main exposure of this problem was that the HashIndexReader could not get the "prefixes" property correctly, resulting in the failure of retrieval/creation of the BlockPrefixIndex. Added BlockBasedTableTest.SeekMetaBlocks test to validate this condition. Fixing this condition exposed two other tests (SeekWithPrefixLongerThanKey, MultiGetPrefixFilter) that passed incorrectly previously and now failed. Updated those two tests to pass. Not sure if the tests are functionally correct/still appropriate, but made them pass... Pull Request resolved: https://github.com/facebook/rocksdb/pull/8692 Reviewed By: riversand963 Differential Revision: D33119539 Pulled By: mrambacher fbshipit-source-id: 658969fe9265f73dc184dab97cc3f4eaed2d881a
This commit is contained in:
parent
7ae213f735
commit
9a116ab4b4
@ -3443,6 +3443,27 @@ TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
|
||||
ASSERT_EQ("v1", Get("k1"));
|
||||
ASSERT_EQ("v2", Get("k2"));
|
||||
}
|
||||
TEST_F(DBTest, BlockBasedTablePrefixHashIndexTest) {
|
||||
// create a DB with block prefix index
|
||||
BlockBasedTableOptions table_options;
|
||||
Options options = CurrentOptions();
|
||||
table_options.index_type = BlockBasedTableOptions::kHashSearch;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
options.prefix_extractor.reset(NewCappedPrefixTransform(2));
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_OK(Put("kk1", "v1"));
|
||||
ASSERT_OK(Put("kk2", "v2"));
|
||||
ASSERT_OK(Put("kk", "v3"));
|
||||
ASSERT_OK(Put("k", "v4"));
|
||||
Flush();
|
||||
|
||||
ASSERT_EQ("v1", Get("kk1"));
|
||||
ASSERT_EQ("v2", Get("kk2"));
|
||||
|
||||
ASSERT_EQ("v3", Get("kk"));
|
||||
ASSERT_EQ("v4", Get("k"));
|
||||
}
|
||||
|
||||
TEST_F(DBTest, BlockBasedTablePrefixIndexTotalOrderSeek) {
|
||||
// create a DB with block prefix index
|
||||
|
@ -772,11 +772,13 @@ TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithPrefixLessThanKey) {
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithPrefixLongerThanKey) {
|
||||
TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithCappedPrefix) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(20));
|
||||
// All of the keys or this test must be longer than 3 characters
|
||||
constexpr int kMinKeyLen = 3;
|
||||
options.prefix_extractor.reset(NewCappedPrefixTransform(kMinKeyLen));
|
||||
options.memtable_whole_key_filtering = true;
|
||||
options.memtable_prefix_bloom_size_ratio = 0.1;
|
||||
BlockBasedTableOptions bbto;
|
||||
@ -1494,7 +1496,7 @@ TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetPrefixFilter) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.prefix_extractor.reset(NewCappedPrefixTransform(5));
|
||||
options.prefix_extractor.reset(NewCappedPrefixTransform(3));
|
||||
BlockBasedTableOptions bbto;
|
||||
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
||||
bbto.cache_index_and_filter_blocks = true;
|
||||
|
@ -126,10 +126,24 @@ struct DecodeKeyV4 {
|
||||
}
|
||||
};
|
||||
|
||||
void DataBlockIter::NextImpl() { ParseNextDataKey<DecodeEntry>(); }
|
||||
struct DecodeEntryV4 {
|
||||
inline const char* operator()(const char* p, const char* limit,
|
||||
uint32_t* shared, uint32_t* non_shared,
|
||||
uint32_t* value_length) {
|
||||
assert(value_length);
|
||||
|
||||
void DataBlockIter::NextOrReportImpl() {
|
||||
ParseNextDataKey<CheckAndDecodeEntry>();
|
||||
*value_length = 0;
|
||||
return DecodeKeyV4()(p, limit, shared, non_shared);
|
||||
}
|
||||
};
|
||||
void DataBlockIter::NextImpl() {
|
||||
bool is_shared = false;
|
||||
ParseNextDataKey(&is_shared);
|
||||
}
|
||||
|
||||
void MetaBlockIter::NextImpl() {
|
||||
bool is_shared = false;
|
||||
ParseNextKey<CheckAndDecodeEntry>(&is_shared);
|
||||
}
|
||||
|
||||
void IndexBlockIter::NextImpl() { ParseNextIndexKey(); }
|
||||
@ -153,6 +167,27 @@ void IndexBlockIter::PrevImpl() {
|
||||
}
|
||||
}
|
||||
|
||||
void MetaBlockIter::PrevImpl() {
|
||||
assert(Valid());
|
||||
// Scan backwards to a restart point before current_
|
||||
const uint32_t original = current_;
|
||||
while (GetRestartPoint(restart_index_) >= original) {
|
||||
if (restart_index_ == 0) {
|
||||
// No more entries
|
||||
current_ = restarts_;
|
||||
restart_index_ = num_restarts_;
|
||||
return;
|
||||
}
|
||||
restart_index_--;
|
||||
}
|
||||
SeekToRestartPoint(restart_index_);
|
||||
bool is_shared = false;
|
||||
// Loop until end of current entry hits the start of original entry
|
||||
while (ParseNextKey<CheckAndDecodeEntry>(&is_shared) &&
|
||||
NextEntryOffset() < original) {
|
||||
}
|
||||
}
|
||||
|
||||
// Similar to IndexBlockIter::PrevImpl but also caches the prev entries
|
||||
void DataBlockIter::PrevImpl() {
|
||||
assert(Valid());
|
||||
@ -212,7 +247,8 @@ void DataBlockIter::PrevImpl() {
|
||||
SeekToRestartPoint(restart_index_);
|
||||
|
||||
do {
|
||||
if (!ParseNextDataKey<DecodeEntry>()) {
|
||||
bool is_shared = false;
|
||||
if (!ParseNextDataKey(&is_shared)) {
|
||||
break;
|
||||
}
|
||||
Slice current_key = raw_key_.GetKey();
|
||||
@ -250,6 +286,22 @@ void DataBlockIter::SeekImpl(const Slice& target) {
|
||||
FindKeyAfterBinarySeek(seek_key, index, skip_linear_scan);
|
||||
}
|
||||
|
||||
void MetaBlockIter::SeekImpl(const Slice& target) {
|
||||
Slice seek_key = target;
|
||||
PERF_TIMER_GUARD(block_seek_nanos);
|
||||
if (data_ == nullptr) { // Not init yet
|
||||
return;
|
||||
}
|
||||
uint32_t index = 0;
|
||||
bool skip_linear_scan = false;
|
||||
bool ok = BinarySeek<DecodeKey>(seek_key, &index, &skip_linear_scan);
|
||||
|
||||
if (!ok) {
|
||||
return;
|
||||
}
|
||||
FindKeyAfterBinarySeek(seek_key, index, skip_linear_scan);
|
||||
}
|
||||
|
||||
// Optimized Seek for point lookup for an internal key `target`
|
||||
// target = "seek_user_key @ type | seqno".
|
||||
//
|
||||
@ -309,23 +361,21 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
|
||||
// check if the key is in the restart_interval
|
||||
assert(restart_index < num_restarts_);
|
||||
SeekToRestartPoint(restart_index);
|
||||
current_ = GetRestartPoint(restart_index);
|
||||
|
||||
const char* limit = nullptr;
|
||||
if (restart_index_ + 1 < num_restarts_) {
|
||||
limit = data_ + GetRestartPoint(restart_index_ + 1);
|
||||
} else {
|
||||
limit = data_ + restarts_;
|
||||
uint32_t limit = restarts_;
|
||||
if (restart_index + 1 < num_restarts_) {
|
||||
limit = GetRestartPoint(restart_index + 1);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
while (current_ < limit) {
|
||||
bool shared;
|
||||
// Here we only linear seek the target key inside the restart interval.
|
||||
// If a key does not exist inside a restart interval, we avoid
|
||||
// further searching the block content accross restart interval boundary.
|
||||
// further searching the block content across restart interval boundary.
|
||||
//
|
||||
// TODO(fwu): check the left and write boundary of the restart interval
|
||||
// TODO(fwu): check the left and right boundary of the restart interval
|
||||
// to avoid linear seek a target key that is out of range.
|
||||
if (!ParseNextDataKey<DecodeEntry>(limit) ||
|
||||
CompareCurrentKey(target) >= 0) {
|
||||
if (!ParseNextDataKey(&shared) || CompareCurrentKey(target) >= 0) {
|
||||
// we stop at the first potential matching user key.
|
||||
break;
|
||||
}
|
||||
@ -336,7 +386,7 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
|
||||
// 1) there is only one user_key match in the block (otherwise collsion).
|
||||
// the matching user_key resides in the last restart interval, and it
|
||||
// is the last key of the restart interval and of the block as well.
|
||||
// ParseNextDataKey() skiped it as its [ type | seqno ] is smaller.
|
||||
// ParseNextKey() skiped it as its [ type | seqno ] is smaller.
|
||||
//
|
||||
// 2) The seek_key is not found in the HashIndex Lookup(), i.e. kNoEntry,
|
||||
// AND all existing user_keys in the restart interval are smaller than
|
||||
@ -432,20 +482,46 @@ void DataBlockIter::SeekForPrevImpl(const Slice& target) {
|
||||
}
|
||||
}
|
||||
|
||||
void MetaBlockIter::SeekForPrevImpl(const Slice& target) {
|
||||
PERF_TIMER_GUARD(block_seek_nanos);
|
||||
Slice seek_key = target;
|
||||
if (data_ == nullptr) { // Not init yet
|
||||
return;
|
||||
}
|
||||
uint32_t index = 0;
|
||||
bool skip_linear_scan = false;
|
||||
bool ok = BinarySeek<DecodeKey>(seek_key, &index, &skip_linear_scan);
|
||||
|
||||
if (!ok) {
|
||||
return;
|
||||
}
|
||||
FindKeyAfterBinarySeek(seek_key, index, skip_linear_scan);
|
||||
|
||||
if (!Valid()) {
|
||||
SeekToLastImpl();
|
||||
} else {
|
||||
while (Valid() && CompareCurrentKey(seek_key) > 0) {
|
||||
PrevImpl();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void DataBlockIter::SeekToFirstImpl() {
|
||||
if (data_ == nullptr) { // Not init yet
|
||||
return;
|
||||
}
|
||||
SeekToRestartPoint(0);
|
||||
ParseNextDataKey<DecodeEntry>();
|
||||
bool is_shared = false;
|
||||
ParseNextDataKey(&is_shared);
|
||||
}
|
||||
|
||||
void DataBlockIter::SeekToFirstOrReportImpl() {
|
||||
void MetaBlockIter::SeekToFirstImpl() {
|
||||
if (data_ == nullptr) { // Not init yet
|
||||
return;
|
||||
}
|
||||
SeekToRestartPoint(0);
|
||||
ParseNextDataKey<CheckAndDecodeEntry>();
|
||||
bool is_shared = false;
|
||||
ParseNextKey<CheckAndDecodeEntry>(&is_shared);
|
||||
}
|
||||
|
||||
void IndexBlockIter::SeekToFirstImpl() {
|
||||
@ -462,7 +538,20 @@ void DataBlockIter::SeekToLastImpl() {
|
||||
return;
|
||||
}
|
||||
SeekToRestartPoint(num_restarts_ - 1);
|
||||
while (ParseNextDataKey<DecodeEntry>() && NextEntryOffset() < restarts_) {
|
||||
bool is_shared = false;
|
||||
while (ParseNextDataKey(&is_shared) && NextEntryOffset() < restarts_) {
|
||||
// Keep skipping
|
||||
}
|
||||
}
|
||||
|
||||
void MetaBlockIter::SeekToLastImpl() {
|
||||
if (data_ == nullptr) { // Not init yet
|
||||
return;
|
||||
}
|
||||
SeekToRestartPoint(num_restarts_ - 1);
|
||||
bool is_shared = false;
|
||||
while (ParseNextKey<CheckAndDecodeEntry>(&is_shared) &&
|
||||
NextEntryOffset() < restarts_) {
|
||||
// Keep skipping
|
||||
}
|
||||
}
|
||||
@ -487,13 +576,12 @@ void BlockIter<TValue>::CorruptionError() {
|
||||
value_.clear();
|
||||
}
|
||||
|
||||
template <class TValue>
|
||||
template <typename DecodeEntryFunc>
|
||||
bool DataBlockIter::ParseNextDataKey(const char* limit) {
|
||||
bool BlockIter<TValue>::ParseNextKey(bool* is_shared) {
|
||||
current_ = NextEntryOffset();
|
||||
const char* p = data_ + current_;
|
||||
if (!limit) {
|
||||
limit = data_ + restarts_; // Restarts come right after data
|
||||
}
|
||||
const char* limit = data_ + restarts_; // Restarts come right after data
|
||||
|
||||
if (p >= limit) {
|
||||
// No more entries to return. Mark as invalid.
|
||||
@ -501,7 +589,6 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
|
||||
restart_index_ = num_restarts_;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Decode next entry
|
||||
uint32_t shared, non_shared, value_length;
|
||||
p = DecodeEntryFunc()(p, limit, &shared, &non_shared, &value_length);
|
||||
@ -510,14 +597,30 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
|
||||
return false;
|
||||
} else {
|
||||
if (shared == 0) {
|
||||
*is_shared = false;
|
||||
// If this key doesn't share any bytes with prev key then we don't need
|
||||
// to decode it and can use its address in the block directly.
|
||||
raw_key_.SetKey(Slice(p, non_shared), false /* copy */);
|
||||
} else {
|
||||
// This key share `shared` bytes with prev key, we need to decode it
|
||||
*is_shared = true;
|
||||
raw_key_.TrimAppend(shared, p, non_shared);
|
||||
}
|
||||
value_ = Slice(p + non_shared, value_length);
|
||||
if (shared == 0) {
|
||||
while (restart_index_ + 1 < num_restarts_ &&
|
||||
GetRestartPoint(restart_index_ + 1) < current_) {
|
||||
++restart_index_;
|
||||
}
|
||||
}
|
||||
// else we are in the middle of a restart interval and the restart_index_
|
||||
// thus has not changed
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
bool DataBlockIter::ParseNextDataKey(bool* is_shared) {
|
||||
if (ParseNextKey<DecodeEntry>(is_shared)) {
|
||||
#ifndef NDEBUG
|
||||
if (global_seqno_ != kDisableGlobalSequenceNumber) {
|
||||
// If we are reading a file with a global sequence number we should
|
||||
@ -536,64 +639,22 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
|
||||
assert(seqno == 0);
|
||||
}
|
||||
#endif // NDEBUG
|
||||
|
||||
value_ = Slice(p + non_shared, value_length);
|
||||
if (shared == 0) {
|
||||
while (restart_index_ + 1 < num_restarts_ &&
|
||||
GetRestartPoint(restart_index_ + 1) < current_) {
|
||||
++restart_index_;
|
||||
}
|
||||
}
|
||||
// else we are in the middle of a restart interval and the restart_index_
|
||||
// thus has not changed
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool IndexBlockIter::ParseNextIndexKey() {
|
||||
current_ = NextEntryOffset();
|
||||
const char* p = data_ + current_;
|
||||
const char* limit = data_ + restarts_; // Restarts come right after data
|
||||
if (p >= limit) {
|
||||
// No more entries to return. Mark as invalid.
|
||||
current_ = restarts_;
|
||||
restart_index_ = num_restarts_;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Decode next entry
|
||||
uint32_t shared, non_shared, value_length;
|
||||
if (value_delta_encoded_) {
|
||||
p = DecodeKeyV4()(p, limit, &shared, &non_shared);
|
||||
value_length = 0;
|
||||
} else {
|
||||
p = DecodeEntry()(p, limit, &shared, &non_shared, &value_length);
|
||||
}
|
||||
if (p == nullptr || raw_key_.Size() < shared) {
|
||||
CorruptionError();
|
||||
return false;
|
||||
}
|
||||
if (shared == 0) {
|
||||
// If this key doesn't share any bytes with prev key then we don't need
|
||||
// to decode it and can use its address in the block directly.
|
||||
raw_key_.SetKey(Slice(p, non_shared), false /* copy */);
|
||||
} else {
|
||||
// This key share `shared` bytes with prev key, we need to decode it
|
||||
raw_key_.TrimAppend(shared, p, non_shared);
|
||||
}
|
||||
value_ = Slice(p + non_shared, value_length);
|
||||
if (shared == 0) {
|
||||
while (restart_index_ + 1 < num_restarts_ &&
|
||||
GetRestartPoint(restart_index_ + 1) < current_) {
|
||||
++restart_index_;
|
||||
bool is_shared = false;
|
||||
bool ok = (value_delta_encoded_) ? ParseNextKey<DecodeEntryV4>(&is_shared)
|
||||
: ParseNextKey<DecodeEntry>(&is_shared);
|
||||
if (ok) {
|
||||
if (value_delta_encoded_ || global_seqno_state_ != nullptr) {
|
||||
DecodeCurrentValue(is_shared);
|
||||
}
|
||||
}
|
||||
// else we are in the middle of a restart interval and the restart_index_
|
||||
// thus has not changed
|
||||
if (value_delta_encoded_ || global_seqno_state_ != nullptr) {
|
||||
DecodeCurrentValue(shared);
|
||||
}
|
||||
return true;
|
||||
return ok;
|
||||
}
|
||||
|
||||
// The format:
|
||||
@ -604,15 +665,15 @@ bool IndexBlockIter::ParseNextIndexKey() {
|
||||
// where, k is key, v is value, and its encoding is in parenthesis.
|
||||
// The format of each key is (shared_size, non_shared_size, shared, non_shared)
|
||||
// The format of each value, i.e., block handle, is (offset, size) whenever the
|
||||
// shared_size is 0, which included the first entry in each restart point.
|
||||
// is_shared is false, which included the first entry in each restart point.
|
||||
// Otherwise the format is delta-size = block handle size - size of last block
|
||||
// handle.
|
||||
void IndexBlockIter::DecodeCurrentValue(uint32_t shared) {
|
||||
void IndexBlockIter::DecodeCurrentValue(bool is_shared) {
|
||||
Slice v(value_.data(), data_ + restarts_ - value_.data());
|
||||
// Delta encoding is used if `shared` != 0.
|
||||
Status decode_s __attribute__((__unused__)) = decoded_value_.DecodeFrom(
|
||||
&v, have_first_key_,
|
||||
(value_delta_encoded_ && shared) ? &decoded_value_.handle : nullptr);
|
||||
(value_delta_encoded_ && is_shared) ? &decoded_value_.handle : nullptr);
|
||||
assert(decode_s.ok());
|
||||
value_ = Slice(value_.data(), v.data() - value_.data());
|
||||
|
||||
@ -970,6 +1031,21 @@ Block::Block(BlockContents&& contents, size_t read_amp_bytes_per_bit,
|
||||
}
|
||||
}
|
||||
|
||||
MetaBlockIter* Block::NewMetaIterator(bool block_contents_pinned) {
|
||||
MetaBlockIter* iter = new MetaBlockIter();
|
||||
if (size_ < 2 * sizeof(uint32_t)) {
|
||||
iter->Invalidate(Status::Corruption("bad block contents"));
|
||||
return iter;
|
||||
} else if (num_restarts_ == 0) {
|
||||
// Empty block.
|
||||
iter->Invalidate(Status::OK());
|
||||
} else {
|
||||
iter->Initialize(data_, restart_offset_, num_restarts_,
|
||||
block_contents_pinned);
|
||||
}
|
||||
return iter;
|
||||
}
|
||||
|
||||
DataBlockIter* Block::NewDataIterator(const Comparator* raw_ucmp,
|
||||
SequenceNumber global_seqno,
|
||||
DataBlockIter* iter, Statistics* stats,
|
||||
|
@ -34,6 +34,7 @@ template <class TValue>
|
||||
class BlockIter;
|
||||
class DataBlockIter;
|
||||
class IndexBlockIter;
|
||||
class MetaBlockIter;
|
||||
class BlockPrefixIndex;
|
||||
|
||||
// BlockReadAmpBitmap is a bitmap that map the ROCKSDB_NAMESPACE::Block data
|
||||
@ -192,6 +193,21 @@ class Block {
|
||||
Statistics* stats = nullptr,
|
||||
bool block_contents_pinned = false);
|
||||
|
||||
// Returns an MetaBlockIter for iterating over blocks containing metadata
|
||||
// (like Properties blocks). Unlike data blocks, the keys for these blocks
|
||||
// do not contain sequence numbers, do not use a user-define comparator, and
|
||||
// do not track read amplification/statistics. Additionally, MetaBlocks will
|
||||
// not assert if the block is formatted improperly.
|
||||
//
|
||||
// If `block_contents_pinned` is true, the caller will guarantee that when
|
||||
// the cleanup functions are transferred from the iterator to other
|
||||
// classes, e.g. PinnableSlice, the pointer to the bytes will still be
|
||||
// valid. Either the iterator holds cache handle or ownership of some resource
|
||||
// and release them in a release function, or caller is sure that the data
|
||||
// will not go away (for example, it's from mmapped file which will not be
|
||||
// closed).
|
||||
MetaBlockIter* NewMetaIterator(bool block_contents_pinned = false);
|
||||
|
||||
// raw_ucmp is a raw (i.e., not wrapped by `UserComparatorWrapper`) user key
|
||||
// comparator.
|
||||
//
|
||||
@ -269,10 +285,9 @@ class BlockIter : public InternalIteratorBase<TValue> {
|
||||
|
||||
// Makes Valid() return false, status() return `s`, and Seek()/Prev()/etc do
|
||||
// nothing. Calls cleanup functions.
|
||||
void InvalidateBase(Status s) {
|
||||
virtual void Invalidate(const Status& s) {
|
||||
// Assert that the BlockIter is never deleted while Pinning is Enabled.
|
||||
assert(!pinned_iters_mgr_ ||
|
||||
(pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
|
||||
assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled());
|
||||
|
||||
data_ = nullptr;
|
||||
current_ = restarts_;
|
||||
@ -384,8 +399,12 @@ class BlockIter : public InternalIteratorBase<TValue> {
|
||||
virtual void SeekImpl(const Slice& target) = 0;
|
||||
virtual void SeekForPrevImpl(const Slice& target) = 0;
|
||||
virtual void NextImpl() = 0;
|
||||
|
||||
virtual void PrevImpl() = 0;
|
||||
|
||||
template <typename DecodeEntryFunc>
|
||||
inline bool ParseNextKey(bool* is_shared);
|
||||
|
||||
InternalKeyComparator icmp() {
|
||||
return InternalKeyComparator(raw_ucmp_, false /* named */);
|
||||
}
|
||||
@ -519,24 +538,8 @@ class DataBlockIter final : public BlockIter<Slice> {
|
||||
return res;
|
||||
}
|
||||
|
||||
// Try to advance to the next entry in the block. If there is data corruption
|
||||
// or error, report it to the caller instead of aborting the process. May
|
||||
// incur higher CPU overhead because we need to perform check on every entry.
|
||||
void NextOrReport() {
|
||||
NextOrReportImpl();
|
||||
UpdateKey();
|
||||
}
|
||||
|
||||
// Try to seek to the first entry in the block. If there is data corruption
|
||||
// or error, report it to caller instead of aborting the process. May incur
|
||||
// higher CPU overhead because we need to perform check on every entry.
|
||||
void SeekToFirstOrReport() {
|
||||
SeekToFirstOrReportImpl();
|
||||
UpdateKey();
|
||||
}
|
||||
|
||||
void Invalidate(Status s) {
|
||||
InvalidateBase(s);
|
||||
void Invalidate(const Status& s) override {
|
||||
BlockIter::Invalidate(s);
|
||||
// Clear prev entries cache.
|
||||
prev_entries_keys_buff_.clear();
|
||||
prev_entries_.clear();
|
||||
@ -544,12 +547,14 @@ class DataBlockIter final : public BlockIter<Slice> {
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void SeekToFirstImpl() override;
|
||||
virtual void SeekToLastImpl() override;
|
||||
virtual void SeekImpl(const Slice& target) override;
|
||||
virtual void SeekForPrevImpl(const Slice& target) override;
|
||||
virtual void NextImpl() override;
|
||||
virtual void PrevImpl() override;
|
||||
friend Block;
|
||||
inline bool ParseNextDataKey(bool* is_shared);
|
||||
void SeekToFirstImpl() override;
|
||||
void SeekToLastImpl() override;
|
||||
void SeekImpl(const Slice& target) override;
|
||||
void SeekForPrevImpl(const Slice& target) override;
|
||||
void NextImpl() override;
|
||||
void PrevImpl() override;
|
||||
|
||||
private:
|
||||
// read-amp bitmap
|
||||
@ -582,12 +587,38 @@ class DataBlockIter final : public BlockIter<Slice> {
|
||||
|
||||
DataBlockHashIndex* data_block_hash_index_;
|
||||
|
||||
template <typename DecodeEntryFunc>
|
||||
inline bool ParseNextDataKey(const char* limit = nullptr);
|
||||
|
||||
bool SeekForGetImpl(const Slice& target);
|
||||
void NextOrReportImpl();
|
||||
void SeekToFirstOrReportImpl();
|
||||
};
|
||||
|
||||
// Iterator over MetaBlocks. MetaBlocks are similar to Data Blocks and
|
||||
// are used to store Properties associated with table.
|
||||
// Meta blocks always store user keys (no sequence number) and always
|
||||
// use the BytewiseComparator. Additionally, MetaBlock accesses are
|
||||
// not recorded in the Statistics or for Read-Amplification.
|
||||
class MetaBlockIter final : public BlockIter<Slice> {
|
||||
public:
|
||||
MetaBlockIter() : BlockIter() { raw_key_.SetIsUserKey(true); }
|
||||
void Initialize(const char* data, uint32_t restarts, uint32_t num_restarts,
|
||||
bool block_contents_pinned) {
|
||||
// Initializes the iterator with a BytewiseComparator and
|
||||
// the raw key being a user key.
|
||||
InitializeBase(BytewiseComparator(), data, restarts, num_restarts,
|
||||
kDisableGlobalSequenceNumber, block_contents_pinned);
|
||||
raw_key_.SetIsUserKey(true);
|
||||
}
|
||||
|
||||
Slice value() const override {
|
||||
assert(Valid());
|
||||
return value_;
|
||||
}
|
||||
|
||||
protected:
|
||||
void SeekToFirstImpl() override;
|
||||
void SeekToLastImpl() override;
|
||||
void SeekImpl(const Slice& target) override;
|
||||
void SeekForPrevImpl(const Slice& target) override;
|
||||
void NextImpl() override;
|
||||
void PrevImpl() override;
|
||||
};
|
||||
|
||||
class IndexBlockIter final : public BlockIter<IndexValue> {
|
||||
@ -635,8 +666,6 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
|
||||
}
|
||||
}
|
||||
|
||||
void Invalidate(Status s) { InvalidateBase(s); }
|
||||
|
||||
bool IsValuePinned() const override {
|
||||
return global_seqno_state_ != nullptr ? false : BlockIter::IsValuePinned();
|
||||
}
|
||||
@ -713,7 +742,7 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
|
||||
|
||||
// When value_delta_encoded_ is enabled it decodes the value which is assumed
|
||||
// to be BlockHandle and put it to decoded_value_
|
||||
inline void DecodeCurrentValue(uint32_t shared);
|
||||
inline void DecodeCurrentValue(bool is_shared);
|
||||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -1112,8 +1112,7 @@ Status BlockBasedTable::ReadMetaIndexBlock(
|
||||
|
||||
*metaindex_block = std::move(metaindex);
|
||||
// meta block uses bytewise comparator.
|
||||
iter->reset(metaindex_block->get()->NewDataIterator(
|
||||
BytewiseComparator(), kDisableGlobalSequenceNumber));
|
||||
iter->reset(metaindex_block->get()->NewMetaIterator());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -255,9 +255,7 @@ Status ReadTablePropertiesHelper(
|
||||
// and Block hides block_contents
|
||||
uint64_t block_size = block_contents.data.size();
|
||||
Block properties_block(std::move(block_contents));
|
||||
DataBlockIter iter;
|
||||
properties_block.NewDataIterator(BytewiseComparator(),
|
||||
kDisableGlobalSequenceNumber, &iter);
|
||||
std::unique_ptr<MetaBlockIter> iter(properties_block.NewMetaIterator());
|
||||
|
||||
std::unique_ptr<TableProperties> new_table_properties{new TableProperties};
|
||||
// All pre-defined properties of type uint64_t
|
||||
@ -308,13 +306,13 @@ Status ReadTablePropertiesHelper(
|
||||
};
|
||||
|
||||
std::string last_key;
|
||||
for (iter.SeekToFirstOrReport(); iter.Valid(); iter.NextOrReport()) {
|
||||
s = iter.status();
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
s = iter->status();
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
|
||||
auto key = iter.key().ToString();
|
||||
auto key = iter->key().ToString();
|
||||
// properties block should be strictly sorted with no duplicate key.
|
||||
if (!last_key.empty() &&
|
||||
BytewiseComparator()->Compare(key, last_key) <= 0) {
|
||||
@ -323,12 +321,12 @@ Status ReadTablePropertiesHelper(
|
||||
}
|
||||
last_key = key;
|
||||
|
||||
auto raw_val = iter.value();
|
||||
auto raw_val = iter->value();
|
||||
auto pos = predefined_uint64_properties.find(key);
|
||||
|
||||
if (key == ExternalSstFilePropertyNames::kGlobalSeqno) {
|
||||
new_table_properties->external_sst_file_global_seqno_offset =
|
||||
handle.offset() + iter.ValueOffset();
|
||||
handle.offset() + iter->ValueOffset();
|
||||
}
|
||||
|
||||
if (pos != predefined_uint64_properties.end()) {
|
||||
@ -500,8 +498,7 @@ Status FindMetaBlockInFile(RandomAccessFileReader* file, uint64_t file_size,
|
||||
Block metaindex_block(std::move(metaindex_contents));
|
||||
|
||||
std::unique_ptr<InternalIterator> meta_iter;
|
||||
meta_iter.reset(metaindex_block.NewDataIterator(
|
||||
BytewiseComparator(), kDisableGlobalSequenceNumber));
|
||||
meta_iter.reset(metaindex_block.NewMetaIterator());
|
||||
|
||||
return FindMetaBlock(meta_iter.get(), meta_block_name, block_handle);
|
||||
}
|
||||
|
@ -4819,8 +4819,7 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
|
||||
|
||||
// verify properties block comes last
|
||||
std::unique_ptr<InternalIterator> metaindex_iter{
|
||||
metaindex_block.NewDataIterator(options.comparator,
|
||||
kDisableGlobalSequenceNumber)};
|
||||
metaindex_block.NewMetaIterator()};
|
||||
uint64_t max_offset = 0;
|
||||
std::string key_at_max_offset;
|
||||
for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid();
|
||||
@ -4840,6 +4839,90 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
|
||||
c.ResetTableReader();
|
||||
}
|
||||
|
||||
TEST_P(BlockBasedTableTest, SeekMetaBlocks) {
|
||||
TableConstructor c(BytewiseComparator(), true /* convert_to_internal_key_ */);
|
||||
c.Add("foo_a1", "val1");
|
||||
c.Add("foo_b2", "val2");
|
||||
c.Add("foo_c3", "val3");
|
||||
c.Add("foo_d4", "val4");
|
||||
c.Add("foo_e5", "val5");
|
||||
c.Add("foo_f6", "val6");
|
||||
c.Add("foo_g7", "val7");
|
||||
c.Add("foo_h8", "val8");
|
||||
c.Add("foo_j9", "val9");
|
||||
|
||||
// write an SST file
|
||||
Options options;
|
||||
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
|
||||
table_options.index_type = BlockBasedTableOptions::kHashSearch;
|
||||
table_options.filter_policy.reset(NewBloomFilterPolicy(
|
||||
8 /* bits_per_key */, false /* use_block_based_filter */));
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(4));
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
ImmutableOptions ioptions(options);
|
||||
MutableCFOptions moptions(options);
|
||||
std::vector<std::string> keys;
|
||||
stl_wrappers::KVMap kvmap;
|
||||
c.Finish(options, ioptions, moptions, table_options,
|
||||
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
|
||||
|
||||
// get file reader
|
||||
test::StringSink* table_sink = c.TEST_GetSink();
|
||||
std::unique_ptr<FSRandomAccessFile> source(new test::StringSource(
|
||||
table_sink->contents(), 0 /* unique_id */, false /* allow_mmap_reads */));
|
||||
|
||||
std::unique_ptr<RandomAccessFileReader> table_reader(
|
||||
new RandomAccessFileReader(std::move(source), "test"));
|
||||
size_t table_size = table_sink->contents().size();
|
||||
|
||||
// read footer
|
||||
Footer footer;
|
||||
IOOptions opts;
|
||||
ASSERT_OK(ReadFooterFromFile(opts, table_reader.get(),
|
||||
nullptr /* prefetch_buffer */, table_size,
|
||||
&footer, kBlockBasedTableMagicNumber));
|
||||
|
||||
// read metaindex
|
||||
auto metaindex_handle = footer.metaindex_handle();
|
||||
BlockContents metaindex_contents;
|
||||
PersistentCacheOptions pcache_opts;
|
||||
BlockFetcher block_fetcher(
|
||||
table_reader.get(), nullptr /* prefetch_buffer */, footer, ReadOptions(),
|
||||
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
|
||||
false /*maybe_compressed*/, BlockType::kMetaIndex,
|
||||
UncompressionDict::GetEmptyDict(), pcache_opts,
|
||||
nullptr /*memory_allocator*/);
|
||||
ASSERT_OK(block_fetcher.ReadBlockContents());
|
||||
Block metaindex_block(std::move(metaindex_contents));
|
||||
|
||||
// verify properties block comes last
|
||||
std::unique_ptr<MetaBlockIter> metaindex_iter(
|
||||
metaindex_block.NewMetaIterator());
|
||||
bool has_hash_prefixes = false;
|
||||
bool has_hash_metadata = false;
|
||||
for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid();
|
||||
metaindex_iter->Next()) {
|
||||
if (metaindex_iter->key().ToString() == kHashIndexPrefixesBlock) {
|
||||
has_hash_prefixes = true;
|
||||
} else if (metaindex_iter->key().ToString() ==
|
||||
kHashIndexPrefixesMetadataBlock) {
|
||||
has_hash_metadata = true;
|
||||
}
|
||||
}
|
||||
if (has_hash_metadata) {
|
||||
metaindex_iter->Seek(kHashIndexPrefixesMetadataBlock);
|
||||
ASSERT_TRUE(metaindex_iter->Valid());
|
||||
ASSERT_EQ(kHashIndexPrefixesMetadataBlock,
|
||||
metaindex_iter->key().ToString());
|
||||
}
|
||||
if (has_hash_prefixes) {
|
||||
metaindex_iter->Seek(kHashIndexPrefixesBlock);
|
||||
ASSERT_TRUE(metaindex_iter->Valid());
|
||||
ASSERT_EQ(kHashIndexPrefixesBlock, metaindex_iter->key().ToString());
|
||||
}
|
||||
c.ResetTableReader();
|
||||
}
|
||||
|
||||
TEST_P(BlockBasedTableTest, BadOptions) {
|
||||
ROCKSDB_NAMESPACE::Options options;
|
||||
options.compression = kNoCompression;
|
||||
|
Loading…
x
Reference in New Issue
Block a user