Maintain position in range deletions map
Summary: When deletion-collapsing mode is enabled (i.e., for DBIter/CompactionIterator), we maintain position in the tombstone maps across calls to ShouldDelete(). Since iterators often access keys sequentially (or reverse-sequentially), scanning forward/backward from the last position can be faster than binary-searching the map for every key.

- When Next() is invoked on an iterator, we use kForwardTraversal to scan forwards, if needed, until arriving at the range deletion containing the next key.
- Similarly for Prev(), we use kBackwardTraversal to scan backwards in the range deletion map.
- When the iterator seeks, we use kBinarySearch for repositioning.
- After tombstones are added or before the first ShouldDelete() invocation, the current position is set to invalid, which forces kBinarySearch to be used.
- Non-iterator users (i.e., Get()) use kFullScan, which has the same behavior as before: scan the whole map for every key passed to ShouldDelete().

Closes https://github.com/facebook/rocksdb/pull/1701

Differential Revision: D4350318

Pulled By: ajkr

fbshipit-source-id: 5129b76
parent 640d724808
commit b104b87814
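The positioning scheme described in the summary can be illustrated with a small standalone C++ sketch. This is not part of the patch: the class and function names below (SimpleTombstoneCache, Covered, Invalidate) are hypothetical, snapshot stripes and the kBackwardTraversal/kFullScan modes are omitted, and the collapsed tombstone map is simplified to string start keys mapping to tombstone seqnums.

// Standalone illustration only; assumes a collapsed map where each entry's key
// is an interval start, the interval extends to the next key, and the value is
// the covering tombstone's seqnum (0 means no tombstone covers that gap).
#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>
#include <string>
#include <utility>

class SimpleTombstoneCache {
 public:
  enum Mode { kForwardTraversal, kBinarySearch };

  explicit SimpleTombstoneCache(std::map<std::string, uint64_t> collapsed)
      : map_(std::move(collapsed)), pos_(map_.end()) {}

  // Returns true if a tombstone newer than `seq` covers `user_key`.
  bool Covered(const std::string& user_key, uint64_t seq, Mode mode) {
    if (map_.empty()) {
      return false;
    }
    if (pos_ == map_.end()) {
      mode = kBinarySearch;  // cached position was invalidated; reseek
    }
    if (mode == kBinarySearch) {
      auto it = map_.upper_bound(user_key);
      if (it == map_.begin()) {
        return false;  // key precedes all intervals
      }
      pos_ = std::prev(it);
    } else {  // kForwardTraversal: advance the cached position rightwards
      if (pos_ == map_.begin() && user_key < pos_->first) {
        return false;  // key precedes all intervals
      }
      while (std::next(pos_) != map_.end() &&
             std::next(pos_)->first <= user_key) {
        ++pos_;
      }
    }
    return seq < pos_->second;
  }

  // Analogous in spirit to InvalidateTombstoneMapPositions(): forget the spot.
  void Invalidate() { pos_ = map_.end(); }

 private:
  std::map<std::string, uint64_t> map_;
  std::map<std::string, uint64_t>::const_iterator pos_;
};

int main() {
  // One tombstone over ["b", "d") with seqnum 10, in collapsed form.
  SimpleTombstoneCache cache({{"b", 10}, {"d", 0}});
  assert(!cache.Covered("a", 5, SimpleTombstoneCache::kBinarySearch));
  assert(cache.Covered("b", 5, SimpleTombstoneCache::kForwardTraversal));
  assert(cache.Covered("c", 9, SimpleTombstoneCache::kForwardTraversal));
  assert(!cache.Covered("d", 5, SimpleTombstoneCache::kForwardTraversal));
  return 0;
}

As in the patch, the cached iterator is only trusted while it is valid; once it has been invalidated (here via Invalidate()), the next lookup falls back to a binary search before sequential traversal resumes.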
@@ -466,7 +466,8 @@ void CompactionIterator::NextFromInput() {
     } else {
       // 1. new user key -OR-
       // 2. different snapshot stripe
-      bool should_delete = range_del_agg_->ShouldDelete(key_);
+      bool should_delete = range_del_agg_->ShouldDelete(
+          key_, RangeDelAggregator::RangePositioningMode::kForwardTraversal);
       if (should_delete) {
         ++iter_stats_.num_record_drop_hidden;
         ++iter_stats_.num_record_drop_range_del;
@@ -412,7 +412,9 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
           saved_key_.SetKey(
               ikey.user_key,
               !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-          if (range_del_agg_.ShouldDelete(ikey)) {
+          if (range_del_agg_.ShouldDelete(
+                  ikey, RangeDelAggregator::RangePositioningMode::
+                            kForwardTraversal)) {
             // Arrange to skip all upcoming entries for this key since
             // they are hidden by this deletion.
             skipping = true;
@@ -427,7 +429,9 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
           saved_key_.SetKey(
               ikey.user_key,
               !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-          if (range_del_agg_.ShouldDelete(ikey)) {
+          if (range_del_agg_.ShouldDelete(
+                  ikey, RangeDelAggregator::RangePositioningMode::
+                            kForwardTraversal)) {
             // Arrange to skip all upcoming entries for this key since
             // they are hidden by this deletion.
             skipping = true;
@@ -530,7 +534,9 @@ void DBIter::MergeValuesNewToOld() {
       // hit the next user key, stop right here
       break;
     } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
-               range_del_agg_.ShouldDelete(ikey)) {
+               range_del_agg_.ShouldDelete(
+                   ikey, RangeDelAggregator::RangePositioningMode::
+                             kForwardTraversal)) {
       // hit a delete with the same user key, stop right here
       // iter_ is positioned after delete
       iter_->Next();
@@ -599,6 +605,7 @@ void DBIter::ReverseToForward() {
   direction_ = kForward;
   if (!iter_->Valid()) {
     iter_->SeekToFirst();
+    range_del_agg_.InvalidateTombstoneMapPositions();
   }
 }

@@ -614,6 +621,7 @@ void DBIter::ReverseToBackward() {
   // previous key.
   if (!iter_->Valid()) {
     iter_->SeekToLast();
+    range_del_agg_.InvalidateTombstoneMapPositions();
   }
   ParsedInternalKey ikey;
   FindParseableKey(&ikey, kReverse);
@@ -710,7 +718,9 @@ bool DBIter::FindValueForCurrentKey() {
     last_key_entry_type = ikey.type;
     switch (last_key_entry_type) {
       case kTypeValue:
-        if (range_del_agg_.ShouldDelete(ikey)) {
+        if (range_del_agg_.ShouldDelete(
+                ikey,
+                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
           last_key_entry_type = kTypeRangeDeletion;
           PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
         } else {
@@ -727,7 +737,9 @@ bool DBIter::FindValueForCurrentKey() {
         PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
         break;
       case kTypeMerge:
-        if (range_del_agg_.ShouldDelete(ikey)) {
+        if (range_del_agg_.ShouldDelete(
+                ikey,
+                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
           merge_context_.Clear();
           last_key_entry_type = kTypeRangeDeletion;
           last_not_merge_type = last_key_entry_type;
@@ -805,7 +817,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   FindParseableKey(&ikey, kForward);

   if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
-      range_del_agg_.ShouldDelete(ikey)) {
+      range_del_agg_.ShouldDelete(
+          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
     valid_ = false;
     return false;
   }
@@ -820,9 +833,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   // in operands
   current_entry_is_merged_ = true;
   merge_context_.Clear();
-  while (iter_->Valid() &&
-         user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
-         ikey.type == kTypeMerge && !range_del_agg_.ShouldDelete(ikey)) {
+  while (
+      iter_->Valid() &&
+      user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
+      ikey.type == kTypeMerge &&
+      !range_del_agg_.ShouldDelete(
+          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
     merge_context_.PushOperand(iter_->value(),
                                iter_->IsValuePinned() /* operand_pinned */);
     PERF_COUNTER_ADD(internal_merge_count, 1);
@@ -834,7 +850,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   if (!iter_->Valid() ||
       !user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
       ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
-      range_del_agg_.ShouldDelete(ikey)) {
+      range_del_agg_.ShouldDelete(
+          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
     s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
                                     nullptr, merge_context_.GetOperands(),
                                     &saved_value_, logger_, statistics_, env_,
@@ -929,12 +946,13 @@ void DBIter::Seek(const Slice& target) {
   StopWatch sw(env_, statistics_, DB_SEEK);
   ReleaseTempPinnedData();
   saved_key_.Clear();
-  // now savved_key is used to store internal key.
+  // now saved_key is used to store internal key.
   saved_key_.SetInternalKey(target, sequence_);

   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->Seek(saved_key_.GetKey());
+    range_del_agg_.InvalidateTombstoneMapPositions();
   }
   RecordTick(statistics_, NUMBER_DB_SEEK);
   if (iter_->Valid()) {
@@ -974,6 +992,7 @@ void DBIter::SeekForPrev(const Slice& target) {
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->SeekForPrev(saved_key_.GetKey());
+    range_del_agg_.InvalidateTombstoneMapPositions();
   }

   RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1015,6 +1034,7 @@ void DBIter::SeekToFirst() {
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->SeekToFirst();
+    range_del_agg_.InvalidateTombstoneMapPositions();
   }

   RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1048,12 +1068,14 @@ void DBIter::SeekToLast() {
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->SeekToLast();
+    range_del_agg_.InvalidateTombstoneMapPositions();
   }
   // When the iterate_upper_bound is set to a value,
   // it will seek to the last key before the
   // ReadOptions.iterate_upper_bound
   if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
     SeekForPrev(*iterate_upper_bound_);
+    range_del_agg_.InvalidateTombstoneMapPositions();
     if (!Valid()) {
       return;
     } else if (user_comparator_->Equal(*iterate_upper_bound_, key())) {
@@ -192,7 +192,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
                      ? CompactionFilter::Decision::kKeep
                      : FilterMerge(orig_ikey.user_key, value_slice);
     if (range_del_agg != nullptr &&
-        range_del_agg->ShouldDelete(iter->key()) &&
+        range_del_agg->ShouldDelete(
+            iter->key(),
+            RangeDelAggregator::RangePositioningMode::kForwardTraversal) &&
         filter != CompactionFilter::Decision::kRemoveAndSkipUntil) {
       filter = CompactionFilter::Decision::kRemove;
     }
@@ -32,16 +32,19 @@ void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) {
   for (auto snapshot : snapshots) {
     rep_->stripe_map_.emplace(
         snapshot,
-        TombstoneMap(stl_wrappers::LessOfComparator(icmp_.user_comparator())));
+        PositionalTombstoneMap(TombstoneMap(
+            stl_wrappers::LessOfComparator(icmp_.user_comparator()))));
   }
   // Data newer than any snapshot falls in this catch-all stripe
   rep_->stripe_map_.emplace(
       kMaxSequenceNumber,
-      TombstoneMap(stl_wrappers::LessOfComparator(icmp_.user_comparator())));
+      PositionalTombstoneMap(TombstoneMap(
+          stl_wrappers::LessOfComparator(icmp_.user_comparator()))));
   rep_->pinned_iters_mgr_.StartPinning();
 }

-bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) {
+bool RangeDelAggregator::ShouldDelete(
+    const Slice& internal_key, RangeDelAggregator::RangePositioningMode mode) {
   if (rep_ == nullptr) {
     return false;
   }
@@ -49,36 +52,96 @@ bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) {
   if (!ParseInternalKey(internal_key, &parsed)) {
     assert(false);
   }
-  return ShouldDelete(parsed);
+  return ShouldDelete(parsed, mode);
 }

-bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) {
+bool RangeDelAggregator::ShouldDelete(
+    const ParsedInternalKey& parsed,
+    RangeDelAggregator::RangePositioningMode mode) {
   assert(IsValueType(parsed.type));
   if (rep_ == nullptr) {
     return false;
   }
-  const auto& tombstone_map = GetTombstoneMap(parsed.sequence);
-  if (collapse_deletions_) {
-    auto iter = tombstone_map.upper_bound(parsed.user_key);
-    if (iter == tombstone_map.begin()) {
-      return false;
-    }
-    --iter;
-    return parsed.sequence < iter->second.seq_;
-  }
-  for (const auto& start_key_and_tombstone : tombstone_map) {
-    const auto& tombstone = start_key_and_tombstone.second;
-    if (icmp_.user_comparator()->Compare(parsed.user_key,
-                                         tombstone.start_key_) < 0) {
-      break;
-    }
-    if (parsed.sequence < tombstone.seq_ &&
-        icmp_.user_comparator()->Compare(parsed.user_key, tombstone.end_key_) <
-            0) {
-      return true;
-    }
-  }
-  return false;
+  auto& positional_tombstone_map = GetPositionalTombstoneMap(parsed.sequence);
+  const auto& tombstone_map = positional_tombstone_map.raw_map;
+  if (tombstone_map.empty()) {
+    return false;
+  }
+  auto& tombstone_map_iter = positional_tombstone_map.iter;
+  if (tombstone_map_iter == tombstone_map.end() &&
+      (mode == kForwardTraversal || mode == kBackwardTraversal)) {
+    // invalid (e.g., if AddTombstones() changed the deletions), so need to
+    // reseek
+    mode = kBinarySearch;
+  }
+  switch (mode) {
+    case kFullScan:
+      assert(!collapse_deletions_);
+      // The maintained state (PositionalTombstoneMap::iter) isn't useful when
+      // we linear scan from the beginning each time, but we maintain it anyways
+      // for consistency.
+      tombstone_map_iter = tombstone_map.begin();
+      while (tombstone_map_iter != tombstone_map.end()) {
+        const auto& tombstone = tombstone_map_iter->second;
+        if (icmp_.user_comparator()->Compare(parsed.user_key,
+                                             tombstone.start_key_) < 0) {
+          break;
+        }
+        if (parsed.sequence < tombstone.seq_ &&
+            icmp_.user_comparator()->Compare(parsed.user_key,
+                                             tombstone.end_key_) < 0) {
+          return true;
+        }
+        ++tombstone_map_iter;
+      }
+      return false;
+    case kForwardTraversal:
+      assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end());
+      if (tombstone_map_iter == tombstone_map.begin() &&
+          icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone_map_iter->first) < 0) {
+        // before start of deletion intervals
+        return false;
+      }
+      while (std::next(tombstone_map_iter) != tombstone_map.end() &&
+             icmp_.user_comparator()->Compare(
+                 std::next(tombstone_map_iter)->first, parsed.user_key) <= 0) {
+        ++tombstone_map_iter;
+      }
+      break;
+    case kBackwardTraversal:
+      assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end());
+      while (tombstone_map_iter != tombstone_map.begin() &&
+             icmp_.user_comparator()->Compare(parsed.user_key,
+                                              tombstone_map_iter->first) < 0) {
+        --tombstone_map_iter;
+      }
+      if (tombstone_map_iter == tombstone_map.begin() &&
+          icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone_map_iter->first) < 0) {
+        // before start of deletion intervals
+        return false;
+      }
+      break;
+    case kBinarySearch:
+      assert(collapse_deletions_);
+      tombstone_map_iter =
+          tombstone_map.upper_bound(parsed.user_key);
+      if (tombstone_map_iter == tombstone_map.begin()) {
+        // before start of deletion intervals
+        return false;
+      }
+      --tombstone_map_iter;
+      break;
+  }
+  assert(mode != kFullScan);
+  assert(tombstone_map_iter != tombstone_map.end() &&
+         icmp_.user_comparator()->Compare(tombstone_map_iter->first,
+                                          parsed.user_key) <= 0);
+  assert(std::next(tombstone_map_iter) == tombstone_map.end() ||
+         icmp_.user_comparator()->Compare(
+             parsed.user_key, std::next(tombstone_map_iter)->first) < 0);
+  return parsed.sequence < tombstone_map_iter->second.seq_;
 }

 bool RangeDelAggregator::ShouldAddTombstones(
@@ -97,7 +160,7 @@ bool RangeDelAggregator::ShouldAddTombstones(
     ++stripe_map_iter;
   }
   while (stripe_map_iter != rep_->stripe_map_.end()) {
-    if (!stripe_map_iter->second.empty()) {
+    if (!stripe_map_iter->second.raw_map.empty()) {
       return true;
     }
     ++stripe_map_iter;
@@ -116,6 +179,8 @@ Status RangeDelAggregator::AddTombstones(
     if (first_iter) {
       if (rep_ == nullptr) {
         InitRep({upper_bound_});
+      } else {
+        InvalidateTombstoneMapPositions();
       }
       first_iter = false;
     }
@@ -133,8 +198,19 @@ Status RangeDelAggregator::AddTombstones(
   return Status::OK();
 }

+void RangeDelAggregator::InvalidateTombstoneMapPositions() {
+  if (rep_ == nullptr) {
+    return;
+  }
+  for (auto stripe_map_iter = rep_->stripe_map_.begin();
+       stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) {
+    stripe_map_iter->second.iter = stripe_map_iter->second.raw_map.end();
+  }
+}
+
 Status RangeDelAggregator::AddTombstone(RangeTombstone tombstone) {
-  auto& tombstone_map = GetTombstoneMap(tombstone.seq_);
+  auto& positional_tombstone_map = GetPositionalTombstoneMap(tombstone.seq_);
+  auto& tombstone_map = positional_tombstone_map.raw_map;
   if (collapse_deletions_) {
     // In collapsed mode, we only fill the seq_ field in the TombstoneMap's
     // values. The end_key is unneeded because we assume the tombstone extends
@@ -286,8 +362,8 @@ Status RangeDelAggregator::AddTombstone(RangeTombstone tombstone) {
   return Status::OK();
 }

-RangeDelAggregator::TombstoneMap& RangeDelAggregator::GetTombstoneMap(
-    SequenceNumber seq) {
+RangeDelAggregator::PositionalTombstoneMap&
+RangeDelAggregator::GetPositionalTombstoneMap(SequenceNumber seq) {
   assert(rep_ != nullptr);
   // The stripe includes seqnum for the snapshot above and excludes seqnum for
   // the snapshot below.
@@ -321,12 +397,12 @@ void RangeDelAggregator::AddToBuilder(
   if (bottommost_level) {
     // TODO(andrewkr): these are counted for each compaction output file, so
     // lots of double-counting.
-    if (!stripe_map_iter->second.empty()) {
+    if (!stripe_map_iter->second.raw_map.empty()) {
       range_del_out_stats->num_range_del_drop_obsolete +=
-          static_cast<int64_t>(stripe_map_iter->second.size()) -
+          static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) -
           (collapse_deletions_ ? 1 : 0);
       range_del_out_stats->num_record_drop_obsolete +=
-          static_cast<int64_t>(stripe_map_iter->second.size()) -
+          static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) -
           (collapse_deletions_ ? 1 : 0);
     }
     // For the bottommost level, keys covered by tombstones in the first
@@ -338,13 +414,13 @@ void RangeDelAggregator::AddToBuilder(
   // insert them into a std::map on the read path.
   bool first_added = false;
   while (stripe_map_iter != rep_->stripe_map_.end()) {
-    for (auto tombstone_map_iter = stripe_map_iter->second.begin();
-         tombstone_map_iter != stripe_map_iter->second.end();
+    for (auto tombstone_map_iter = stripe_map_iter->second.raw_map.begin();
+         tombstone_map_iter != stripe_map_iter->second.raw_map.end();
          ++tombstone_map_iter) {
       RangeTombstone tombstone;
       if (collapse_deletions_) {
         auto next_tombstone_map_iter = std::next(tombstone_map_iter);
-        if (next_tombstone_map_iter == stripe_map_iter->second.end()) {
+        if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end()) {
           // it's the sentinel tombstone
           break;
         }
@@ -432,7 +508,7 @@ bool RangeDelAggregator::IsEmpty() {
   }
   for (auto stripe_map_iter = rep_->stripe_map_.begin();
        stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) {
-    if (!stripe_map_iter->second.empty()) {
+    if (!stripe_map_iter->second.raw_map.empty()) {
       return false;
     }
   }
@@ -52,10 +52,31 @@ class RangeDelAggregator {
                      SequenceNumber upper_bound,
                      bool collapse_deletions = false);

+  // We maintain position in the tombstone map across calls to ShouldDelete. The
+  // caller may wish to specify a mode to optimize positioning the iterator
+  // during the next call to ShouldDelete. The non-kFullScan modes are only
+  // available when deletion collapsing is enabled.
+  //
+  // For example, if we invoke Next() on an iterator, kForwardTraversal should
+  // be specified to advance one-by-one through deletions until one is found
+  // with its interval containing the key. This will typically be faster than
+  // doing a full binary search (kBinarySearch).
+  enum RangePositioningMode {
+    kFullScan,  // used iff collapse_deletions_ == false
+    kForwardTraversal,
+    kBackwardTraversal,
+    kBinarySearch,
+  };
+
   // Returns whether the key should be deleted, which is the case when it is
   // covered by a range tombstone residing in the same snapshot stripe.
-  bool ShouldDelete(const ParsedInternalKey& parsed);
-  bool ShouldDelete(const Slice& internal_key);
+  // @param mode If collapse_deletions_ is true, this dictates how we will find
+  //             the deletion whose interval contains this key. Otherwise, its
+  //             value must be kFullScan indicating linear scan from beginning..
+  bool ShouldDelete(const ParsedInternalKey& parsed,
+                    RangePositioningMode mode = kFullScan);
+  bool ShouldDelete(const Slice& internal_key,
+                    RangePositioningMode mode = kFullScan);
   bool ShouldAddTombstones(bool bottommost_level = false);

   // Adds tombstones to the tombstone aggregation structure maintained by this
@@ -63,6 +84,13 @@ class RangeDelAggregator {
   // @return non-OK status if any of the tombstone keys are corrupted.
   Status AddTombstones(std::unique_ptr<InternalIterator> input);

+  // Resets iterators maintained across calls to ShouldDelete(). This may be
+  // called when the tombstones change, or the owner may call explicitly, e.g.,
+  // if it's an iterator that just seeked to an arbitrary position. The effect
+  // of invalidation is that the following call to ShouldDelete() will binary
+  // search for its tombstone.
+  void InvalidateTombstoneMapPositions();
+
   // Writes tombstones covering a range to a table builder.
   // @param extend_before_min_key If true, the range of tombstones to be added
   //        to the TableBuilder starts from the beginning of the key-range;
@@ -93,9 +121,23 @@ class RangeDelAggregator {
   // Maps tombstone user start key -> tombstone object
   typedef std::multimap<Slice, RangeTombstone, stl_wrappers::LessOfComparator>
       TombstoneMap;
+  // Also maintains position in TombstoneMap last seen by ShouldDelete(). The
+  // end iterator indicates invalidation (e.g., if AddTombstones() changes the
+  // underlying map). End iterator cannot be invalidated.
+  struct PositionalTombstoneMap {
+    explicit PositionalTombstoneMap(TombstoneMap _raw_map)
+        : raw_map(std::move(_raw_map)), iter(raw_map.end()) {}
+    PositionalTombstoneMap(const PositionalTombstoneMap&) = delete;
+    PositionalTombstoneMap(PositionalTombstoneMap&& other)
+        : raw_map(std::move(other.raw_map)), iter(raw_map.end()) {}
+
+    TombstoneMap raw_map;
+    TombstoneMap::const_iterator iter;
+  };
+
   // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e.,
   // their seqnums are greater than the next smaller snapshot's seqnum.
-  typedef std::map<SequenceNumber, TombstoneMap> StripeMap;
+  typedef std::map<SequenceNumber, PositionalTombstoneMap> StripeMap;

   struct Rep {
     StripeMap stripe_map_;
@@ -106,7 +148,7 @@ class RangeDelAggregator {
   // once the first range deletion is encountered.
   void InitRep(const std::vector<SequenceNumber>& snapshots);

-  TombstoneMap& GetTombstoneMap(SequenceNumber seq);
+  PositionalTombstoneMap& GetPositionalTombstoneMap(SequenceNumber seq);
   Status AddTombstone(RangeTombstone tombstone);

   SequenceNumber upper_bound_;
@@ -51,10 +51,14 @@ void VerifyRangeDels(const std::vector<RangeTombstone>& range_dels,
       parsed_key.user_key = expected_point.begin;
       parsed_key.sequence = expected_point.seq;
       parsed_key.type = kTypeValue;
-      ASSERT_FALSE(range_del_agg.ShouldDelete(parsed_key));
+      ASSERT_FALSE(range_del_agg.ShouldDelete(
+          parsed_key,
+          RangeDelAggregator::RangePositioningMode::kForwardTraversal));
       if (parsed_key.sequence > 0) {
         --parsed_key.sequence;
-        ASSERT_TRUE(range_del_agg.ShouldDelete(parsed_key));
+        ASSERT_TRUE(range_del_agg.ShouldDelete(
+            parsed_key,
+            RangeDelAggregator::RangePositioningMode::kForwardTraversal));
       }
     }
   }