Clean up FragmentedRangeTombstoneList (#4692)
Summary: Removed `one_time_use` flag, which removed the need for some tests, and changed all `NewRangeTombstoneIterator` methods to return `FragmentedRangeTombstoneIterators`. These changes also led to removing `RangeDelAggregatorV2::AddUnfragmentedTombstones` and one of the `MemTableListVersion::AddRangeTombstoneIterators` methods. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4692 Differential Revision: D13106570 Pulled By: abhimadan fbshipit-source-id: cbab5432d7fc2d9cdfd8d9d40361a1bffaa8f845
This commit is contained in:
parent
7125e24619
commit
8fe1e06ca0
@ -943,25 +943,15 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
|
||||
super_version->imm->AddIterators(read_opts, &merge_iter_builder);
|
||||
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
|
||||
|
||||
std::vector<InternalIterator*> memtable_range_del_iters;
|
||||
auto read_seq = super_version->current->version_set()->LastSequence();
|
||||
RangeDelAggregatorV2 range_del_agg(&internal_comparator_, read_seq);
|
||||
auto* active_range_del_iter =
|
||||
super_version->mem->NewRangeTombstoneIterator(read_opts);
|
||||
if (active_range_del_iter != nullptr) {
|
||||
memtable_range_del_iters.push_back(active_range_del_iter);
|
||||
}
|
||||
super_version->imm->AddRangeTombstoneIterators(read_opts,
|
||||
&memtable_range_del_iters);
|
||||
RangeDelAggregatorV2 range_del_agg(&internal_comparator_,
|
||||
kMaxSequenceNumber /* upper_bound */);
|
||||
{
|
||||
std::unique_ptr<InternalIterator> memtable_range_del_iter(
|
||||
NewMergingIterator(&internal_comparator_,
|
||||
memtable_range_del_iters.empty()
|
||||
? nullptr
|
||||
: &memtable_range_del_iters[0],
|
||||
static_cast<int>(memtable_range_del_iters.size())));
|
||||
range_del_agg.AddUnfragmentedTombstones(std::move(memtable_range_del_iter));
|
||||
}
|
||||
super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
|
||||
range_del_agg.AddTombstones(
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
|
||||
super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
|
||||
&range_del_agg);
|
||||
|
||||
Status status;
|
||||
for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
|
||||
auto* vstorage = super_version->current->storage_info();
|
||||
|
@ -342,8 +342,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
||||
kMaxSequenceNumber /* upper_bound */);
|
||||
ScopedArenaIterator iter(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
|
||||
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
||||
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
|
||||
iter->SeekToFirst();
|
||||
ASSERT_OK(iter->status());
|
||||
while (iter->Valid()) {
|
||||
@ -432,8 +432,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
||||
kMaxSequenceNumber /* upper_bound */);
|
||||
ScopedArenaIterator iter(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
|
||||
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
||||
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
|
||||
iter->SeekToFirst();
|
||||
ASSERT_OK(iter->status());
|
||||
while (iter->Valid()) {
|
||||
@ -650,8 +650,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
|
||||
InternalKeyComparator icmp(options.comparator);
|
||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
||||
kMaxSequenceNumber /* snapshots */);
|
||||
ScopedArenaIterator iter(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg));
|
||||
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
||||
&arena, &range_del_agg, kMaxSequenceNumber));
|
||||
iter->SeekToFirst();
|
||||
ASSERT_OK(iter->status());
|
||||
while (iter->Valid()) {
|
||||
|
@ -46,6 +46,7 @@
|
||||
#include "db/merge_context.h"
|
||||
#include "db/merge_helper.h"
|
||||
#include "db/range_del_aggregator.h"
|
||||
#include "db/range_tombstone_fragmenter.h"
|
||||
#include "db/table_cache.h"
|
||||
#include "db/table_properties_collector.h"
|
||||
#include "db/transaction_log_impl.h"
|
||||
@ -1032,7 +1033,7 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
|
||||
}
|
||||
|
||||
InternalIterator* DBImpl::NewInternalIterator(
|
||||
Arena* arena, RangeDelAggregatorV2* range_del_agg,
|
||||
Arena* arena, RangeDelAggregatorV2* range_del_agg, SequenceNumber sequence,
|
||||
ColumnFamilyHandle* column_family) {
|
||||
ColumnFamilyData* cfd;
|
||||
if (column_family == nullptr) {
|
||||
@ -1046,8 +1047,8 @@ InternalIterator* DBImpl::NewInternalIterator(
|
||||
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
|
||||
mutex_.Unlock();
|
||||
ReadOptions roptions;
|
||||
return NewInternalIterator(roptions, cfd, super_version, arena,
|
||||
range_del_agg);
|
||||
return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
|
||||
sequence);
|
||||
}
|
||||
|
||||
void DBImpl::SchedulePurge() {
|
||||
@ -1152,7 +1153,7 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
|
||||
InternalIterator* DBImpl::NewInternalIterator(
|
||||
const ReadOptions& read_options, ColumnFamilyData* cfd,
|
||||
SuperVersion* super_version, Arena* arena,
|
||||
RangeDelAggregatorV2* range_del_agg) {
|
||||
RangeDelAggregatorV2* range_del_agg, SequenceNumber sequence) {
|
||||
InternalIterator* internal_iter;
|
||||
assert(arena != nullptr);
|
||||
assert(range_del_agg != nullptr);
|
||||
@ -1164,12 +1165,12 @@ InternalIterator* DBImpl::NewInternalIterator(
|
||||
// Collect iterator for mutable mem
|
||||
merge_iter_builder.AddIterator(
|
||||
super_version->mem->NewIterator(read_options, arena));
|
||||
std::unique_ptr<InternalIterator> range_del_iter;
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
|
||||
Status s;
|
||||
if (!read_options.ignore_range_deletions) {
|
||||
range_del_iter.reset(
|
||||
super_version->mem->NewRangeTombstoneIterator(read_options));
|
||||
range_del_agg->AddUnfragmentedTombstones(std::move(range_del_iter));
|
||||
super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
|
||||
range_del_agg->AddTombstones(std::move(range_del_iter));
|
||||
}
|
||||
// Collect all needed child iterators for immutable memtables
|
||||
if (s.ok()) {
|
||||
@ -1841,7 +1842,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
|
||||
|
||||
InternalIterator* internal_iter =
|
||||
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
|
||||
db_iter->GetRangeDelAggregator());
|
||||
db_iter->GetRangeDelAggregator(), snapshot);
|
||||
db_iter->SetIterUnderDBIter(internal_iter);
|
||||
|
||||
return db_iter;
|
||||
|
@ -375,7 +375,7 @@ class DBImpl : public DB {
|
||||
// The returned iterator should be deleted when no longer needed.
|
||||
InternalIterator* NewInternalIterator(
|
||||
Arena* arena, RangeDelAggregatorV2* range_del_agg,
|
||||
ColumnFamilyHandle* column_family = nullptr);
|
||||
SequenceNumber sequence, ColumnFamilyHandle* column_family = nullptr);
|
||||
|
||||
LogsWithPrepTracker* logs_with_prep_tracker() {
|
||||
return &logs_with_prep_tracker_;
|
||||
@ -582,7 +582,8 @@ class DBImpl : public DB {
|
||||
ColumnFamilyData* cfd,
|
||||
SuperVersion* super_version,
|
||||
Arena* arena,
|
||||
RangeDelAggregatorV2* range_del_agg);
|
||||
RangeDelAggregatorV2* range_del_agg,
|
||||
SequenceNumber sequence);
|
||||
|
||||
// hollow transactions shell used for recovery.
|
||||
// these will then be passed to TransactionDB so that
|
||||
|
@ -997,7 +997,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||
s = BuildTable(
|
||||
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
|
||||
env_options_for_compaction_, cfd->table_cache(), iter.get(),
|
||||
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
|
||||
std::unique_ptr<InternalIterator>(
|
||||
mem->NewRangeTombstoneIterator(ro, versions_->LastSequence())),
|
||||
&meta, cfd->internal_comparator(),
|
||||
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
||||
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
|
||||
|
@ -72,18 +72,20 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
|
||||
auto cfd = cfh->cfd();
|
||||
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
|
||||
SequenceNumber latest_snapshot = versions_->LastSequence();
|
||||
SequenceNumber read_seq =
|
||||
read_options.snapshot != nullptr
|
||||
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
|
||||
->number_
|
||||
: latest_snapshot;
|
||||
ReadCallback* read_callback = nullptr; // No read callback provided.
|
||||
auto db_iter = NewArenaWrappedDbIterator(
|
||||
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
|
||||
(read_options.snapshot != nullptr
|
||||
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
|
||||
->number_
|
||||
: latest_snapshot),
|
||||
read_seq,
|
||||
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
|
||||
super_version->version_number, read_callback);
|
||||
auto internal_iter =
|
||||
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
|
||||
db_iter->GetRangeDelAggregator());
|
||||
db_iter->GetRangeDelAggregator(), read_seq);
|
||||
db_iter->SetIterUnderDBIter(internal_iter);
|
||||
return db_iter;
|
||||
}
|
||||
@ -99,21 +101,22 @@ Status DBImplReadOnly::NewIterators(
|
||||
iterators->clear();
|
||||
iterators->reserve(column_families.size());
|
||||
SequenceNumber latest_snapshot = versions_->LastSequence();
|
||||
SequenceNumber read_seq =
|
||||
read_options.snapshot != nullptr
|
||||
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
|
||||
->number_
|
||||
: latest_snapshot;
|
||||
|
||||
for (auto cfh : column_families) {
|
||||
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
|
||||
auto* sv = cfd->GetSuperVersion()->Ref();
|
||||
auto* db_iter = NewArenaWrappedDbIterator(
|
||||
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
|
||||
(read_options.snapshot != nullptr
|
||||
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
|
||||
->number_
|
||||
: latest_snapshot),
|
||||
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
|
||||
sv->mutable_cf_options.max_sequential_skip_in_iterations,
|
||||
sv->version_number, read_callback);
|
||||
auto* internal_iter =
|
||||
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
|
||||
db_iter->GetRangeDelAggregator());
|
||||
db_iter->GetRangeDelAggregator(), read_seq);
|
||||
db_iter->SetIterUnderDBIter(internal_iter);
|
||||
iterators->push_back(db_iter);
|
||||
}
|
||||
|
@ -1555,7 +1555,8 @@ Status ArenaWrappedDBIter::Refresh() {
|
||||
allow_refresh_);
|
||||
|
||||
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
|
||||
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
|
||||
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
|
||||
latest_seq);
|
||||
SetIterUnderDBIter(internal_iter);
|
||||
} else {
|
||||
db_iter_->set_sequence(latest_seq);
|
||||
|
@ -818,10 +818,11 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
|
||||
kMaxSequenceNumber /* upper_bound */);
|
||||
ScopedArenaIterator iter;
|
||||
if (cf == 0) {
|
||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
|
||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
||||
kMaxSequenceNumber));
|
||||
} else {
|
||||
iter.set(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf]));
|
||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
||||
kMaxSequenceNumber, handles_[cf]));
|
||||
}
|
||||
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
|
||||
iter->Seek(target.Encode());
|
||||
@ -1232,10 +1233,11 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
|
||||
// assigned iterator before it range_del_agg is already destructed.
|
||||
ScopedArenaIterator iter;
|
||||
if (cf != 0) {
|
||||
iter.set(
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf]));
|
||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
||||
kMaxSequenceNumber, handles_[cf]));
|
||||
} else {
|
||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
|
||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
||||
kMaxSequenceNumber));
|
||||
}
|
||||
iter->SeekToFirst();
|
||||
ASSERT_EQ(iter->status().ok(), true);
|
||||
@ -1437,7 +1439,8 @@ void DBTestBase::VerifyDBInternal(
|
||||
InternalKeyComparator icmp(last_options_.comparator);
|
||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
||||
kMaxSequenceNumber /* upper_bound */);
|
||||
auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg);
|
||||
auto iter =
|
||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber);
|
||||
iter->SeekToFirst();
|
||||
for (auto p : true_data) {
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
|
@ -307,7 +307,8 @@ Status FlushJob::WriteLevel0Table() {
|
||||
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
|
||||
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
||||
memtables.push_back(m->NewIterator(ro, &arena));
|
||||
auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
|
||||
auto* range_del_iter =
|
||||
m->NewRangeTombstoneIterator(ro, versions_->LastSequence());
|
||||
if (range_del_iter != nullptr) {
|
||||
range_del_iters.push_back(range_del_iter);
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "db/dbformat.h"
|
||||
#include "db/job_context.h"
|
||||
#include "db/range_del_aggregator_v2.h"
|
||||
#include "db/range_tombstone_fragmenter.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/slice_transform.h"
|
||||
@ -614,9 +615,10 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
|
||||
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
|
||||
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
||||
if (!read_options_.ignore_range_deletions) {
|
||||
std::unique_ptr<InternalIterator> range_del_iter(
|
||||
sv_->mem->NewRangeTombstoneIterator(read_options_));
|
||||
range_del_agg.AddUnfragmentedTombstones(std::move(range_del_iter));
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
||||
sv_->mem->NewRangeTombstoneIterator(
|
||||
read_options_, sv_->current->version_set()->LastSequence()));
|
||||
range_del_agg.AddTombstones(std::move(range_del_iter));
|
||||
sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
|
||||
&range_del_agg);
|
||||
}
|
||||
@ -670,9 +672,10 @@ void ForwardIterator::RenewIterators() {
|
||||
RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
|
||||
kMaxSequenceNumber /* upper_bound */);
|
||||
if (!read_options_.ignore_range_deletions) {
|
||||
std::unique_ptr<InternalIterator> range_del_iter(
|
||||
svnew->mem->NewRangeTombstoneIterator(read_options_));
|
||||
range_del_agg.AddUnfragmentedTombstones(std::move(range_del_iter));
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
||||
svnew->mem->NewRangeTombstoneIterator(
|
||||
read_options_, sv_->current->version_set()->LastSequence()));
|
||||
range_del_agg.AddTombstones(std::move(range_del_iter));
|
||||
svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_,
|
||||
&range_del_agg);
|
||||
}
|
||||
|
@ -412,13 +412,24 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
|
||||
return new (mem) MemTableIterator(*this, read_options, arena);
|
||||
}
|
||||
|
||||
InternalIterator* MemTable::NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options) {
|
||||
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options, SequenceNumber read_seq) {
|
||||
if (read_options.ignore_range_deletions || is_range_del_table_empty_) {
|
||||
return nullptr;
|
||||
}
|
||||
return new MemTableIterator(*this, read_options, nullptr /* arena */,
|
||||
true /* use_range_del_table */);
|
||||
auto* unfragmented_iter = new MemTableIterator(
|
||||
*this, read_options, nullptr /* arena */, true /* use_range_del_table */);
|
||||
if (unfragmented_iter == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
auto fragmented_tombstone_list =
|
||||
std::make_shared<FragmentedRangeTombstoneList>(
|
||||
std::unique_ptr<InternalIterator>(unfragmented_iter),
|
||||
comparator_.comparator);
|
||||
|
||||
auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
|
||||
fragmented_tombstone_list, read_seq, comparator_.comparator);
|
||||
return fragmented_iter;
|
||||
}
|
||||
|
||||
port::RWMutex* MemTable::GetLock(const Slice& key) {
|
||||
@ -732,17 +743,14 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
}
|
||||
PERF_TIMER_GUARD(get_from_memtable_time);
|
||||
|
||||
std::unique_ptr<InternalIterator> range_del_iter(
|
||||
NewRangeTombstoneIterator(read_opts));
|
||||
SequenceNumber snapshot = GetInternalKeySeqno(key.internal_key());
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
comparator_.comparator,
|
||||
true /* one_time_use */, snapshot);
|
||||
FragmentedRangeTombstoneIterator fragment_iter(&fragment_list, snapshot,
|
||||
comparator_.comparator);
|
||||
*max_covering_tombstone_seq =
|
||||
std::max(*max_covering_tombstone_seq,
|
||||
fragment_iter.MaxCoveringTombstoneSeqnum(key.user_key()));
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
||||
NewRangeTombstoneIterator(read_opts,
|
||||
GetInternalKeySeqno(key.internal_key())));
|
||||
if (range_del_iter != nullptr) {
|
||||
*max_covering_tombstone_seq =
|
||||
std::max(*max_covering_tombstone_seq,
|
||||
range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
|
||||
}
|
||||
|
||||
Slice user_key = key.user_key();
|
||||
bool found_final_value = false;
|
||||
|
@ -16,7 +16,7 @@
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include "db/dbformat.h"
|
||||
#include "db/range_del_aggregator.h"
|
||||
#include "db/range_tombstone_fragmenter.h"
|
||||
#include "db/read_callback.h"
|
||||
#include "db/version_edit.h"
|
||||
#include "monitoring/instrumented_mutex.h"
|
||||
@ -158,7 +158,8 @@ class MemTable {
|
||||
// those allocated in arena.
|
||||
InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
|
||||
|
||||
InternalIterator* NewRangeTombstoneIterator(const ReadOptions& read_options);
|
||||
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options, SequenceNumber read_seq);
|
||||
|
||||
// Add an entry into memtable that maps key to value at the
|
||||
// specified sequence number and with the specified type.
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <string>
|
||||
#include "db/db_impl.h"
|
||||
#include "db/memtable.h"
|
||||
#include "db/range_tombstone_fragmenter.h"
|
||||
#include "db/version_set.h"
|
||||
#include "monitoring/thread_status_util.h"
|
||||
#include "rocksdb/db.h"
|
||||
@ -161,21 +162,11 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
|
||||
RangeDelAggregatorV2* range_del_agg) {
|
||||
assert(range_del_agg != nullptr);
|
||||
for (auto& m : memlist_) {
|
||||
std::unique_ptr<InternalIterator> range_del_iter(
|
||||
m->NewRangeTombstoneIterator(read_opts));
|
||||
range_del_agg->AddUnfragmentedTombstones(std::move(range_del_iter));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MemTableListVersion::AddRangeTombstoneIterators(
|
||||
const ReadOptions& read_opts,
|
||||
std::vector<InternalIterator*>* range_del_iters) {
|
||||
for (auto& m : memlist_) {
|
||||
auto* range_del_iter = m->NewRangeTombstoneIterator(read_opts);
|
||||
if (range_del_iter != nullptr) {
|
||||
range_del_iters->push_back(range_del_iter);
|
||||
}
|
||||
// Using kMaxSequenceNumber is OK because these are immutable memtables.
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
||||
m->NewRangeTombstoneIterator(read_opts,
|
||||
kMaxSequenceNumber /* read_seq */));
|
||||
range_del_agg->AddTombstones(std::move(range_del_iter));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
@ -92,9 +92,6 @@ class MemTableListVersion {
|
||||
|
||||
Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
|
||||
RangeDelAggregatorV2* range_del_agg);
|
||||
Status AddRangeTombstoneIterators(
|
||||
const ReadOptions& read_opts,
|
||||
std::vector<InternalIterator*>* range_del_iters);
|
||||
|
||||
void AddIterators(const ReadOptions& options,
|
||||
std::vector<InternalIterator*>* iterator_list,
|
||||
|
@ -215,8 +215,8 @@ int main(int argc, char** argv) {
|
||||
rocksdb::MakeRangeDelIterator(persistent_range_tombstones);
|
||||
fragmented_range_tombstone_lists.emplace_back(
|
||||
new rocksdb::FragmentedRangeTombstoneList(
|
||||
rocksdb::MakeRangeDelIterator(persistent_range_tombstones), icmp,
|
||||
false /* one_time_use */));
|
||||
rocksdb::MakeRangeDelIterator(persistent_range_tombstones),
|
||||
icmp));
|
||||
std::unique_ptr<rocksdb::FragmentedRangeTombstoneIterator>
|
||||
fragmented_range_del_iter(
|
||||
new rocksdb::FragmentedRangeTombstoneIterator(
|
||||
|
@ -215,11 +215,8 @@ void ReverseRangeDelIterator::Invalidate() {
|
||||
}
|
||||
|
||||
RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp,
|
||||
SequenceNumber upper_bound)
|
||||
: icmp_(icmp),
|
||||
upper_bound_(upper_bound),
|
||||
forward_iter_(icmp, &iters_),
|
||||
reverse_iter_(icmp, &iters_) {}
|
||||
SequenceNumber /* upper_bound */)
|
||||
: icmp_(icmp), forward_iter_(icmp, &iters_), reverse_iter_(icmp, &iters_) {}
|
||||
|
||||
void RangeDelAggregatorV2::AddTombstones(
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
||||
@ -237,20 +234,6 @@ void RangeDelAggregatorV2::AddTombstones(
|
||||
icmp_, smallest, largest));
|
||||
}
|
||||
|
||||
void RangeDelAggregatorV2::AddUnfragmentedTombstones(
|
||||
std::unique_ptr<InternalIterator> input_iter) {
|
||||
assert(wrapped_range_del_agg == nullptr);
|
||||
if (input_iter == nullptr) {
|
||||
return;
|
||||
}
|
||||
pinned_fragments_.emplace_back(new FragmentedRangeTombstoneList(
|
||||
std::move(input_iter), *icmp_, false /* one_time_use */));
|
||||
auto fragmented_iter = new FragmentedRangeTombstoneIterator(
|
||||
pinned_fragments_.back().get(), upper_bound_, *icmp_);
|
||||
AddTombstones(
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator>(fragmented_iter));
|
||||
}
|
||||
|
||||
bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed,
|
||||
RangeDelPositioningMode mode) {
|
||||
if (wrapped_range_del_agg != nullptr) {
|
||||
|
@ -249,8 +249,6 @@ class RangeDelAggregatorV2 {
|
||||
const InternalKey* smallest = nullptr,
|
||||
const InternalKey* largest = nullptr);
|
||||
|
||||
void AddUnfragmentedTombstones(std::unique_ptr<InternalIterator> input_iter);
|
||||
|
||||
bool ShouldDelete(const ParsedInternalKey& parsed,
|
||||
RangeDelPositioningMode mode);
|
||||
|
||||
@ -283,10 +281,8 @@ class RangeDelAggregatorV2 {
|
||||
|
||||
private:
|
||||
const InternalKeyComparator* icmp_;
|
||||
SequenceNumber upper_bound_;
|
||||
|
||||
std::vector<std::unique_ptr<TruncatedRangeDelIterator>> iters_;
|
||||
std::list<std::unique_ptr<FragmentedRangeTombstoneList>> pinned_fragments_;
|
||||
std::set<uint64_t> files_seen_;
|
||||
|
||||
ForwardRangeDelIterator forward_iter_;
|
||||
|
@ -41,7 +41,7 @@ MakeFragmentedTombstoneLists(
|
||||
for (const auto& range_dels : range_dels_list) {
|
||||
auto range_del_iter = MakeRangeDelIter(range_dels);
|
||||
fragment_lists.emplace_back(new FragmentedRangeTombstoneList(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */));
|
||||
std::move(range_del_iter), bytewise_icmp));
|
||||
}
|
||||
return fragment_lists;
|
||||
}
|
||||
@ -170,8 +170,8 @@ void VerifyIsRangeOverlapped(
|
||||
|
||||
TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) {
|
||||
auto range_del_iter = MakeRangeDelIter({});
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp));
|
||||
@ -189,8 +189,8 @@ TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) {
|
||||
TEST_F(RangeDelAggregatorV2Test, UntruncatedIter) {
|
||||
auto range_del_iter =
|
||||
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp));
|
||||
@ -223,8 +223,8 @@ TEST_F(RangeDelAggregatorV2Test, UntruncatedIter) {
|
||||
TEST_F(RangeDelAggregatorV2Test, UntruncatedIterWithSnapshot) {
|
||||
auto range_del_iter =
|
||||
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||
new FragmentedRangeTombstoneIterator(&fragment_list, 9 /* snapshot */,
|
||||
bytewise_icmp));
|
||||
@ -256,8 +256,8 @@ TEST_F(RangeDelAggregatorV2Test, UntruncatedIterWithSnapshot) {
|
||||
TEST_F(RangeDelAggregatorV2Test, TruncatedIter) {
|
||||
auto range_del_iter =
|
||||
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp));
|
||||
@ -291,8 +291,8 @@ TEST_F(RangeDelAggregatorV2Test, TruncatedIter) {
|
||||
|
||||
TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) {
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 8}});
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp));
|
||||
|
@ -20,8 +20,7 @@ namespace rocksdb {
|
||||
|
||||
FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
|
||||
std::unique_ptr<InternalIterator> unfragmented_tombstones,
|
||||
const InternalKeyComparator& icmp, bool one_time_use,
|
||||
SequenceNumber snapshot) {
|
||||
const InternalKeyComparator& icmp) {
|
||||
if (unfragmented_tombstones == nullptr) {
|
||||
return;
|
||||
}
|
||||
@ -44,8 +43,7 @@ FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
|
||||
}
|
||||
}
|
||||
if (is_sorted) {
|
||||
FragmentTombstones(std::move(unfragmented_tombstones), icmp, one_time_use,
|
||||
snapshot);
|
||||
FragmentTombstones(std::move(unfragmented_tombstones), icmp);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -63,13 +61,12 @@ FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
|
||||
// VectorIterator implicitly sorts by key during construction.
|
||||
auto iter = std::unique_ptr<VectorIterator>(
|
||||
new VectorIterator(std::move(keys), std::move(values), &icmp));
|
||||
FragmentTombstones(std::move(iter), icmp, one_time_use, snapshot);
|
||||
FragmentTombstones(std::move(iter), icmp);
|
||||
}
|
||||
|
||||
void FragmentedRangeTombstoneList::FragmentTombstones(
|
||||
std::unique_ptr<InternalIterator> unfragmented_tombstones,
|
||||
const InternalKeyComparator& icmp, bool one_time_use,
|
||||
SequenceNumber snapshot) {
|
||||
const InternalKeyComparator& icmp) {
|
||||
Slice cur_start_key(nullptr, 0);
|
||||
auto cmp = ParsedInternalKeyComparator(&icmp);
|
||||
|
||||
@ -112,32 +109,20 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
|
||||
icmp.user_comparator()->Compare(tombstones_.back().end_key,
|
||||
cur_start_key) <= 0);
|
||||
|
||||
if (one_time_use) {
|
||||
SequenceNumber max_seqnum = 0;
|
||||
for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
|
||||
max_seqnum = std::max(max_seqnum, flush_it->sequence);
|
||||
}
|
||||
|
||||
size_t start_idx = tombstone_seqs_.size();
|
||||
tombstone_seqs_.push_back(max_seqnum);
|
||||
tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx,
|
||||
start_idx + 1);
|
||||
} else {
|
||||
// Sort the sequence numbers of the tombstones being fragmented in
|
||||
// descending order, and then flush them in that order.
|
||||
autovector<SequenceNumber> seqnums_to_flush;
|
||||
for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
|
||||
seqnums_to_flush.push_back(flush_it->sequence);
|
||||
}
|
||||
std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
|
||||
std::greater<SequenceNumber>());
|
||||
size_t start_idx = tombstone_seqs_.size();
|
||||
size_t end_idx = start_idx + seqnums_to_flush.size();
|
||||
tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
|
||||
seqnums_to_flush.end());
|
||||
tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx,
|
||||
end_idx);
|
||||
// Sort the sequence numbers of the tombstones being fragmented in
|
||||
// descending order, and then flush them in that order.
|
||||
autovector<SequenceNumber> seqnums_to_flush;
|
||||
for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
|
||||
seqnums_to_flush.push_back(flush_it->sequence);
|
||||
}
|
||||
std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
|
||||
std::greater<SequenceNumber>());
|
||||
size_t start_idx = tombstone_seqs_.size();
|
||||
size_t end_idx = start_idx + seqnums_to_flush.size();
|
||||
tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
|
||||
seqnums_to_flush.end());
|
||||
tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx);
|
||||
|
||||
cur_start_key = cur_end_key;
|
||||
}
|
||||
if (!reached_next_start_key) {
|
||||
@ -158,10 +143,6 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
|
||||
const Slice& ikey = unfragmented_tombstones->key();
|
||||
Slice tombstone_start_key = ExtractUserKey(ikey);
|
||||
SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey);
|
||||
if (one_time_use && tombstone_seq > snapshot) {
|
||||
// The tombstone is not visible by this snapshot.
|
||||
continue;
|
||||
}
|
||||
no_tombstones = false;
|
||||
|
||||
Slice tombstone_end_key = unfragmented_tombstones->value();
|
||||
|
@ -38,8 +38,7 @@ struct FragmentedRangeTombstoneList {
|
||||
};
|
||||
FragmentedRangeTombstoneList(
|
||||
std::unique_ptr<InternalIterator> unfragmented_tombstones,
|
||||
const InternalKeyComparator& icmp, bool one_time_use,
|
||||
SequenceNumber snapshot = kMaxSequenceNumber);
|
||||
const InternalKeyComparator& icmp);
|
||||
|
||||
std::vector<RangeTombstoneStack>::const_iterator begin() const {
|
||||
return tombstones_.begin();
|
||||
@ -69,8 +68,7 @@ struct FragmentedRangeTombstoneList {
|
||||
// tombstones_ and tombstone_seqs_.
|
||||
void FragmentTombstones(
|
||||
std::unique_ptr<InternalIterator> unfragmented_tombstones,
|
||||
const InternalKeyComparator& icmp, bool one_time_use,
|
||||
SequenceNumber snapshot = kMaxSequenceNumber);
|
||||
const InternalKeyComparator& icmp);
|
||||
|
||||
std::vector<RangeTombstoneStack> tombstones_;
|
||||
std::vector<SequenceNumber> tombstone_seqs_;
|
||||
|
@ -110,8 +110,8 @@ void VerifyMaxCoveringTombstoneSeqnum(
|
||||
TEST_F(RangeTombstoneFragmenterTest, NonOverlappingTombstones) {
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "b", 10}, {"c", "d", 5}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter, {{"a", "b", 10}, {"c", "d", 5}});
|
||||
@ -122,12 +122,12 @@ TEST_F(RangeTombstoneFragmenterTest, NonOverlappingTombstones) {
|
||||
TEST_F(RangeTombstoneFragmenterTest, OverlappingTombstones) {
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 15}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter,
|
||||
{{"a", "c", 10}, {"c", "e", 15}, {"e", "g", 15}});
|
||||
VerifyFragmentedRangeDels(
|
||||
&iter, {{"a", "c", 10}, {"c", "e", 15}, {"c", "e", 10}, {"e", "g", 15}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(&iter,
|
||||
{{"a", 10}, {"c", 15}, {"e", 15}, {"g", 0}});
|
||||
}
|
||||
@ -136,12 +136,12 @@ TEST_F(RangeTombstoneFragmenterTest, ContiguousTombstones) {
|
||||
auto range_del_iter = MakeRangeDelIter(
|
||||
{{"a", "c", 10}, {"c", "e", 20}, {"c", "e", 5}, {"e", "g", 15}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter,
|
||||
{{"a", "c", 10}, {"c", "e", 20}, {"e", "g", 15}});
|
||||
VerifyFragmentedRangeDels(
|
||||
&iter, {{"a", "c", 10}, {"c", "e", 20}, {"c", "e", 5}, {"e", "g", 15}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(&iter,
|
||||
{{"a", 10}, {"c", 20}, {"e", 15}, {"g", 0}});
|
||||
}
|
||||
@ -150,11 +150,12 @@ TEST_F(RangeTombstoneFragmenterTest, RepeatedStartAndEndKey) {
|
||||
auto range_del_iter =
|
||||
MakeRangeDelIter({{"a", "c", 10}, {"a", "c", 7}, {"a", "c", 3}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter, {{"a", "c", 10}});
|
||||
VerifyFragmentedRangeDels(&iter,
|
||||
{{"a", "c", 10}, {"a", "c", 7}, {"a", "c", 3}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(&iter, {{"a", 10}, {"b", 10}, {"c", 0}});
|
||||
}
|
||||
|
||||
@ -162,12 +163,16 @@ TEST_F(RangeTombstoneFragmenterTest, RepeatedStartKeyDifferentEndKeys) {
|
||||
auto range_del_iter =
|
||||
MakeRangeDelIter({{"a", "e", 10}, {"a", "g", 7}, {"a", "c", 3}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter,
|
||||
{{"a", "c", 10}, {"c", "e", 10}, {"e", "g", 7}});
|
||||
VerifyFragmentedRangeDels(&iter, {{"a", "c", 10},
|
||||
{"a", "c", 7},
|
||||
{"a", "c", 3},
|
||||
{"c", "e", 10},
|
||||
{"c", "e", 7},
|
||||
{"e", "g", 7}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(&iter,
|
||||
{{"a", 10}, {"c", 10}, {"e", 7}, {"g", 0}});
|
||||
}
|
||||
@ -179,12 +184,20 @@ TEST_F(RangeTombstoneFragmenterTest, RepeatedStartKeyMixedEndKeys) {
|
||||
{"a", "g", 7},
|
||||
{"a", "c", 3}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter,
|
||||
{{"a", "c", 30}, {"c", "e", 20}, {"e", "g", 20}});
|
||||
VerifyFragmentedRangeDels(&iter, {{"a", "c", 30},
|
||||
{"a", "c", 20},
|
||||
{"a", "c", 10},
|
||||
{"a", "c", 7},
|
||||
{"a", "c", 3},
|
||||
{"c", "e", 20},
|
||||
{"c", "e", 10},
|
||||
{"c", "e", 7},
|
||||
{"e", "g", 20},
|
||||
{"e", "g", 7}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(&iter,
|
||||
{{"a", 30}, {"c", 20}, {"e", 20}, {"g", 0}});
|
||||
}
|
||||
@ -196,63 +209,8 @@ TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKey) {
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter, {{"a", "c", 10},
|
||||
{"c", "e", 10},
|
||||
{"e", "g", 8},
|
||||
{"g", "i", 6},
|
||||
{"j", "l", 4},
|
||||
{"l", "n", 4}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(
|
||||
&iter, {{"a", 10}, {"c", 10}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
|
||||
}
|
||||
|
||||
TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyWithSnapshot) {
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
|
||||
{"c", "g", 8},
|
||||
{"c", "i", 6},
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */, 9);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, 9 /* snapshot */,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(
|
||||
&iter, {{"c", "g", 8}, {"g", "i", 6}, {"j", "l", 4}, {"l", "n", 4}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(
|
||||
&iter, {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
|
||||
}
|
||||
|
||||
TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyUnordered) {
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
|
||||
{"j", "n", 4},
|
||||
{"c", "i", 6},
|
||||
{"c", "g", 8},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */, 9);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, 9 /* snapshot */,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(
|
||||
&iter, {{"c", "g", 8}, {"g", "i", 6}, {"j", "l", 4}, {"l", "n", 4}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(
|
||||
&iter, {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
|
||||
}
|
||||
|
||||
TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyMultiUse) {
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
|
||||
{"c", "g", 8},
|
||||
{"c", "i", 6},
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter1(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter2(&fragment_list, 9 /* snapshot */,
|
||||
@ -310,6 +268,31 @@ TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyMultiUse) {
|
||||
&iter5, {{"a", 0}, {"c", 0}, {"e", 0}, {"i", 0}, {"j", 2}, {"m", 0}});
|
||||
}
|
||||
|
||||
TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyUnordered) {
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
|
||||
{"j", "n", 4},
|
||||
{"c", "i", 6},
|
||||
{"c", "g", 8},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, 9 /* snapshot */,
|
||||
bytewise_icmp);
|
||||
VerifyFragmentedRangeDels(&iter, {{"a", "c", 10},
|
||||
{"c", "e", 10},
|
||||
{"c", "e", 8},
|
||||
{"c", "e", 6},
|
||||
{"e", "g", 8},
|
||||
{"e", "g", 6},
|
||||
{"g", "i", 6},
|
||||
{"j", "l", 4},
|
||||
{"j", "l", 2},
|
||||
{"l", "n", 4}});
|
||||
VerifyMaxCoveringTombstoneSeqnum(
|
||||
&iter, {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
|
||||
}
|
||||
|
||||
TEST_F(RangeTombstoneFragmenterTest, SeekStartKey) {
|
||||
// Same tombstones as OverlapAndRepeatedStartKey.
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
|
||||
@ -318,8 +301,8 @@ TEST_F(RangeTombstoneFragmenterTest, SeekStartKey) {
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
|
||||
FragmentedRangeTombstoneIterator iter1(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
@ -348,8 +331,8 @@ TEST_F(RangeTombstoneFragmenterTest, SeekCovered) {
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
|
||||
FragmentedRangeTombstoneIterator iter1(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
@ -378,8 +361,8 @@ TEST_F(RangeTombstoneFragmenterTest, SeekEndKey) {
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
|
||||
FragmentedRangeTombstoneIterator iter1(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
@ -412,8 +395,8 @@ TEST_F(RangeTombstoneFragmenterTest, SeekOutOfBounds) {
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, false /* one_time_use */);
|
||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||
bytewise_icmp);
|
||||
|
||||
FragmentedRangeTombstoneIterator iter(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
@ -422,38 +405,6 @@ TEST_F(RangeTombstoneFragmenterTest, SeekOutOfBounds) {
|
||||
{{"", {}, true /* out of range */}, {"z", {"l", "n", 4}}});
|
||||
}
|
||||
|
||||
TEST_F(RangeTombstoneFragmenterTest, SeekOneTimeUse) {
|
||||
// Same tombstones as OverlapAndRepeatedStartKey.
|
||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
|
||||
{"c", "g", 8},
|
||||
{"c", "i", 6},
|
||||
{"j", "n", 4},
|
||||
{"j", "l", 2}});
|
||||
|
||||
FragmentedRangeTombstoneList fragment_list(
|
||||
std::move(range_del_iter), bytewise_icmp, true /* one_time_use */);
|
||||
|
||||
FragmentedRangeTombstoneIterator iter1(&fragment_list, kMaxSequenceNumber,
|
||||
bytewise_icmp);
|
||||
VerifySeek(
|
||||
&iter1,
|
||||
{{"a", {"a", "c", 10}}, {"e", {"e", "g", 8}}, {"l", {"l", "n", 4}}});
|
||||
VerifySeekForPrev(
|
||||
&iter1,
|
||||
{{"a", {"a", "c", 10}}, {"e", {"e", "g", 8}}, {"l", {"l", "n", 4}}});
|
||||
|
||||
// No tombstone fragments exist at this snapshot because they were dropped
|
||||
// when the list was created.
|
||||
FragmentedRangeTombstoneIterator iter2(&fragment_list, 3 /* snapshot */,
|
||||
bytewise_icmp);
|
||||
VerifySeek(&iter2, {{"a", {}, true /* out of range */},
|
||||
{"e", {}, true /* out of range */},
|
||||
{"l", {}, true /* out of range */}});
|
||||
VerifySeekForPrev(&iter2, {{"a", {}, true /* out of range */},
|
||||
{"e", {}, true /* out of range */},
|
||||
{"l", {}, true /* out of range */}});
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -419,7 +419,8 @@ class Repairer {
|
||||
status = BuildTable(
|
||||
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
|
||||
env_options_, table_cache_, iter.get(),
|
||||
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
|
||||
std::unique_ptr<InternalIterator>(
|
||||
mem->NewRangeTombstoneIterator(ro, vset_.LastSequence())),
|
||||
&meta, cfd->internal_comparator(),
|
||||
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
||||
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
|
||||
|
@ -382,8 +382,7 @@ Status TableCache::Get(const ReadOptions& options,
|
||||
if (s.ok() && max_covering_tombstone_seq != nullptr &&
|
||||
!options.ignore_range_deletions) {
|
||||
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
||||
static_cast<FragmentedRangeTombstoneIterator*>(
|
||||
t->NewRangeTombstoneIterator(options)));
|
||||
t->NewRangeTombstoneIterator(options));
|
||||
if (range_del_iter != nullptr) {
|
||||
*max_covering_tombstone_seq = std::max(
|
||||
*max_covering_tombstone_seq,
|
||||
|
@ -51,7 +51,8 @@ static std::string PrintContents(WriteBatch* b) {
|
||||
iter = mem->NewIterator(ReadOptions(), &arena);
|
||||
arena_iter_guard.set(iter);
|
||||
} else {
|
||||
iter = mem->NewRangeTombstoneIterator(ReadOptions());
|
||||
iter = mem->NewRangeTombstoneIterator(ReadOptions(),
|
||||
kMaxSequenceNumber /* read_seq */);
|
||||
iter_guard.reset(iter);
|
||||
}
|
||||
if (iter == nullptr) {
|
||||
|
@ -995,7 +995,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
|
||||
auto iter = std::unique_ptr<InternalIterator>(
|
||||
new_table->NewUnfragmentedRangeTombstoneIterator(read_options));
|
||||
rep->fragmented_range_dels = std::make_shared<FragmentedRangeTombstoneList>(
|
||||
std::move(iter), internal_comparator, false /* one_time_use */);
|
||||
std::move(iter), internal_comparator);
|
||||
}
|
||||
|
||||
bool need_upper_bound_check =
|
||||
@ -2315,7 +2315,7 @@ InternalIterator* BlockBasedTable::NewIterator(
|
||||
}
|
||||
}
|
||||
|
||||
InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
|
||||
FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options) {
|
||||
if (rep_->fragmented_range_dels == nullptr) {
|
||||
return nullptr;
|
||||
|
@ -114,7 +114,7 @@ class BlockBasedTable : public TableReader {
|
||||
bool skip_filters = false,
|
||||
bool for_compaction = false) override;
|
||||
|
||||
InternalIterator* NewRangeTombstoneIterator(
|
||||
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
|
||||
const ReadOptions& read_options) override;
|
||||
|
||||
// @param skip_filters Disables loading/accessing the filter block
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#pragma once
|
||||
#include <memory>
|
||||
#include "db/range_tombstone_fragmenter.h"
|
||||
#include "rocksdb/slice_transform.h"
|
||||
#include "table/internal_iterator.h"
|
||||
|
||||
@ -44,7 +45,7 @@ class TableReader {
|
||||
bool skip_filters = false,
|
||||
bool for_compaction = false) = 0;
|
||||
|
||||
virtual InternalIterator* NewRangeTombstoneIterator(
|
||||
virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
|
||||
const ReadOptions& /*read_options*/) {
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -2905,7 +2905,8 @@ TEST_F(MemTableTest, Simple) {
|
||||
iter = memtable->NewIterator(ReadOptions(), &arena);
|
||||
arena_iter_guard.set(iter);
|
||||
} else {
|
||||
iter = memtable->NewRangeTombstoneIterator(ReadOptions());
|
||||
iter = memtable->NewRangeTombstoneIterator(
|
||||
ReadOptions(), kMaxSequenceNumber /* read_seq */);
|
||||
iter_guard.reset(iter);
|
||||
}
|
||||
if (iter == nullptr) {
|
||||
|
@ -96,9 +96,7 @@ class BinaryHeap {
|
||||
return data_.empty();
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
return data_.size();
|
||||
}
|
||||
size_t size() const { return data_.size(); }
|
||||
|
||||
void reset_root_cmp_cache() { root_cmp_cache_ = port::kMaxSizet; }
|
||||
|
||||
|
@ -389,7 +389,7 @@ Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
|
||||
for (auto& item : handle_map_) {
|
||||
auto handle = item.second;
|
||||
builder.AddIterator(db_impl->NewInternalIterator(
|
||||
arena, db_iter->GetRangeDelAggregator(), handle));
|
||||
arena, db_iter->GetRangeDelAggregator(), kMaxSequenceNumber, handle));
|
||||
}
|
||||
auto internal_iter = builder.Finish();
|
||||
db_iter->SetIterUnderDBIter(internal_iter);
|
||||
|
@ -22,7 +22,8 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
|
||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
||||
kMaxSequenceNumber /* upper_bound */);
|
||||
Arena arena;
|
||||
ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg));
|
||||
ScopedArenaIterator iter(
|
||||
idb->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber));
|
||||
|
||||
if (!begin_key.empty()) {
|
||||
InternalKey ikey;
|
||||
|
Loading…
x
Reference in New Issue
Block a user