Add compaction logic to RangeDelAggregatorV2 (#4758)
Summary: RangeDelAggregatorV2 now supports ShouldDelete calls on snapshot stripes and creation of range tombstone compaction iterators. RangeDelAggregator is no longer used on any non-test code path, and will be removed in a future commit. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4758 Differential Revision: D13439254 Pulled By: abhimadan fbshipit-source-id: fe105bcf8e3d4a2df37a622d5510843cd71b0401
This commit is contained in:
parent
8522d9c74d
commit
96de211f4c
@ -18,6 +18,7 @@
|
|||||||
#include "db/event_helpers.h"
|
#include "db/event_helpers.h"
|
||||||
#include "db/internal_stats.h"
|
#include "db/internal_stats.h"
|
||||||
#include "db/merge_helper.h"
|
#include "db/merge_helper.h"
|
||||||
|
#include "db/range_del_aggregator_v2.h"
|
||||||
#include "db/table_cache.h"
|
#include "db/table_cache.h"
|
||||||
#include "db/version_edit.h"
|
#include "db/version_edit.h"
|
||||||
#include "monitoring/iostats_context_imp.h"
|
#include "monitoring/iostats_context_imp.h"
|
||||||
@ -65,8 +66,9 @@ Status BuildTable(
|
|||||||
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
|
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
|
||||||
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
|
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
|
||||||
TableCache* table_cache, InternalIterator* iter,
|
TableCache* table_cache, InternalIterator* iter,
|
||||||
std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta,
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||||
const InternalKeyComparator& internal_comparator,
|
range_del_iters,
|
||||||
|
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
|
||||||
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
|
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
|
||||||
int_tbl_prop_collector_factories,
|
int_tbl_prop_collector_factories,
|
||||||
uint32_t column_family_id, const std::string& column_family_name,
|
uint32_t column_family_id, const std::string& column_family_name,
|
||||||
@ -86,12 +88,10 @@ Status BuildTable(
|
|||||||
Status s;
|
Status s;
|
||||||
meta->fd.file_size = 0;
|
meta->fd.file_size = 0;
|
||||||
iter->SeekToFirst();
|
iter->SeekToFirst();
|
||||||
std::unique_ptr<RangeDelAggregator> range_del_agg(
|
std::unique_ptr<CompactionRangeDelAggregatorV2> range_del_agg(
|
||||||
new RangeDelAggregator(internal_comparator, snapshots));
|
new CompactionRangeDelAggregatorV2(&internal_comparator, snapshots));
|
||||||
s = range_del_agg->AddTombstones(std::move(range_del_iter));
|
for (auto& range_del_iter : range_del_iters) {
|
||||||
if (!s.ok()) {
|
range_del_agg->AddTombstones(std::move(range_del_iter));
|
||||||
// may be non-ok if a range tombstone key is unparsable
|
|
||||||
return s;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(),
|
std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(),
|
||||||
@ -158,8 +158,10 @@ Status BuildTable(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto it = range_del_agg->NewIterator(); it->Valid(); it->Next()) {
|
auto range_del_it = range_del_agg->NewIterator();
|
||||||
auto tombstone = it->Tombstone();
|
for (range_del_it->SeekToFirst(); range_del_it->Valid();
|
||||||
|
range_del_it->Next()) {
|
||||||
|
auto tombstone = range_del_it->Tombstone();
|
||||||
auto kv = tombstone.Serialize();
|
auto kv = tombstone.Serialize();
|
||||||
builder->Add(kv.first.Encode(), kv.second);
|
builder->Add(kv.first.Encode(), kv.second);
|
||||||
meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
|
meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include "db/range_tombstone_fragmenter.h"
|
||||||
#include "db/table_properties_collector.h"
|
#include "db/table_properties_collector.h"
|
||||||
#include "options/cf_options.h"
|
#include "options/cf_options.h"
|
||||||
#include "rocksdb/comparator.h"
|
#include "rocksdb/comparator.h"
|
||||||
@ -65,8 +66,9 @@ extern Status BuildTable(
|
|||||||
const std::string& dbname, Env* env, const ImmutableCFOptions& options,
|
const std::string& dbname, Env* env, const ImmutableCFOptions& options,
|
||||||
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
|
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
|
||||||
TableCache* table_cache, InternalIterator* iter,
|
TableCache* table_cache, InternalIterator* iter,
|
||||||
std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta,
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||||
const InternalKeyComparator& internal_comparator,
|
range_del_iters,
|
||||||
|
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
|
||||||
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
|
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
|
||||||
int_tbl_prop_collector_factories,
|
int_tbl_prop_collector_factories,
|
||||||
uint32_t column_family_id, const std::string& column_family_name,
|
uint32_t column_family_id, const std::string& column_family_name,
|
||||||
|
@ -945,7 +945,7 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
|
|||||||
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
|
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
|
||||||
|
|
||||||
auto read_seq = super_version->current->version_set()->LastSequence();
|
auto read_seq = super_version->current->version_set()->LastSequence();
|
||||||
RangeDelAggregatorV2 range_del_agg(&internal_comparator_, read_seq);
|
ReadRangeDelAggregatorV2 range_del_agg(&internal_comparator_, read_seq);
|
||||||
auto* active_range_del_iter =
|
auto* active_range_del_iter =
|
||||||
super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
|
super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
|
||||||
range_del_agg.AddTombstones(
|
range_del_agg.AddTombstones(
|
||||||
|
@ -18,7 +18,7 @@ CompactionIterator::CompactionIterator(
|
|||||||
SequenceNumber earliest_write_conflict_snapshot,
|
SequenceNumber earliest_write_conflict_snapshot,
|
||||||
const SnapshotChecker* snapshot_checker, Env* env,
|
const SnapshotChecker* snapshot_checker, Env* env,
|
||||||
bool report_detailed_time, bool expect_valid_internal_key,
|
bool report_detailed_time, bool expect_valid_internal_key,
|
||||||
RangeDelAggregator* range_del_agg, const Compaction* compaction,
|
CompactionRangeDelAggregatorV2* range_del_agg, const Compaction* compaction,
|
||||||
const CompactionFilter* compaction_filter,
|
const CompactionFilter* compaction_filter,
|
||||||
const std::atomic<bool>* shutting_down,
|
const std::atomic<bool>* shutting_down,
|
||||||
const SequenceNumber preserve_deletes_seqnum)
|
const SequenceNumber preserve_deletes_seqnum)
|
||||||
@ -36,7 +36,7 @@ CompactionIterator::CompactionIterator(
|
|||||||
SequenceNumber earliest_write_conflict_snapshot,
|
SequenceNumber earliest_write_conflict_snapshot,
|
||||||
const SnapshotChecker* snapshot_checker, Env* env,
|
const SnapshotChecker* snapshot_checker, Env* env,
|
||||||
bool report_detailed_time, bool expect_valid_internal_key,
|
bool report_detailed_time, bool expect_valid_internal_key,
|
||||||
RangeDelAggregator* range_del_agg,
|
CompactionRangeDelAggregatorV2* range_del_agg,
|
||||||
std::unique_ptr<CompactionProxy> compaction,
|
std::unique_ptr<CompactionProxy> compaction,
|
||||||
const CompactionFilter* compaction_filter,
|
const CompactionFilter* compaction_filter,
|
||||||
const std::atomic<bool>* shutting_down,
|
const std::atomic<bool>* shutting_down,
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
#include "db/compaction_iteration_stats.h"
|
#include "db/compaction_iteration_stats.h"
|
||||||
#include "db/merge_helper.h"
|
#include "db/merge_helper.h"
|
||||||
#include "db/pinned_iterators_manager.h"
|
#include "db/pinned_iterators_manager.h"
|
||||||
#include "db/range_del_aggregator.h"
|
#include "db/range_del_aggregator_v2.h"
|
||||||
#include "db/snapshot_checker.h"
|
#include "db/snapshot_checker.h"
|
||||||
#include "options/cf_options.h"
|
#include "options/cf_options.h"
|
||||||
#include "rocksdb/compaction_filter.h"
|
#include "rocksdb/compaction_filter.h"
|
||||||
@ -64,7 +64,7 @@ class CompactionIterator {
|
|||||||
SequenceNumber earliest_write_conflict_snapshot,
|
SequenceNumber earliest_write_conflict_snapshot,
|
||||||
const SnapshotChecker* snapshot_checker, Env* env,
|
const SnapshotChecker* snapshot_checker, Env* env,
|
||||||
bool report_detailed_time, bool expect_valid_internal_key,
|
bool report_detailed_time, bool expect_valid_internal_key,
|
||||||
RangeDelAggregator* range_del_agg,
|
CompactionRangeDelAggregatorV2* range_del_agg,
|
||||||
const Compaction* compaction = nullptr,
|
const Compaction* compaction = nullptr,
|
||||||
const CompactionFilter* compaction_filter = nullptr,
|
const CompactionFilter* compaction_filter = nullptr,
|
||||||
const std::atomic<bool>* shutting_down = nullptr,
|
const std::atomic<bool>* shutting_down = nullptr,
|
||||||
@ -77,7 +77,7 @@ class CompactionIterator {
|
|||||||
SequenceNumber earliest_write_conflict_snapshot,
|
SequenceNumber earliest_write_conflict_snapshot,
|
||||||
const SnapshotChecker* snapshot_checker, Env* env,
|
const SnapshotChecker* snapshot_checker, Env* env,
|
||||||
bool report_detailed_time, bool expect_valid_internal_key,
|
bool report_detailed_time, bool expect_valid_internal_key,
|
||||||
RangeDelAggregator* range_del_agg,
|
CompactionRangeDelAggregatorV2* range_del_agg,
|
||||||
std::unique_ptr<CompactionProxy> compaction,
|
std::unique_ptr<CompactionProxy> compaction,
|
||||||
const CompactionFilter* compaction_filter = nullptr,
|
const CompactionFilter* compaction_filter = nullptr,
|
||||||
const std::atomic<bool>* shutting_down = nullptr,
|
const std::atomic<bool>* shutting_down = nullptr,
|
||||||
@ -141,7 +141,7 @@ class CompactionIterator {
|
|||||||
Env* env_;
|
Env* env_;
|
||||||
bool report_detailed_time_;
|
bool report_detailed_time_;
|
||||||
bool expect_valid_internal_key_;
|
bool expect_valid_internal_key_;
|
||||||
RangeDelAggregator* range_del_agg_;
|
CompactionRangeDelAggregatorV2* range_del_agg_;
|
||||||
std::unique_ptr<CompactionProxy> compaction_;
|
std::unique_ptr<CompactionProxy> compaction_;
|
||||||
const CompactionFilter* compaction_filter_;
|
const CompactionFilter* compaction_filter_;
|
||||||
const std::atomic<bool>* shutting_down_;
|
const std::atomic<bool>* shutting_down_;
|
||||||
|
@ -221,10 +221,16 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
|
|||||||
MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
|
MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
|
||||||
bool bottommost_level = false,
|
bool bottommost_level = false,
|
||||||
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
|
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
|
||||||
std::unique_ptr<InternalIterator> range_del_iter(
|
std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
|
||||||
new test::VectorIterator(range_del_ks, range_del_vs));
|
new test::VectorIterator(range_del_ks, range_del_vs));
|
||||||
range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_));
|
auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
|
||||||
ASSERT_OK(range_del_agg_->AddTombstones(std::move(range_del_iter)));
|
std::move(unfragmented_range_del_iter), icmp_);
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(tombstone_list, icmp_,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
range_del_agg_.reset(
|
||||||
|
new CompactionRangeDelAggregatorV2(&icmp_, snapshots_));
|
||||||
|
range_del_agg_->AddTombstones(std::move(range_del_iter));
|
||||||
|
|
||||||
std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
|
std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
|
||||||
if (filter || bottommost_level) {
|
if (filter || bottommost_level) {
|
||||||
@ -292,7 +298,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
|
|||||||
std::unique_ptr<MergeHelper> merge_helper_;
|
std::unique_ptr<MergeHelper> merge_helper_;
|
||||||
std::unique_ptr<LoggingForwardVectorIterator> iter_;
|
std::unique_ptr<LoggingForwardVectorIterator> iter_;
|
||||||
std::unique_ptr<CompactionIterator> c_iter_;
|
std::unique_ptr<CompactionIterator> c_iter_;
|
||||||
std::unique_ptr<RangeDelAggregator> range_del_agg_;
|
std::unique_ptr<CompactionRangeDelAggregatorV2> range_del_agg_;
|
||||||
std::unique_ptr<SnapshotChecker> snapshot_checker_;
|
std::unique_ptr<SnapshotChecker> snapshot_checker_;
|
||||||
std::atomic<bool> shutting_down_{false};
|
std::atomic<bool> shutting_down_{false};
|
||||||
FakeCompaction* compaction_proxy_;
|
FakeCompaction* compaction_proxy_;
|
||||||
|
@ -805,15 +805,13 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
|
|||||||
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
||||||
assert(sub_compact != nullptr);
|
assert(sub_compact != nullptr);
|
||||||
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
|
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
|
||||||
RangeDelAggregatorV2 range_del_agg_v2(&cfd->internal_comparator(),
|
CompactionRangeDelAggregatorV2 range_del_agg(&cfd->internal_comparator(),
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
existing_snapshots_);
|
||||||
auto* range_del_agg =
|
|
||||||
range_del_agg_v2.DelegateToRangeDelAggregator(existing_snapshots_);
|
|
||||||
|
|
||||||
// Although the v2 aggregator is what the level iterator(s) know about,
|
// Although the v2 aggregator is what the level iterator(s) know about,
|
||||||
// the AddTombstones calls will be propagated down to the v1 aggregator.
|
// the AddTombstones calls will be propagated down to the v1 aggregator.
|
||||||
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
|
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
|
||||||
sub_compact->compaction, &range_del_agg_v2, env_optiosn_for_read_));
|
sub_compact->compaction, &range_del_agg, env_optiosn_for_read_));
|
||||||
|
|
||||||
AutoThreadOperationStageUpdater stage_updater(
|
AutoThreadOperationStageUpdater stage_updater(
|
||||||
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
|
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
|
||||||
@ -902,8 +900,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
|
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
|
||||||
&existing_snapshots_, earliest_write_conflict_snapshot_,
|
&existing_snapshots_, earliest_write_conflict_snapshot_,
|
||||||
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
|
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
|
||||||
range_del_agg, sub_compact->compaction, compaction_filter, shutting_down_,
|
&range_del_agg, sub_compact->compaction, compaction_filter,
|
||||||
preserve_deletes_seqnum_));
|
shutting_down_, preserve_deletes_seqnum_));
|
||||||
auto c_iter = sub_compact->c_iter.get();
|
auto c_iter = sub_compact->c_iter.get();
|
||||||
c_iter->SeekToFirst();
|
c_iter->SeekToFirst();
|
||||||
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
|
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
|
||||||
@ -1041,7 +1039,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
}
|
}
|
||||||
CompactionIterationStats range_del_out_stats;
|
CompactionIterationStats range_del_out_stats;
|
||||||
status =
|
status =
|
||||||
FinishCompactionOutputFile(input_status, sub_compact, range_del_agg,
|
FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
|
||||||
&range_del_out_stats, next_key);
|
&range_del_out_stats, next_key);
|
||||||
RecordDroppedKeys(range_del_out_stats,
|
RecordDroppedKeys(range_del_out_stats,
|
||||||
&sub_compact->compaction_job_stats);
|
&sub_compact->compaction_job_stats);
|
||||||
@ -1092,8 +1090,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (status.ok() && sub_compact->builder == nullptr &&
|
if (status.ok() && sub_compact->builder == nullptr &&
|
||||||
sub_compact->outputs.size() == 0 &&
|
sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
|
||||||
!range_del_agg->IsEmpty()) {
|
|
||||||
// handle subcompaction containing only range deletions
|
// handle subcompaction containing only range deletions
|
||||||
status = OpenCompactionOutputFile(sub_compact);
|
status = OpenCompactionOutputFile(sub_compact);
|
||||||
}
|
}
|
||||||
@ -1102,7 +1099,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
// close the output file.
|
// close the output file.
|
||||||
if (sub_compact->builder != nullptr) {
|
if (sub_compact->builder != nullptr) {
|
||||||
CompactionIterationStats range_del_out_stats;
|
CompactionIterationStats range_del_out_stats;
|
||||||
Status s = FinishCompactionOutputFile(status, sub_compact, range_del_agg,
|
Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
|
||||||
&range_del_out_stats);
|
&range_del_out_stats);
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
status = s;
|
status = s;
|
||||||
@ -1168,7 +1165,7 @@ void CompactionJob::RecordDroppedKeys(
|
|||||||
|
|
||||||
Status CompactionJob::FinishCompactionOutputFile(
|
Status CompactionJob::FinishCompactionOutputFile(
|
||||||
const Status& input_status, SubcompactionState* sub_compact,
|
const Status& input_status, SubcompactionState* sub_compact,
|
||||||
RangeDelAggregator* range_del_agg,
|
CompactionRangeDelAggregatorV2* range_del_agg,
|
||||||
CompactionIterationStats* range_del_out_stats,
|
CompactionIterationStats* range_del_out_stats,
|
||||||
const Slice* next_table_min_key /* = nullptr */) {
|
const Slice* next_table_min_key /* = nullptr */) {
|
||||||
AutoThreadOperationStageUpdater stage_updater(
|
AutoThreadOperationStageUpdater stage_updater(
|
||||||
@ -1220,11 +1217,6 @@ Status CompactionJob::FinishCompactionOutputFile(
|
|||||||
if (existing_snapshots_.size() > 0) {
|
if (existing_snapshots_.size() > 0) {
|
||||||
earliest_snapshot = existing_snapshots_[0];
|
earliest_snapshot = existing_snapshots_[0];
|
||||||
}
|
}
|
||||||
auto it = range_del_agg->NewIterator();
|
|
||||||
if (lower_bound != nullptr) {
|
|
||||||
it->Seek(*lower_bound);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool has_overlapping_endpoints;
|
bool has_overlapping_endpoints;
|
||||||
if (upper_bound != nullptr && meta->largest.size() > 0) {
|
if (upper_bound != nullptr && meta->largest.size() > 0) {
|
||||||
has_overlapping_endpoints =
|
has_overlapping_endpoints =
|
||||||
@ -1232,6 +1224,17 @@ Status CompactionJob::FinishCompactionOutputFile(
|
|||||||
} else {
|
} else {
|
||||||
has_overlapping_endpoints = false;
|
has_overlapping_endpoints = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
|
||||||
|
has_overlapping_endpoints);
|
||||||
|
// Position the range tombstone output iterator. There may be tombstone
|
||||||
|
// fragments that are entirely out of range, so make sure that we do not
|
||||||
|
// include those.
|
||||||
|
if (lower_bound != nullptr) {
|
||||||
|
it->Seek(*lower_bound);
|
||||||
|
} else {
|
||||||
|
it->SeekToFirst();
|
||||||
|
}
|
||||||
for (; it->Valid(); it->Next()) {
|
for (; it->Valid(); it->Next()) {
|
||||||
auto tombstone = it->Tombstone();
|
auto tombstone = it->Tombstone();
|
||||||
if (upper_bound != nullptr) {
|
if (upper_bound != nullptr) {
|
||||||
@ -1257,6 +1260,8 @@ Status CompactionJob::FinishCompactionOutputFile(
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto kv = tombstone.Serialize();
|
auto kv = tombstone.Serialize();
|
||||||
|
assert(lower_bound == nullptr ||
|
||||||
|
ucmp->Compare(*lower_bound, kv.second) < 0);
|
||||||
sub_compact->builder->Add(kv.first.Encode(), kv.second);
|
sub_compact->builder->Add(kv.first.Encode(), kv.second);
|
||||||
InternalKey smallest_candidate = std::move(kv.first);
|
InternalKey smallest_candidate = std::move(kv.first);
|
||||||
if (lower_bound != nullptr &&
|
if (lower_bound != nullptr &&
|
||||||
|
@ -25,12 +25,12 @@
|
|||||||
#include "db/job_context.h"
|
#include "db/job_context.h"
|
||||||
#include "db/log_writer.h"
|
#include "db/log_writer.h"
|
||||||
#include "db/memtable_list.h"
|
#include "db/memtable_list.h"
|
||||||
#include "db/range_del_aggregator.h"
|
#include "db/range_del_aggregator_v2.h"
|
||||||
#include "db/version_edit.h"
|
#include "db/version_edit.h"
|
||||||
#include "db/write_controller.h"
|
#include "db/write_controller.h"
|
||||||
#include "db/write_thread.h"
|
#include "db/write_thread.h"
|
||||||
#include "options/db_options.h"
|
|
||||||
#include "options/cf_options.h"
|
#include "options/cf_options.h"
|
||||||
|
#include "options/db_options.h"
|
||||||
#include "port/port.h"
|
#include "port/port.h"
|
||||||
#include "rocksdb/compaction_filter.h"
|
#include "rocksdb/compaction_filter.h"
|
||||||
#include "rocksdb/compaction_job_stats.h"
|
#include "rocksdb/compaction_job_stats.h"
|
||||||
@ -104,7 +104,7 @@ class CompactionJob {
|
|||||||
|
|
||||||
Status FinishCompactionOutputFile(
|
Status FinishCompactionOutputFile(
|
||||||
const Status& input_status, SubcompactionState* sub_compact,
|
const Status& input_status, SubcompactionState* sub_compact,
|
||||||
RangeDelAggregator* range_del_agg,
|
CompactionRangeDelAggregatorV2* range_del_agg,
|
||||||
CompactionIterationStats* range_del_out_stats,
|
CompactionIterationStats* range_del_out_stats,
|
||||||
const Slice* next_table_min_key = nullptr);
|
const Slice* next_table_min_key = nullptr);
|
||||||
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
|
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
|
||||||
|
@ -340,8 +340,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
|
|||||||
Arena arena;
|
Arena arena;
|
||||||
{
|
{
|
||||||
InternalKeyComparator icmp(options.comparator);
|
InternalKeyComparator icmp(options.comparator);
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
&icmp, kMaxSequenceNumber /* upper_bound */);
|
||||||
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
||||||
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
|
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
|
||||||
iter->SeekToFirst();
|
iter->SeekToFirst();
|
||||||
@ -430,8 +430,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
|
|||||||
count = 0;
|
count = 0;
|
||||||
{
|
{
|
||||||
InternalKeyComparator icmp(options.comparator);
|
InternalKeyComparator icmp(options.comparator);
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
&icmp, kMaxSequenceNumber /* upper_bound */);
|
||||||
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
||||||
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
|
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
|
||||||
iter->SeekToFirst();
|
iter->SeekToFirst();
|
||||||
@ -648,8 +648,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
|
|||||||
int total = 0;
|
int total = 0;
|
||||||
Arena arena;
|
Arena arena;
|
||||||
InternalKeyComparator icmp(options.comparator);
|
InternalKeyComparator icmp(options.comparator);
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(&icmp,
|
||||||
kMaxSequenceNumber /* snapshots */);
|
kMaxSequenceNumber /* snapshots */);
|
||||||
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
|
||||||
&arena, &range_del_agg, kMaxSequenceNumber));
|
&arena, &range_del_agg, kMaxSequenceNumber));
|
||||||
iter->SeekToFirst();
|
iter->SeekToFirst();
|
||||||
|
@ -23,8 +23,7 @@
|
|||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
Options SanitizeOptions(const std::string& dbname,
|
Options SanitizeOptions(const std::string& dbname, const Options& src) {
|
||||||
const Options& src) {
|
|
||||||
auto db_options = SanitizeOptions(dbname, DBOptions(src));
|
auto db_options = SanitizeOptions(dbname, DBOptions(src));
|
||||||
ImmutableDBOptions immutable_db_options(db_options);
|
ImmutableDBOptions immutable_db_options(db_options);
|
||||||
auto cf_options =
|
auto cf_options =
|
||||||
@ -56,10 +55,9 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
|
|||||||
result.write_buffer_manager.reset(
|
result.write_buffer_manager.reset(
|
||||||
new WriteBufferManager(result.db_write_buffer_size));
|
new WriteBufferManager(result.db_write_buffer_size));
|
||||||
}
|
}
|
||||||
auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes,
|
auto bg_job_limits = DBImpl::GetBGJobLimits(
|
||||||
result.max_background_compactions,
|
result.max_background_flushes, result.max_background_compactions,
|
||||||
result.max_background_jobs,
|
result.max_background_jobs, true /* parallelize_compactions */);
|
||||||
true /* parallelize_compactions */);
|
|
||||||
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
|
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
|
||||||
Env::Priority::LOW);
|
Env::Priority::LOW);
|
||||||
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
|
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
|
||||||
@ -107,14 +105,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
|
|||||||
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
|
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.use_direct_reads &&
|
if (result.use_direct_reads && result.compaction_readahead_size == 0) {
|
||||||
result.compaction_readahead_size == 0) {
|
|
||||||
TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
|
TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
|
||||||
result.compaction_readahead_size = 1024 * 1024 * 2;
|
result.compaction_readahead_size = 1024 * 1024 * 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.compaction_readahead_size > 0 ||
|
if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
|
||||||
result.use_direct_reads) {
|
|
||||||
result.new_table_reader_for_compaction_inputs = true;
|
result.new_table_reader_for_compaction_inputs = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,7 +214,7 @@ static Status ValidateOptions(
|
|||||||
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
Status DBImpl::NewDB() {
|
Status DBImpl::NewDB() {
|
||||||
VersionEdit new_db;
|
VersionEdit new_db;
|
||||||
new_db.SetLogNumber(0);
|
new_db.SetLogNumber(0);
|
||||||
@ -258,9 +254,8 @@ Status DBImpl::NewDB() {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBImpl::CreateAndNewDirectory(
|
Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
|
||||||
Env* env, const std::string& dirname,
|
std::unique_ptr<Directory>* directory) {
|
||||||
std::unique_ptr<Directory>* directory) {
|
|
||||||
// We call CreateDirIfMissing() as the directory may already exist (if we
|
// We call CreateDirIfMissing() as the directory may already exist (if we
|
||||||
// are reopening a DB), when this happens we don't want creating the
|
// are reopening a DB), when this happens we don't want creating the
|
||||||
// directory to cause an error. However, we need to check if creating the
|
// directory to cause an error. However, we need to check if creating the
|
||||||
@ -341,8 +336,8 @@ Status DBImpl::Recover(
|
|||||||
}
|
}
|
||||||
} else if (s.ok()) {
|
} else if (s.ok()) {
|
||||||
if (immutable_db_options_.error_if_exists) {
|
if (immutable_db_options_.error_if_exists) {
|
||||||
return Status::InvalidArgument(
|
return Status::InvalidArgument(dbname_,
|
||||||
dbname_, "exists (error_if_exists is true)");
|
"exists (error_if_exists is true)");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Unexpected error reading file
|
// Unexpected error reading file
|
||||||
@ -527,10 +522,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
std::map<std::string, uint32_t> cf_name_id_map;
|
std::map<std::string, uint32_t> cf_name_id_map;
|
||||||
std::map<uint32_t, uint64_t> cf_lognumber_map;
|
std::map<uint32_t, uint64_t> cf_lognumber_map;
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||||
cf_name_id_map.insert(
|
cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
|
||||||
std::make_pair(cfd->GetName(), cfd->GetID()));
|
|
||||||
cf_lognumber_map.insert(
|
cf_lognumber_map.insert(
|
||||||
std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
|
std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
|
||||||
}
|
}
|
||||||
|
|
||||||
immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
|
immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
|
||||||
@ -880,8 +874,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
// VersionSet::next_file_number_ always to be strictly greater than any
|
// VersionSet::next_file_number_ always to be strictly greater than any
|
||||||
// log number
|
// log number
|
||||||
versions_->MarkFileNumberUsed(max_log_number + 1);
|
versions_->MarkFileNumberUsed(max_log_number + 1);
|
||||||
status = versions_->LogAndApply(
|
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
|
||||||
cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
|
edit, &mutex_);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
// Recovery failed
|
// Recovery failed
|
||||||
break;
|
break;
|
||||||
@ -994,12 +988,17 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
|||||||
if (use_custom_gc_ && snapshot_checker == nullptr) {
|
if (use_custom_gc_ && snapshot_checker == nullptr) {
|
||||||
snapshot_checker = DisableGCSnapshotChecker::Instance();
|
snapshot_checker = DisableGCSnapshotChecker::Instance();
|
||||||
}
|
}
|
||||||
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||||
|
range_del_iters;
|
||||||
|
auto range_del_iter =
|
||||||
|
mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
|
||||||
|
if (range_del_iter != nullptr) {
|
||||||
|
range_del_iters.emplace_back(range_del_iter);
|
||||||
|
}
|
||||||
s = BuildTable(
|
s = BuildTable(
|
||||||
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
|
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
|
||||||
env_options_for_compaction_, cfd->table_cache(), iter.get(),
|
env_options_for_compaction_, cfd->table_cache(), iter.get(),
|
||||||
std::unique_ptr<InternalIterator>(
|
std::move(range_del_iters), &meta, cfd->internal_comparator(),
|
||||||
mem->NewRangeTombstoneIterator(ro, versions_->LastSequence())),
|
|
||||||
&meta, cfd->internal_comparator(),
|
|
||||||
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
||||||
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
|
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
|
||||||
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
|
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
|
||||||
@ -1033,8 +1032,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
|||||||
stats.bytes_written = meta.fd.GetFileSize();
|
stats.bytes_written = meta.fd.GetFileSize();
|
||||||
stats.num_output_files = 1;
|
stats.num_output_files = 1;
|
||||||
cfd->internal_stats()->AddCompactionStats(level, stats);
|
cfd->internal_stats()->AddCompactionStats(level, stats);
|
||||||
cfd->internal_stats()->AddCFStats(
|
cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
|
||||||
InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
|
meta.fd.GetFileSize());
|
||||||
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
|
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
@ -1227,7 +1226,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
!cfd->mem()->IsMergeOperatorSupported()) {
|
!cfd->mem()->IsMergeOperatorSupported()) {
|
||||||
s = Status::InvalidArgument(
|
s = Status::InvalidArgument(
|
||||||
"The memtable of column family %s does not support merge operator "
|
"The memtable of column family %s does not support merge operator "
|
||||||
"its options.merge_operator is non-null", cfd->GetName().c_str());
|
"its options.merge_operator is non-null",
|
||||||
|
cfd->GetName().c_str());
|
||||||
}
|
}
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
break;
|
break;
|
||||||
|
@ -171,7 +171,7 @@ class DBIter final: public Iterator {
|
|||||||
iter_ = iter;
|
iter_ = iter;
|
||||||
iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
|
iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
|
||||||
}
|
}
|
||||||
virtual RangeDelAggregatorV2* GetRangeDelAggregator() {
|
virtual ReadRangeDelAggregatorV2* GetRangeDelAggregator() {
|
||||||
return &range_del_agg_;
|
return &range_del_agg_;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -341,7 +341,7 @@ class DBIter final: public Iterator {
|
|||||||
const bool total_order_seek_;
|
const bool total_order_seek_;
|
||||||
// List of operands for merge operator.
|
// List of operands for merge operator.
|
||||||
MergeContext merge_context_;
|
MergeContext merge_context_;
|
||||||
RangeDelAggregatorV2 range_del_agg_;
|
ReadRangeDelAggregatorV2 range_del_agg_;
|
||||||
LocalStatistics local_stats_;
|
LocalStatistics local_stats_;
|
||||||
PinnedIteratorsManager pinned_iters_mgr_;
|
PinnedIteratorsManager pinned_iters_mgr_;
|
||||||
ReadCallback* read_callback_;
|
ReadCallback* read_callback_;
|
||||||
@ -1479,7 +1479,7 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
|
|||||||
|
|
||||||
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
|
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
|
||||||
|
|
||||||
RangeDelAggregatorV2* ArenaWrappedDBIter::GetRangeDelAggregator() {
|
ReadRangeDelAggregatorV2* ArenaWrappedDBIter::GetRangeDelAggregator() {
|
||||||
return db_iter_->GetRangeDelAggregator();
|
return db_iter_->GetRangeDelAggregator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ class ArenaWrappedDBIter : public Iterator {
|
|||||||
// Get the arena to be used to allocate memory for DBIter to be wrapped,
|
// Get the arena to be used to allocate memory for DBIter to be wrapped,
|
||||||
// as well as child iterators in it.
|
// as well as child iterators in it.
|
||||||
virtual Arena* GetArena() { return &arena_; }
|
virtual Arena* GetArena() { return &arena_; }
|
||||||
virtual RangeDelAggregatorV2* GetRangeDelAggregator();
|
virtual ReadRangeDelAggregatorV2* GetRangeDelAggregator();
|
||||||
|
|
||||||
// Set the internal iterator wrapped inside the DB Iterator. Usually it is
|
// Set the internal iterator wrapped inside the DB Iterator. Usually it is
|
||||||
// a merging iterator.
|
// a merging iterator.
|
||||||
|
@ -814,8 +814,8 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
|
|||||||
Arena arena;
|
Arena arena;
|
||||||
auto options = CurrentOptions();
|
auto options = CurrentOptions();
|
||||||
InternalKeyComparator icmp(options.comparator);
|
InternalKeyComparator icmp(options.comparator);
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(&icmp,
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
ScopedArenaIterator iter;
|
ScopedArenaIterator iter;
|
||||||
if (cf == 0) {
|
if (cf == 0) {
|
||||||
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
||||||
@ -1227,8 +1227,8 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
|
|||||||
Arena arena;
|
Arena arena;
|
||||||
auto options = CurrentOptions();
|
auto options = CurrentOptions();
|
||||||
InternalKeyComparator icmp(options.comparator);
|
InternalKeyComparator icmp(options.comparator);
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(&icmp,
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
// This should be defined after range_del_agg so that it destructs the
|
// This should be defined after range_del_agg so that it destructs the
|
||||||
// assigned iterator before it range_del_agg is already destructed.
|
// assigned iterator before it range_del_agg is already destructed.
|
||||||
ScopedArenaIterator iter;
|
ScopedArenaIterator iter;
|
||||||
@ -1437,8 +1437,8 @@ void DBTestBase::VerifyDBInternal(
|
|||||||
std::vector<std::pair<std::string, std::string>> true_data) {
|
std::vector<std::pair<std::string, std::string>> true_data) {
|
||||||
Arena arena;
|
Arena arena;
|
||||||
InternalKeyComparator icmp(last_options_.comparator);
|
InternalKeyComparator icmp(last_options_.comparator);
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(&icmp,
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
auto iter =
|
auto iter =
|
||||||
dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber);
|
dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber);
|
||||||
iter->SeekToFirst();
|
iter->SeekToFirst();
|
||||||
|
@ -24,14 +24,15 @@
|
|||||||
#include "db/event_helpers.h"
|
#include "db/event_helpers.h"
|
||||||
#include "db/log_reader.h"
|
#include "db/log_reader.h"
|
||||||
#include "db/log_writer.h"
|
#include "db/log_writer.h"
|
||||||
|
#include "db/memtable.h"
|
||||||
#include "db/memtable_list.h"
|
#include "db/memtable_list.h"
|
||||||
#include "db/merge_context.h"
|
#include "db/merge_context.h"
|
||||||
|
#include "db/range_tombstone_fragmenter.h"
|
||||||
#include "db/version_set.h"
|
#include "db/version_set.h"
|
||||||
#include "monitoring/iostats_context_imp.h"
|
#include "monitoring/iostats_context_imp.h"
|
||||||
#include "monitoring/perf_context_imp.h"
|
#include "monitoring/perf_context_imp.h"
|
||||||
#include "monitoring/thread_status_util.h"
|
#include "monitoring/thread_status_util.h"
|
||||||
#include "port/port.h"
|
#include "port/port.h"
|
||||||
#include "db/memtable.h"
|
|
||||||
#include "rocksdb/db.h"
|
#include "rocksdb/db.h"
|
||||||
#include "rocksdb/env.h"
|
#include "rocksdb/env.h"
|
||||||
#include "rocksdb/statistics.h"
|
#include "rocksdb/statistics.h"
|
||||||
@ -295,7 +296,8 @@ Status FlushJob::WriteLevel0Table() {
|
|||||||
// memtable and its associated range deletion memtable, respectively, at
|
// memtable and its associated range deletion memtable, respectively, at
|
||||||
// corresponding indexes.
|
// corresponding indexes.
|
||||||
std::vector<InternalIterator*> memtables;
|
std::vector<InternalIterator*> memtables;
|
||||||
std::vector<InternalIterator*> range_del_iters;
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||||
|
range_del_iters;
|
||||||
ReadOptions ro;
|
ReadOptions ro;
|
||||||
ro.total_order_seek = true;
|
ro.total_order_seek = true;
|
||||||
Arena arena;
|
Arena arena;
|
||||||
@ -308,9 +310,9 @@ Status FlushJob::WriteLevel0Table() {
|
|||||||
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
||||||
memtables.push_back(m->NewIterator(ro, &arena));
|
memtables.push_back(m->NewIterator(ro, &arena));
|
||||||
auto* range_del_iter =
|
auto* range_del_iter =
|
||||||
m->NewRangeTombstoneIterator(ro, versions_->LastSequence());
|
m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
|
||||||
if (range_del_iter != nullptr) {
|
if (range_del_iter != nullptr) {
|
||||||
range_del_iters.push_back(range_del_iter);
|
range_del_iters.emplace_back(range_del_iter);
|
||||||
}
|
}
|
||||||
total_num_entries += m->num_entries();
|
total_num_entries += m->num_entries();
|
||||||
total_num_deletes += m->num_deletes();
|
total_num_deletes += m->num_deletes();
|
||||||
@ -329,10 +331,6 @@ Status FlushJob::WriteLevel0Table() {
|
|||||||
ScopedArenaIterator iter(
|
ScopedArenaIterator iter(
|
||||||
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
|
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
|
||||||
static_cast<int>(memtables.size()), &arena));
|
static_cast<int>(memtables.size()), &arena));
|
||||||
std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
|
|
||||||
&cfd_->internal_comparator(),
|
|
||||||
range_del_iters.empty() ? nullptr : &range_del_iters[0],
|
|
||||||
static_cast<int>(range_del_iters.size())));
|
|
||||||
ROCKS_LOG_INFO(db_options_.info_log,
|
ROCKS_LOG_INFO(db_options_.info_log,
|
||||||
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
|
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
|
||||||
cfd_->GetName().c_str(), job_context_->job_id,
|
cfd_->GetName().c_str(), job_context_->job_id,
|
||||||
@ -358,7 +356,7 @@ Status FlushJob::WriteLevel0Table() {
|
|||||||
s = BuildTable(
|
s = BuildTable(
|
||||||
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
|
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
|
||||||
env_options_, cfd_->table_cache(), iter.get(),
|
env_options_, cfd_->table_cache(), iter.get(),
|
||||||
std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
|
std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
|
||||||
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
|
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
|
||||||
cfd_->GetName(), existing_snapshots_,
|
cfd_->GetName(), existing_snapshots_,
|
||||||
earliest_write_conflict_snapshot_, snapshot_checker_,
|
earliest_write_conflict_snapshot_, snapshot_checker_,
|
||||||
|
@ -73,8 +73,8 @@ class ForwardLevelIterator : public InternalIterator {
|
|||||||
delete file_iter_;
|
delete file_iter_;
|
||||||
}
|
}
|
||||||
|
|
||||||
RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
|
ReadRangeDelAggregatorV2 range_del_agg(
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
&cfd_->internal_comparator(), kMaxSequenceNumber /* upper_bound */);
|
||||||
file_iter_ = cfd_->table_cache()->NewIterator(
|
file_iter_ = cfd_->table_cache()->NewIterator(
|
||||||
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
|
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
|
||||||
*files_[file_index_],
|
*files_[file_index_],
|
||||||
@ -610,8 +610,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
|
|||||||
// New
|
// New
|
||||||
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
|
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
|
||||||
}
|
}
|
||||||
RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
|
ReadRangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
|
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
|
||||||
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
||||||
if (!read_options_.ignore_range_deletions) {
|
if (!read_options_.ignore_range_deletions) {
|
||||||
@ -669,8 +669,8 @@ void ForwardIterator::RenewIterators() {
|
|||||||
|
|
||||||
mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
|
mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
|
||||||
svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
||||||
RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
|
ReadRangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(),
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
if (!read_options_.ignore_range_deletions) {
|
if (!read_options_.ignore_range_deletions) {
|
||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
|
||||||
svnew->mem->NewRangeTombstoneIterator(
|
svnew->mem->NewRangeTombstoneIterator(
|
||||||
|
@ -110,8 +110,11 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
|
|||||||
// keys_ stores the list of keys encountered while merging.
|
// keys_ stores the list of keys encountered while merging.
|
||||||
// operands_ stores the list of merge operands encountered while merging.
|
// operands_ stores the list of merge operands encountered while merging.
|
||||||
// keys_[i] corresponds to operands_[i] for each i.
|
// keys_[i] corresponds to operands_[i] for each i.
|
||||||
|
//
|
||||||
|
// TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
|
||||||
|
// and just pass the StripeRep corresponding to the stripe being merged.
|
||||||
Status MergeHelper::MergeUntil(InternalIterator* iter,
|
Status MergeHelper::MergeUntil(InternalIterator* iter,
|
||||||
RangeDelAggregator* range_del_agg,
|
CompactionRangeDelAggregatorV2* range_del_agg,
|
||||||
const SequenceNumber stop_before,
|
const SequenceNumber stop_before,
|
||||||
const bool at_bottom) {
|
const bool at_bottom) {
|
||||||
// Get a copy of the internal key, before it's invalidated by iter->Next()
|
// Get a copy of the internal key, before it's invalidated by iter->Next()
|
||||||
|
@ -11,7 +11,7 @@
|
|||||||
|
|
||||||
#include "db/dbformat.h"
|
#include "db/dbformat.h"
|
||||||
#include "db/merge_context.h"
|
#include "db/merge_context.h"
|
||||||
#include "db/range_del_aggregator.h"
|
#include "db/range_del_aggregator_v2.h"
|
||||||
#include "db/snapshot_checker.h"
|
#include "db/snapshot_checker.h"
|
||||||
#include "rocksdb/compaction_filter.h"
|
#include "rocksdb/compaction_filter.h"
|
||||||
#include "rocksdb/env.h"
|
#include "rocksdb/env.h"
|
||||||
@ -78,7 +78,7 @@ class MergeHelper {
|
|||||||
//
|
//
|
||||||
// REQUIRED: The first key in the input is not corrupted.
|
// REQUIRED: The first key in the input is not corrupted.
|
||||||
Status MergeUntil(InternalIterator* iter,
|
Status MergeUntil(InternalIterator* iter,
|
||||||
RangeDelAggregator* range_del_agg = nullptr,
|
CompactionRangeDelAggregatorV2* range_del_agg = nullptr,
|
||||||
const SequenceNumber stop_before = 0,
|
const SequenceNumber stop_before = 0,
|
||||||
const bool at_bottom = false);
|
const bool at_bottom = false);
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ int main(int argc, char** argv) {
|
|||||||
for (int i = 0; i < FLAGS_num_runs; i++) {
|
for (int i = 0; i < FLAGS_num_runs; i++) {
|
||||||
rocksdb::RangeDelAggregator range_del_agg(icmp, {} /* snapshots */,
|
rocksdb::RangeDelAggregator range_del_agg(icmp, {} /* snapshots */,
|
||||||
FLAGS_use_collapsed);
|
FLAGS_use_collapsed);
|
||||||
rocksdb::RangeDelAggregatorV2 range_del_agg_v2(
|
rocksdb::ReadRangeDelAggregatorV2 range_del_agg_v2(
|
||||||
&icmp, rocksdb::kMaxSequenceNumber /* upper_bound */);
|
&icmp, rocksdb::kMaxSequenceNumber /* upper_bound */);
|
||||||
|
|
||||||
std::vector<std::unique_ptr<rocksdb::FragmentedRangeTombstoneList> >
|
std::vector<std::unique_ptr<rocksdb::FragmentedRangeTombstoneList> >
|
||||||
|
@ -26,7 +26,10 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator(
|
|||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
|
std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
|
||||||
const InternalKeyComparator* icmp, const InternalKey* smallest,
|
const InternalKeyComparator* icmp, const InternalKey* smallest,
|
||||||
const InternalKey* largest)
|
const InternalKey* largest)
|
||||||
: iter_(std::move(iter)), icmp_(icmp) {
|
: iter_(std::move(iter)),
|
||||||
|
icmp_(icmp),
|
||||||
|
smallest_ikey_(smallest),
|
||||||
|
largest_ikey_(largest) {
|
||||||
if (smallest != nullptr) {
|
if (smallest != nullptr) {
|
||||||
pinned_bounds_.emplace_back();
|
pinned_bounds_.emplace_back();
|
||||||
auto& parsed_smallest = pinned_bounds_.back();
|
auto& parsed_smallest = pinned_bounds_.back();
|
||||||
@ -78,6 +81,8 @@ void TruncatedRangeDelIterator::Next() { iter_->TopNext(); }
|
|||||||
|
|
||||||
void TruncatedRangeDelIterator::Prev() { iter_->TopPrev(); }
|
void TruncatedRangeDelIterator::Prev() { iter_->TopPrev(); }
|
||||||
|
|
||||||
|
void TruncatedRangeDelIterator::InternalNext() { iter_->Next(); }
|
||||||
|
|
||||||
// NOTE: target is a user key
|
// NOTE: target is a user key
|
||||||
void TruncatedRangeDelIterator::Seek(const Slice& target) {
|
void TruncatedRangeDelIterator::Seek(const Slice& target) {
|
||||||
if (largest_ != nullptr &&
|
if (largest_ != nullptr &&
|
||||||
@ -86,6 +91,11 @@ void TruncatedRangeDelIterator::Seek(const Slice& target) {
|
|||||||
iter_->Invalidate();
|
iter_->Invalidate();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (smallest_ != nullptr &&
|
||||||
|
icmp_->user_comparator()->Compare(target, smallest_->user_key) < 0) {
|
||||||
|
iter_->Seek(smallest_->user_key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
iter_->Seek(target);
|
iter_->Seek(target);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,12 +107,51 @@ void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) {
|
|||||||
iter_->Invalidate();
|
iter_->Invalidate();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (largest_ != nullptr &&
|
||||||
|
icmp_->user_comparator()->Compare(largest_->user_key, target) < 0) {
|
||||||
|
iter_->SeekForPrev(largest_->user_key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
iter_->SeekForPrev(target);
|
iter_->SeekForPrev(target);
|
||||||
}
|
}
|
||||||
|
|
||||||
void TruncatedRangeDelIterator::SeekToFirst() { iter_->SeekToTopFirst(); }
|
void TruncatedRangeDelIterator::SeekToFirst() {
|
||||||
|
if (smallest_ != nullptr) {
|
||||||
|
iter_->Seek(smallest_->user_key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
iter_->SeekToTopFirst();
|
||||||
|
}
|
||||||
|
|
||||||
void TruncatedRangeDelIterator::SeekToLast() { iter_->SeekToTopLast(); }
|
void TruncatedRangeDelIterator::SeekToLast() {
|
||||||
|
if (largest_ != nullptr) {
|
||||||
|
iter_->SeekForPrev(largest_->user_key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
iter_->SeekToTopLast();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
|
||||||
|
TruncatedRangeDelIterator::SplitBySnapshot(
|
||||||
|
const std::vector<SequenceNumber>& snapshots) {
|
||||||
|
using FragmentedIterPair =
|
||||||
|
std::pair<const SequenceNumber,
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator>>;
|
||||||
|
|
||||||
|
auto split_untruncated_iters = iter_->SplitBySnapshot(snapshots);
|
||||||
|
std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
|
||||||
|
split_truncated_iters;
|
||||||
|
std::for_each(
|
||||||
|
split_untruncated_iters.begin(), split_untruncated_iters.end(),
|
||||||
|
[&](FragmentedIterPair& iter_pair) {
|
||||||
|
std::unique_ptr<TruncatedRangeDelIterator> truncated_iter(
|
||||||
|
new TruncatedRangeDelIterator(std::move(iter_pair.second), icmp_,
|
||||||
|
smallest_ikey_, largest_ikey_));
|
||||||
|
split_truncated_iters.emplace(iter_pair.first,
|
||||||
|
std::move(truncated_iter));
|
||||||
|
});
|
||||||
|
return split_truncated_iters;
|
||||||
|
}
|
||||||
|
|
||||||
ForwardRangeDelIterator::ForwardRangeDelIterator(
|
ForwardRangeDelIterator::ForwardRangeDelIterator(
|
||||||
const InternalKeyComparator* icmp,
|
const InternalKeyComparator* icmp,
|
||||||
@ -116,15 +165,6 @@ ForwardRangeDelIterator::ForwardRangeDelIterator(
|
|||||||
|
|
||||||
bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
|
bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
|
||||||
assert(iters_ != nullptr);
|
assert(iters_ != nullptr);
|
||||||
// Pick up previously unseen iterators.
|
|
||||||
for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end();
|
|
||||||
++it, ++unused_idx_) {
|
|
||||||
auto& iter = *it;
|
|
||||||
iter->Seek(parsed.user_key);
|
|
||||||
PushIter(iter.get(), parsed);
|
|
||||||
assert(active_iters_.size() == active_seqnums_.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Move active iterators that end before parsed.
|
// Move active iterators that end before parsed.
|
||||||
while (!active_iters_.empty() &&
|
while (!active_iters_.empty() &&
|
||||||
icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) {
|
icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) {
|
||||||
@ -171,15 +211,6 @@ ReverseRangeDelIterator::ReverseRangeDelIterator(
|
|||||||
|
|
||||||
bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
|
bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
|
||||||
assert(iters_ != nullptr);
|
assert(iters_ != nullptr);
|
||||||
// Pick up previously unseen iterators.
|
|
||||||
for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end();
|
|
||||||
++it, ++unused_idx_) {
|
|
||||||
auto& iter = *it;
|
|
||||||
iter->SeekForPrev(parsed.user_key);
|
|
||||||
PushIter(iter.get(), parsed);
|
|
||||||
assert(active_iters_.size() == active_seqnums_.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Move active iterators that start after parsed.
|
// Move active iterators that start after parsed.
|
||||||
while (!active_iters_.empty() &&
|
while (!active_iters_.empty() &&
|
||||||
icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) {
|
icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) {
|
||||||
@ -214,38 +245,33 @@ void ReverseRangeDelIterator::Invalidate() {
|
|||||||
inactive_iters_.clear();
|
inactive_iters_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp,
|
bool RangeDelAggregatorV2::StripeRep::ShouldDelete(
|
||||||
SequenceNumber /* upper_bound */)
|
const ParsedInternalKey& parsed, RangeDelPositioningMode mode) {
|
||||||
: icmp_(icmp), forward_iter_(icmp, &iters_), reverse_iter_(icmp, &iters_) {}
|
if (!InStripe(parsed.sequence) || IsEmpty()) {
|
||||||
|
return false;
|
||||||
void RangeDelAggregatorV2::AddTombstones(
|
|
||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
|
||||||
const InternalKey* smallest, const InternalKey* largest) {
|
|
||||||
if (input_iter == nullptr || input_iter->empty()) {
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if (wrapped_range_del_agg != nullptr) {
|
|
||||||
wrapped_range_del_agg->AddTombstones(std::move(input_iter), smallest,
|
|
||||||
largest);
|
|
||||||
// TODO: this eats the status of the wrapped call; may want to propagate it
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
iters_.emplace_back(new TruncatedRangeDelIterator(std::move(input_iter),
|
|
||||||
icmp_, smallest, largest));
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed,
|
|
||||||
RangeDelPositioningMode mode) {
|
|
||||||
if (wrapped_range_del_agg != nullptr) {
|
|
||||||
return wrapped_range_del_agg->ShouldDelete(parsed, mode);
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (mode) {
|
switch (mode) {
|
||||||
case RangeDelPositioningMode::kForwardTraversal:
|
case RangeDelPositioningMode::kForwardTraversal:
|
||||||
reverse_iter_.Invalidate();
|
InvalidateReverseIter();
|
||||||
|
|
||||||
|
// Pick up previously unseen iterators.
|
||||||
|
for (auto it = std::next(iters_.begin(), forward_iter_.UnusedIdx());
|
||||||
|
it != iters_.end(); ++it, forward_iter_.IncUnusedIdx()) {
|
||||||
|
auto& iter = *it;
|
||||||
|
forward_iter_.AddNewIter(iter.get(), parsed);
|
||||||
|
}
|
||||||
|
|
||||||
return forward_iter_.ShouldDelete(parsed);
|
return forward_iter_.ShouldDelete(parsed);
|
||||||
case RangeDelPositioningMode::kBackwardTraversal:
|
case RangeDelPositioningMode::kBackwardTraversal:
|
||||||
forward_iter_.Invalidate();
|
InvalidateForwardIter();
|
||||||
|
|
||||||
|
// Pick up previously unseen iterators.
|
||||||
|
for (auto it = std::next(iters_.begin(), reverse_iter_.UnusedIdx());
|
||||||
|
it != iters_.end(); ++it, reverse_iter_.IncUnusedIdx()) {
|
||||||
|
auto& iter = *it;
|
||||||
|
reverse_iter_.AddNewIter(iter.get(), parsed);
|
||||||
|
}
|
||||||
|
|
||||||
return reverse_iter_.ShouldDelete(parsed);
|
return reverse_iter_.ShouldDelete(parsed);
|
||||||
default:
|
default:
|
||||||
assert(false);
|
assert(false);
|
||||||
@ -253,14 +279,13 @@ bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start,
|
bool RangeDelAggregatorV2::StripeRep::IsRangeOverlapped(const Slice& start,
|
||||||
const Slice& end) {
|
const Slice& end) {
|
||||||
assert(wrapped_range_del_agg == nullptr);
|
Invalidate();
|
||||||
InvalidateRangeDelMapPositions();
|
|
||||||
|
|
||||||
// Set the internal start/end keys so that:
|
// Set the internal start/end keys so that:
|
||||||
// - if start_ikey has the same user key and sequence number as the current
|
// - if start_ikey has the same user key and sequence number as the
|
||||||
// end key, start_ikey will be considered greater; and
|
// current end key, start_ikey will be considered greater; and
|
||||||
// - if end_ikey has the same user key and sequence number as the current
|
// - if end_ikey has the same user key and sequence number as the current
|
||||||
// start key, end_ikey will be considered greater.
|
// start key, end_ikey will be considered greater.
|
||||||
ParsedInternalKey start_ikey(start, kMaxSequenceNumber,
|
ParsedInternalKey start_ikey(start, kMaxSequenceNumber,
|
||||||
@ -279,9 +304,9 @@ bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!checked_candidate_tombstones) {
|
if (!checked_candidate_tombstones) {
|
||||||
// Do an additional check for when the end of the range is the begin key
|
// Do an additional check for when the end of the range is the begin
|
||||||
// of a tombstone, which we missed earlier since SeekForPrev'ing to the
|
// key of a tombstone, which we missed earlier since SeekForPrev'ing
|
||||||
// start was invalid.
|
// to the start was invalid.
|
||||||
iter->SeekForPrev(end);
|
iter->SeekForPrev(end);
|
||||||
if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
|
if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
|
||||||
icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
|
icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
|
||||||
@ -292,4 +317,176 @@ bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ReadRangeDelAggregatorV2::AddTombstones(
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
||||||
|
const InternalKey* smallest, const InternalKey* largest) {
|
||||||
|
if (input_iter == nullptr || input_iter->empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
rep_.AddTombstones(
|
||||||
|
std::unique_ptr<TruncatedRangeDelIterator>(new TruncatedRangeDelIterator(
|
||||||
|
std::move(input_iter), icmp_, smallest, largest)));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ReadRangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed,
|
||||||
|
RangeDelPositioningMode mode) {
|
||||||
|
return rep_.ShouldDelete(parsed, mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ReadRangeDelAggregatorV2::IsRangeOverlapped(const Slice& start,
|
||||||
|
const Slice& end) {
|
||||||
|
InvalidateRangeDelMapPositions();
|
||||||
|
return rep_.IsRangeOverlapped(start, end);
|
||||||
|
}
|
||||||
|
|
||||||
|
void CompactionRangeDelAggregatorV2::AddTombstones(
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
||||||
|
const InternalKey* smallest, const InternalKey* largest) {
|
||||||
|
if (input_iter == nullptr || input_iter->empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
assert(input_iter->lower_bound() == 0);
|
||||||
|
assert(input_iter->upper_bound() == kMaxSequenceNumber);
|
||||||
|
parent_iters_.emplace_back(new TruncatedRangeDelIterator(
|
||||||
|
std::move(input_iter), icmp_, smallest, largest));
|
||||||
|
|
||||||
|
auto split_iters = parent_iters_.back()->SplitBySnapshot(*snapshots_);
|
||||||
|
for (auto& split_iter : split_iters) {
|
||||||
|
auto it = reps_.find(split_iter.first);
|
||||||
|
if (it == reps_.end()) {
|
||||||
|
bool inserted;
|
||||||
|
SequenceNumber upper_bound = split_iter.second->upper_bound();
|
||||||
|
SequenceNumber lower_bound = split_iter.second->lower_bound();
|
||||||
|
std::tie(it, inserted) = reps_.emplace(
|
||||||
|
split_iter.first, StripeRep(icmp_, upper_bound, lower_bound));
|
||||||
|
assert(inserted);
|
||||||
|
}
|
||||||
|
assert(it != reps_.end());
|
||||||
|
it->second.AddTombstones(std::move(split_iter.second));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool CompactionRangeDelAggregatorV2::ShouldDelete(
|
||||||
|
const ParsedInternalKey& parsed, RangeDelPositioningMode mode) {
|
||||||
|
auto it = reps_.lower_bound(parsed.sequence);
|
||||||
|
if (it == reps_.end()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return it->second.ShouldDelete(parsed, mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
class TruncatedRangeDelMergingIter : public InternalIterator {
|
||||||
|
public:
|
||||||
|
TruncatedRangeDelMergingIter(
|
||||||
|
const InternalKeyComparator* icmp, const Slice* lower_bound,
|
||||||
|
const Slice* upper_bound, bool upper_bound_inclusive,
|
||||||
|
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>& children)
|
||||||
|
: icmp_(icmp),
|
||||||
|
lower_bound_(lower_bound),
|
||||||
|
upper_bound_(upper_bound),
|
||||||
|
upper_bound_inclusive_(upper_bound_inclusive),
|
||||||
|
heap_(StartKeyMinComparator(icmp)) {
|
||||||
|
for (auto& child : children) {
|
||||||
|
if (child != nullptr) {
|
||||||
|
assert(child->lower_bound() == 0);
|
||||||
|
assert(child->upper_bound() == kMaxSequenceNumber);
|
||||||
|
children_.push_back(child.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Valid() const override {
|
||||||
|
return !heap_.empty() && BeforeEndKey(heap_.top());
|
||||||
|
}
|
||||||
|
Status status() const override { return Status::OK(); }
|
||||||
|
|
||||||
|
void SeekToFirst() override {
|
||||||
|
heap_.clear();
|
||||||
|
for (auto& child : children_) {
|
||||||
|
if (lower_bound_ != nullptr) {
|
||||||
|
child->Seek(*lower_bound_);
|
||||||
|
} else {
|
||||||
|
child->SeekToFirst();
|
||||||
|
}
|
||||||
|
if (child->Valid()) {
|
||||||
|
heap_.push(child);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Next() override {
|
||||||
|
auto* top = heap_.top();
|
||||||
|
top->InternalNext();
|
||||||
|
if (top->Valid()) {
|
||||||
|
heap_.replace_top(top);
|
||||||
|
} else {
|
||||||
|
heap_.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice key() const override {
|
||||||
|
auto* top = heap_.top();
|
||||||
|
cur_start_key_.Set(top->start_key().user_key, top->seq(),
|
||||||
|
kTypeRangeDeletion);
|
||||||
|
return cur_start_key_.Encode();
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice value() const override {
|
||||||
|
auto* top = heap_.top();
|
||||||
|
assert(top->end_key().sequence == kMaxSequenceNumber);
|
||||||
|
return top->end_key().user_key;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unused InternalIterator methods
|
||||||
|
void Prev() override { assert(false); }
|
||||||
|
void Seek(const Slice& /* target */) override { assert(false); }
|
||||||
|
void SeekForPrev(const Slice& /* target */) override { assert(false); }
|
||||||
|
void SeekToLast() override { assert(false); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool BeforeEndKey(const TruncatedRangeDelIterator* iter) const {
|
||||||
|
if (upper_bound_ == nullptr) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
int cmp = icmp_->user_comparator()->Compare(iter->start_key().user_key,
|
||||||
|
*upper_bound_);
|
||||||
|
return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
const InternalKeyComparator* icmp_;
|
||||||
|
const Slice* lower_bound_;
|
||||||
|
const Slice* upper_bound_;
|
||||||
|
bool upper_bound_inclusive_;
|
||||||
|
BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> heap_;
|
||||||
|
std::vector<TruncatedRangeDelIterator*> children_;
|
||||||
|
|
||||||
|
mutable InternalKey cur_start_key_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator>
|
||||||
|
CompactionRangeDelAggregatorV2::NewIterator(const Slice* lower_bound,
|
||||||
|
const Slice* upper_bound,
|
||||||
|
bool upper_bound_inclusive) {
|
||||||
|
InvalidateRangeDelMapPositions();
|
||||||
|
std::unique_ptr<TruncatedRangeDelMergingIter> merging_iter(
|
||||||
|
new TruncatedRangeDelMergingIter(icmp_, lower_bound, upper_bound,
|
||||||
|
upper_bound_inclusive, parent_iters_));
|
||||||
|
|
||||||
|
// TODO: add tests where tombstone fragments can be outside of upper and lower
|
||||||
|
// bound range
|
||||||
|
auto fragmented_tombstone_list =
|
||||||
|
std::make_shared<FragmentedRangeTombstoneList>(
|
||||||
|
std::move(merging_iter), *icmp_, true /* for_compaction */,
|
||||||
|
*snapshots_);
|
||||||
|
|
||||||
|
return std::unique_ptr<FragmentedRangeTombstoneIterator>(
|
||||||
|
new FragmentedRangeTombstoneIterator(
|
||||||
|
fragmented_tombstone_list, *icmp_,
|
||||||
|
kMaxSequenceNumber /* upper_bound */));
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -5,6 +5,8 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <iterator>
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <set>
|
#include <set>
|
||||||
@ -27,8 +29,6 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
class RangeDelAggregatorV2;
|
|
||||||
|
|
||||||
class TruncatedRangeDelIterator {
|
class TruncatedRangeDelIterator {
|
||||||
public:
|
public:
|
||||||
TruncatedRangeDelIterator(
|
TruncatedRangeDelIterator(
|
||||||
@ -41,6 +41,8 @@ class TruncatedRangeDelIterator {
|
|||||||
void Next();
|
void Next();
|
||||||
void Prev();
|
void Prev();
|
||||||
|
|
||||||
|
void InternalNext();
|
||||||
|
|
||||||
// Seeks to the tombstone with the highest viisble sequence number that covers
|
// Seeks to the tombstone with the highest viisble sequence number that covers
|
||||||
// target (a user key). If no such tombstone exists, the position will be at
|
// target (a user key). If no such tombstone exists, the position will be at
|
||||||
// the earliest tombstone that ends after target.
|
// the earliest tombstone that ends after target.
|
||||||
@ -70,12 +72,22 @@ class TruncatedRangeDelIterator {
|
|||||||
|
|
||||||
SequenceNumber seq() const { return iter_->seq(); }
|
SequenceNumber seq() const { return iter_->seq(); }
|
||||||
|
|
||||||
|
std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
|
||||||
|
SplitBySnapshot(const std::vector<SequenceNumber>& snapshots);
|
||||||
|
|
||||||
|
SequenceNumber upper_bound() const { return iter_->upper_bound(); }
|
||||||
|
|
||||||
|
SequenceNumber lower_bound() const { return iter_->lower_bound(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> iter_;
|
std::unique_ptr<FragmentedRangeTombstoneIterator> iter_;
|
||||||
const InternalKeyComparator* icmp_;
|
const InternalKeyComparator* icmp_;
|
||||||
const ParsedInternalKey* smallest_ = nullptr;
|
const ParsedInternalKey* smallest_ = nullptr;
|
||||||
const ParsedInternalKey* largest_ = nullptr;
|
const ParsedInternalKey* largest_ = nullptr;
|
||||||
std::list<ParsedInternalKey> pinned_bounds_;
|
std::list<ParsedInternalKey> pinned_bounds_;
|
||||||
|
|
||||||
|
const InternalKey* smallest_ikey_;
|
||||||
|
const InternalKey* largest_ikey_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SeqMaxComparator {
|
struct SeqMaxComparator {
|
||||||
@ -85,6 +97,17 @@ struct SeqMaxComparator {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct StartKeyMinComparator {
|
||||||
|
explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
|
||||||
|
|
||||||
|
bool operator()(const TruncatedRangeDelIterator* a,
|
||||||
|
const TruncatedRangeDelIterator* b) const {
|
||||||
|
return icmp->Compare(a->start_key(), b->start_key()) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
const InternalKeyComparator* icmp;
|
||||||
|
};
|
||||||
|
|
||||||
class ForwardRangeDelIterator {
|
class ForwardRangeDelIterator {
|
||||||
public:
|
public:
|
||||||
ForwardRangeDelIterator(
|
ForwardRangeDelIterator(
|
||||||
@ -94,20 +117,20 @@ class ForwardRangeDelIterator {
|
|||||||
bool ShouldDelete(const ParsedInternalKey& parsed);
|
bool ShouldDelete(const ParsedInternalKey& parsed);
|
||||||
void Invalidate();
|
void Invalidate();
|
||||||
|
|
||||||
|
void AddNewIter(TruncatedRangeDelIterator* iter,
|
||||||
|
const ParsedInternalKey& parsed) {
|
||||||
|
iter->Seek(parsed.user_key);
|
||||||
|
PushIter(iter, parsed);
|
||||||
|
assert(active_iters_.size() == active_seqnums_.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t UnusedIdx() const { return unused_idx_; }
|
||||||
|
void IncUnusedIdx() { unused_idx_++; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using ActiveSeqSet =
|
using ActiveSeqSet =
|
||||||
std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
|
std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
|
||||||
|
|
||||||
struct StartKeyMinComparator {
|
|
||||||
explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
|
|
||||||
|
|
||||||
bool operator()(const TruncatedRangeDelIterator* a,
|
|
||||||
const TruncatedRangeDelIterator* b) const {
|
|
||||||
return icmp->Compare(a->start_key(), b->start_key()) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
const InternalKeyComparator* icmp;
|
|
||||||
};
|
|
||||||
struct EndKeyMinComparator {
|
struct EndKeyMinComparator {
|
||||||
explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
|
explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
|
||||||
|
|
||||||
@ -124,7 +147,10 @@ class ForwardRangeDelIterator {
|
|||||||
if (!iter->Valid()) {
|
if (!iter->Valid()) {
|
||||||
// The iterator has been fully consumed, so we don't need to add it to
|
// The iterator has been fully consumed, so we don't need to add it to
|
||||||
// either of the heaps.
|
// either of the heaps.
|
||||||
} else if (icmp_->Compare(parsed, iter->start_key()) < 0) {
|
return;
|
||||||
|
}
|
||||||
|
int cmp = icmp_->Compare(parsed, iter->start_key());
|
||||||
|
if (cmp < 0) {
|
||||||
PushInactiveIter(iter);
|
PushInactiveIter(iter);
|
||||||
} else {
|
} else {
|
||||||
PushActiveIter(iter);
|
PushActiveIter(iter);
|
||||||
@ -171,6 +197,16 @@ class ReverseRangeDelIterator {
|
|||||||
bool ShouldDelete(const ParsedInternalKey& parsed);
|
bool ShouldDelete(const ParsedInternalKey& parsed);
|
||||||
void Invalidate();
|
void Invalidate();
|
||||||
|
|
||||||
|
void AddNewIter(TruncatedRangeDelIterator* iter,
|
||||||
|
const ParsedInternalKey& parsed) {
|
||||||
|
iter->SeekForPrev(parsed.user_key);
|
||||||
|
PushIter(iter, parsed);
|
||||||
|
assert(active_iters_.size() == active_seqnums_.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t UnusedIdx() const { return unused_idx_; }
|
||||||
|
void IncUnusedIdx() { unused_idx_++; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using ActiveSeqSet =
|
using ActiveSeqSet =
|
||||||
std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
|
std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
|
||||||
@ -241,55 +277,160 @@ class ReverseRangeDelIterator {
|
|||||||
|
|
||||||
class RangeDelAggregatorV2 {
|
class RangeDelAggregatorV2 {
|
||||||
public:
|
public:
|
||||||
RangeDelAggregatorV2(const InternalKeyComparator* icmp,
|
explicit RangeDelAggregatorV2(const InternalKeyComparator* icmp)
|
||||||
SequenceNumber upper_bound);
|
: icmp_(icmp) {}
|
||||||
|
virtual ~RangeDelAggregatorV2() {}
|
||||||
|
|
||||||
void AddTombstones(
|
virtual void AddTombstones(
|
||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
||||||
const InternalKey* smallest = nullptr,
|
const InternalKey* smallest = nullptr,
|
||||||
const InternalKey* largest = nullptr);
|
const InternalKey* largest = nullptr) = 0;
|
||||||
|
|
||||||
bool ShouldDelete(const ParsedInternalKey& parsed,
|
bool ShouldDelete(const Slice& key, RangeDelPositioningMode mode) {
|
||||||
RangeDelPositioningMode mode);
|
ParsedInternalKey parsed;
|
||||||
|
if (!ParseInternalKey(key, &parsed)) {
|
||||||
bool IsRangeOverlapped(const Slice& start, const Slice& end);
|
return false;
|
||||||
|
}
|
||||||
void InvalidateRangeDelMapPositions() {
|
return ShouldDelete(parsed, mode);
|
||||||
forward_iter_.Invalidate();
|
|
||||||
reverse_iter_.Invalidate();
|
|
||||||
}
|
}
|
||||||
|
virtual bool ShouldDelete(const ParsedInternalKey& parsed,
|
||||||
|
RangeDelPositioningMode mode) = 0;
|
||||||
|
|
||||||
|
virtual void InvalidateRangeDelMapPositions() = 0;
|
||||||
|
|
||||||
|
virtual bool IsEmpty() const = 0;
|
||||||
|
|
||||||
bool IsEmpty() const { return iters_.empty(); }
|
|
||||||
bool AddFile(uint64_t file_number) {
|
bool AddFile(uint64_t file_number) {
|
||||||
return files_seen_.insert(file_number).second;
|
return files_seen_.insert(file_number).second;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adaptor method to pass calls through to an old-style RangeDelAggregator.
|
protected:
|
||||||
// Will be removed once this new version supports an iterator that can be used
|
class StripeRep {
|
||||||
// during flush/compaction.
|
public:
|
||||||
RangeDelAggregator* DelegateToRangeDelAggregator(
|
StripeRep(const InternalKeyComparator* icmp, SequenceNumber upper_bound,
|
||||||
const std::vector<SequenceNumber>& snapshots) {
|
SequenceNumber lower_bound)
|
||||||
wrapped_range_del_agg.reset(new RangeDelAggregator(
|
: icmp_(icmp),
|
||||||
*icmp_, snapshots, true /* collapse_deletions */));
|
forward_iter_(icmp, &iters_),
|
||||||
return wrapped_range_del_agg.get();
|
reverse_iter_(icmp, &iters_),
|
||||||
}
|
upper_bound_(upper_bound),
|
||||||
|
lower_bound_(lower_bound) {}
|
||||||
|
|
||||||
std::unique_ptr<RangeDelIterator> NewIterator() {
|
void AddTombstones(std::unique_ptr<TruncatedRangeDelIterator> input_iter) {
|
||||||
assert(wrapped_range_del_agg != nullptr);
|
iters_.push_back(std::move(input_iter));
|
||||||
return wrapped_range_del_agg->NewIterator();
|
}
|
||||||
}
|
|
||||||
|
bool IsEmpty() const { return iters_.empty(); }
|
||||||
|
|
||||||
|
bool ShouldDelete(const ParsedInternalKey& parsed,
|
||||||
|
RangeDelPositioningMode mode);
|
||||||
|
|
||||||
|
void Invalidate() {
|
||||||
|
InvalidateForwardIter();
|
||||||
|
InvalidateReverseIter();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool IsRangeOverlapped(const Slice& start, const Slice& end);
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool InStripe(SequenceNumber seq) const {
|
||||||
|
return lower_bound_ <= seq && seq <= upper_bound_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InvalidateForwardIter() { forward_iter_.Invalidate(); }
|
||||||
|
|
||||||
|
void InvalidateReverseIter() { reverse_iter_.Invalidate(); }
|
||||||
|
|
||||||
|
const InternalKeyComparator* icmp_;
|
||||||
|
std::vector<std::unique_ptr<TruncatedRangeDelIterator>> iters_;
|
||||||
|
ForwardRangeDelIterator forward_iter_;
|
||||||
|
ReverseRangeDelIterator reverse_iter_;
|
||||||
|
SequenceNumber upper_bound_;
|
||||||
|
SequenceNumber lower_bound_;
|
||||||
|
};
|
||||||
|
|
||||||
private:
|
|
||||||
const InternalKeyComparator* icmp_;
|
const InternalKeyComparator* icmp_;
|
||||||
|
|
||||||
std::vector<std::unique_ptr<TruncatedRangeDelIterator>> iters_;
|
private:
|
||||||
std::set<uint64_t> files_seen_;
|
std::set<uint64_t> files_seen_;
|
||||||
|
};
|
||||||
|
|
||||||
ForwardRangeDelIterator forward_iter_;
|
class ReadRangeDelAggregatorV2 : public RangeDelAggregatorV2 {
|
||||||
ReverseRangeDelIterator reverse_iter_;
|
public:
|
||||||
|
ReadRangeDelAggregatorV2(const InternalKeyComparator* icmp,
|
||||||
|
SequenceNumber upper_bound)
|
||||||
|
: RangeDelAggregatorV2(icmp),
|
||||||
|
rep_(icmp, upper_bound, 0 /* lower_bound */) {}
|
||||||
|
~ReadRangeDelAggregatorV2() override {}
|
||||||
|
|
||||||
// TODO: remove once V2 supports exposing tombstone iterators
|
using RangeDelAggregatorV2::ShouldDelete;
|
||||||
std::unique_ptr<RangeDelAggregator> wrapped_range_del_agg;
|
void AddTombstones(
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
||||||
|
const InternalKey* smallest = nullptr,
|
||||||
|
const InternalKey* largest = nullptr) override;
|
||||||
|
|
||||||
|
bool ShouldDelete(const ParsedInternalKey& parsed,
|
||||||
|
RangeDelPositioningMode mode) override;
|
||||||
|
|
||||||
|
bool IsRangeOverlapped(const Slice& start, const Slice& end);
|
||||||
|
|
||||||
|
void InvalidateRangeDelMapPositions() override { rep_.Invalidate(); }
|
||||||
|
|
||||||
|
bool IsEmpty() const override { return rep_.IsEmpty(); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
StripeRep rep_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class CompactionRangeDelAggregatorV2 : public RangeDelAggregatorV2 {
|
||||||
|
public:
|
||||||
|
CompactionRangeDelAggregatorV2(const InternalKeyComparator* icmp,
|
||||||
|
const std::vector<SequenceNumber>& snapshots)
|
||||||
|
: RangeDelAggregatorV2(icmp), snapshots_(&snapshots) {}
|
||||||
|
~CompactionRangeDelAggregatorV2() override {}
|
||||||
|
|
||||||
|
void AddTombstones(
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
|
||||||
|
const InternalKey* smallest = nullptr,
|
||||||
|
const InternalKey* largest = nullptr) override;
|
||||||
|
|
||||||
|
using RangeDelAggregatorV2::ShouldDelete;
|
||||||
|
bool ShouldDelete(const ParsedInternalKey& parsed,
|
||||||
|
RangeDelPositioningMode mode) override;
|
||||||
|
|
||||||
|
bool IsRangeOverlapped(const Slice& start, const Slice& end);
|
||||||
|
|
||||||
|
void InvalidateRangeDelMapPositions() override {
|
||||||
|
for (auto& rep : reps_) {
|
||||||
|
rep.second.Invalidate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool IsEmpty() const override {
|
||||||
|
for (const auto& rep : reps_) {
|
||||||
|
if (!rep.second.IsEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates an iterator over all the range tombstones in the aggregator, for
|
||||||
|
// use in compaction. Nullptr arguments indicate that the iterator range is
|
||||||
|
// unbounded.
|
||||||
|
// NOTE: the boundaries are used for optimization purposes to reduce the
|
||||||
|
// number of tombstones that are passed to the fragmenter; they do not
|
||||||
|
// guarantee that the resulting iterator only contains range tombstones that
|
||||||
|
// cover keys in the provided range. If required, these bounds must be
|
||||||
|
// enforced during iteration.
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> NewIterator(
|
||||||
|
const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr,
|
||||||
|
bool upper_bound_inclusive = false);
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<std::unique_ptr<TruncatedRangeDelIterator>> parent_iters_;
|
||||||
|
std::map<SequenceNumber, StripeRep> reps_;
|
||||||
|
|
||||||
|
const std::vector<SequenceNumber>* snapshots_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -158,7 +158,7 @@ void VerifyShouldDelete(RangeDelAggregatorV2* range_del_agg,
|
|||||||
}
|
}
|
||||||
|
|
||||||
void VerifyIsRangeOverlapped(
|
void VerifyIsRangeOverlapped(
|
||||||
RangeDelAggregatorV2* range_del_agg,
|
ReadRangeDelAggregatorV2* range_del_agg,
|
||||||
const std::vector<IsRangeOverlappedTestCase>& test_cases) {
|
const std::vector<IsRangeOverlappedTestCase>& test_cases) {
|
||||||
for (const auto& test_case : test_cases) {
|
for (const auto& test_case : test_cases) {
|
||||||
EXPECT_EQ(test_case.result,
|
EXPECT_EQ(test_case.result,
|
||||||
@ -166,6 +166,30 @@ void VerifyIsRangeOverlapped(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CheckIterPosition(const RangeTombstone& tombstone,
|
||||||
|
const FragmentedRangeTombstoneIterator* iter) {
|
||||||
|
// Test InternalIterator interface.
|
||||||
|
EXPECT_EQ(tombstone.start_key_, ExtractUserKey(iter->key()));
|
||||||
|
EXPECT_EQ(tombstone.end_key_, iter->value());
|
||||||
|
EXPECT_EQ(tombstone.seq_, iter->seq());
|
||||||
|
|
||||||
|
// Test FragmentedRangeTombstoneIterator interface.
|
||||||
|
EXPECT_EQ(tombstone.start_key_, iter->start_key());
|
||||||
|
EXPECT_EQ(tombstone.end_key_, iter->end_key());
|
||||||
|
EXPECT_EQ(tombstone.seq_, GetInternalKeySeqno(iter->key()));
|
||||||
|
}
|
||||||
|
|
||||||
|
void VerifyFragmentedRangeDels(
|
||||||
|
FragmentedRangeTombstoneIterator* iter,
|
||||||
|
const std::vector<RangeTombstone>& expected_tombstones) {
|
||||||
|
iter->SeekToFirst();
|
||||||
|
for (size_t i = 0; i < expected_tombstones.size(); i++, iter->Next()) {
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
CheckIterPosition(expected_tombstones[i], iter);
|
||||||
|
}
|
||||||
|
EXPECT_FALSE(iter->Valid());
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) {
|
TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) {
|
||||||
@ -253,7 +277,7 @@ TEST_F(RangeDelAggregatorV2Test, UntruncatedIterWithSnapshot) {
|
|||||||
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
|
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RangeDelAggregatorV2Test, TruncatedIter) {
|
TEST_F(RangeDelAggregatorV2Test, TruncatedIterPartiallyCutTombstones) {
|
||||||
auto range_del_iter =
|
auto range_del_iter =
|
||||||
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
|
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
|
||||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||||
@ -289,6 +313,36 @@ TEST_F(RangeDelAggregatorV2Test, TruncatedIter) {
|
|||||||
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
|
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(RangeDelAggregatorV2Test, TruncatedIterFullyCutTombstones) {
|
||||||
|
auto range_del_iter =
|
||||||
|
MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}});
|
||||||
|
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||||
|
bytewise_icmp);
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
|
||||||
|
InternalKey smallest("f", 7, kTypeValue);
|
||||||
|
InternalKey largest("i", 9, kTypeValue);
|
||||||
|
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp,
|
||||||
|
&smallest, &largest);
|
||||||
|
|
||||||
|
VerifyIterator(&iter, bytewise_icmp,
|
||||||
|
{{InternalValue("f", 7), UncutEndpoint("g"), 8}});
|
||||||
|
|
||||||
|
VerifySeek(
|
||||||
|
&iter, bytewise_icmp,
|
||||||
|
{{"d", InternalValue("f", 7), UncutEndpoint("g"), 8},
|
||||||
|
{"f", InternalValue("f", 7), UncutEndpoint("g"), 8},
|
||||||
|
{"j", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}});
|
||||||
|
|
||||||
|
VerifySeekForPrev(
|
||||||
|
&iter, bytewise_icmp,
|
||||||
|
{{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */},
|
||||||
|
{"f", InternalValue("f", 7), UncutEndpoint("g"), 8},
|
||||||
|
{"j", InternalValue("f", 7), UncutEndpoint("g"), 8}});
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) {
|
TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) {
|
||||||
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 8}});
|
auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 8}});
|
||||||
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
|
||||||
@ -297,7 +351,7 @@ TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) {
|
|||||||
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
|
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
|
||||||
kMaxSequenceNumber));
|
kMaxSequenceNumber));
|
||||||
|
|
||||||
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
|
ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
|
||||||
range_del_agg.AddTombstones(std::move(input_iter));
|
range_del_agg.AddTombstones(std::move(input_iter));
|
||||||
|
|
||||||
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), false},
|
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), false},
|
||||||
@ -318,7 +372,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregator) {
|
|||||||
{{{"a", "e", 10}, {"c", "g", 8}},
|
{{{"a", "e", 10}, {"c", "g", 8}},
|
||||||
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
|
ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
|
||||||
for (const auto& fragment_list : fragment_lists) {
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
@ -350,7 +404,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregatorWithUpperBound) {
|
|||||||
{{{"a", "e", 10}, {"c", "g", 8}},
|
{{{"a", "e", 10}, {"c", "g", 8}},
|
||||||
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
|
ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
|
||||||
for (const auto& fragment_list : fragment_lists) {
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
@ -387,7 +441,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregator) {
|
|||||||
InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)},
|
InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)},
|
||||||
{InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}};
|
{InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}};
|
||||||
|
|
||||||
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
|
ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
|
||||||
for (size_t i = 0; i < fragment_lists.size(); i++) {
|
for (size_t i = 0; i < fragment_lists.size(); i++) {
|
||||||
const auto& fragment_list = fragment_lists[i];
|
const auto& fragment_list = fragment_lists[i];
|
||||||
const auto& bounds = iter_bounds[i];
|
const auto& bounds = iter_bounds[i];
|
||||||
@ -427,7 +481,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregatorSameLevel) {
|
|||||||
InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)},
|
InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)},
|
||||||
{InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}};
|
{InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}};
|
||||||
|
|
||||||
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
|
ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
|
||||||
|
|
||||||
auto add_iter_to_agg = [&](size_t i) {
|
auto add_iter_to_agg = [&](size_t i) {
|
||||||
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
@ -461,6 +515,192 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregatorSameLevel) {
|
|||||||
{"zz", "zzz", false}});
|
{"zz", "zzz", false}});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorNoSnapshots) {
|
||||||
|
auto fragment_lists = MakeFragmentedTombstoneLists(
|
||||||
|
{{{"a", "e", 10}, {"c", "g", 8}},
|
||||||
|
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
|
std::vector<SequenceNumber> snapshots;
|
||||||
|
CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots);
|
||||||
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
range_del_agg.AddTombstones(std::move(input_iter));
|
||||||
|
}
|
||||||
|
|
||||||
|
VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), true},
|
||||||
|
{InternalValue("b", 19), false},
|
||||||
|
{InternalValue("b", 9), true},
|
||||||
|
{InternalValue("d", 9), true},
|
||||||
|
{InternalValue("e", 7), true},
|
||||||
|
{InternalValue("g", 7), false},
|
||||||
|
{InternalValue("h", 24), true},
|
||||||
|
{InternalValue("i", 24), false},
|
||||||
|
{InternalValue("ii", 14), true},
|
||||||
|
{InternalValue("j", 14), false}});
|
||||||
|
|
||||||
|
auto range_del_compaction_iter = range_del_agg.NewIterator();
|
||||||
|
VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {{"a", "b", 20},
|
||||||
|
{"b", "c", 10},
|
||||||
|
{"c", "e", 10},
|
||||||
|
{"e", "g", 8},
|
||||||
|
{"h", "i", 25},
|
||||||
|
{"ii", "j", 15}});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorWithSnapshots) {
|
||||||
|
auto fragment_lists = MakeFragmentedTombstoneLists(
|
||||||
|
{{{"a", "e", 10}, {"c", "g", 8}},
|
||||||
|
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
|
std::vector<SequenceNumber> snapshots{9, 19};
|
||||||
|
CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots);
|
||||||
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
range_del_agg.AddTombstones(std::move(input_iter));
|
||||||
|
}
|
||||||
|
|
||||||
|
VerifyShouldDelete(
|
||||||
|
&range_del_agg,
|
||||||
|
{
|
||||||
|
{InternalValue("a", 19), false}, // [10, 19]
|
||||||
|
{InternalValue("a", 9), false}, // [0, 9]
|
||||||
|
{InternalValue("b", 9), false}, // [0, 9]
|
||||||
|
{InternalValue("d", 9), false}, // [0, 9]
|
||||||
|
{InternalValue("d", 7), true}, // [0, 9]
|
||||||
|
{InternalValue("e", 7), true}, // [0, 9]
|
||||||
|
{InternalValue("g", 7), false}, // [0, 9]
|
||||||
|
{InternalValue("h", 24), true}, // [20, kMaxSequenceNumber]
|
||||||
|
{InternalValue("i", 24), false}, // [20, kMaxSequenceNumber]
|
||||||
|
{InternalValue("ii", 14), true}, // [10, 19]
|
||||||
|
{InternalValue("j", 14), false} // [10, 19]
|
||||||
|
});
|
||||||
|
|
||||||
|
auto range_del_compaction_iter = range_del_agg.NewIterator();
|
||||||
|
VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {{"a", "b", 20},
|
||||||
|
{"a", "b", 10},
|
||||||
|
{"b", "c", 10},
|
||||||
|
{"c", "e", 10},
|
||||||
|
{"c", "e", 8},
|
||||||
|
{"e", "g", 8},
|
||||||
|
{"h", "i", 25},
|
||||||
|
{"ii", "j", 15}});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorEmptyIteratorLeft) {
|
||||||
|
auto fragment_lists = MakeFragmentedTombstoneLists(
|
||||||
|
{{{"a", "e", 10}, {"c", "g", 8}},
|
||||||
|
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
|
std::vector<SequenceNumber> snapshots{9, 19};
|
||||||
|
CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots);
|
||||||
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
range_del_agg.AddTombstones(std::move(input_iter));
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice start("_");
|
||||||
|
Slice end("__");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorEmptyIteratorRight) {
|
||||||
|
auto fragment_lists = MakeFragmentedTombstoneLists(
|
||||||
|
{{{"a", "e", 10}, {"c", "g", 8}},
|
||||||
|
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
|
std::vector<SequenceNumber> snapshots{9, 19};
|
||||||
|
CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots);
|
||||||
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
range_del_agg.AddTombstones(std::move(input_iter));
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice start("p");
|
||||||
|
Slice end("q");
|
||||||
|
auto range_del_compaction_iter1 =
|
||||||
|
range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */);
|
||||||
|
VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {});
|
||||||
|
|
||||||
|
auto range_del_compaction_iter2 =
|
||||||
|
range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */);
|
||||||
|
VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorBoundedIterator) {
|
||||||
|
auto fragment_lists = MakeFragmentedTombstoneLists(
|
||||||
|
{{{"a", "e", 10}, {"c", "g", 8}},
|
||||||
|
{{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
|
std::vector<SequenceNumber> snapshots{9, 19};
|
||||||
|
CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots);
|
||||||
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
range_del_agg.AddTombstones(std::move(input_iter));
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice start("bb");
|
||||||
|
Slice end("e");
|
||||||
|
auto range_del_compaction_iter1 =
|
||||||
|
range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */);
|
||||||
|
VerifyFragmentedRangeDels(range_del_compaction_iter1.get(),
|
||||||
|
{{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}});
|
||||||
|
|
||||||
|
auto range_del_compaction_iter2 =
|
||||||
|
range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */);
|
||||||
|
VerifyFragmentedRangeDels(
|
||||||
|
range_del_compaction_iter2.get(),
|
||||||
|
{{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}, {"e", "g", 8}});
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RangeDelAggregatorV2Test,
|
||||||
|
CompactionAggregatorBoundedIteratorExtraFragments) {
|
||||||
|
auto fragment_lists = MakeFragmentedTombstoneLists(
|
||||||
|
{{{"a", "d", 10}, {"c", "g", 8}},
|
||||||
|
{{"b", "c", 20}, {"d", "f", 30}, {"h", "i", 25}, {"ii", "j", 15}}});
|
||||||
|
|
||||||
|
std::vector<SequenceNumber> snapshots{9, 19};
|
||||||
|
CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots);
|
||||||
|
for (const auto& fragment_list : fragment_lists) {
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
|
||||||
|
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
|
||||||
|
kMaxSequenceNumber));
|
||||||
|
range_del_agg.AddTombstones(std::move(input_iter));
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice start("bb");
|
||||||
|
Slice end("e");
|
||||||
|
auto range_del_compaction_iter1 =
|
||||||
|
range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */);
|
||||||
|
VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {{"a", "b", 10},
|
||||||
|
{"b", "c", 20},
|
||||||
|
{"b", "c", 10},
|
||||||
|
{"c", "d", 10},
|
||||||
|
{"c", "d", 8},
|
||||||
|
{"d", "f", 30},
|
||||||
|
{"d", "f", 8},
|
||||||
|
{"f", "g", 8}});
|
||||||
|
|
||||||
|
auto range_del_compaction_iter2 =
|
||||||
|
range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */);
|
||||||
|
VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {{"a", "b", 10},
|
||||||
|
{"b", "c", 20},
|
||||||
|
{"b", "c", 10},
|
||||||
|
{"c", "d", 10},
|
||||||
|
{"c", "d", 8},
|
||||||
|
{"d", "f", 30},
|
||||||
|
{"d", "f", 8},
|
||||||
|
{"f", "g", 8}});
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -174,6 +174,11 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
|
|||||||
const Slice& ikey = unfragmented_tombstones->key();
|
const Slice& ikey = unfragmented_tombstones->key();
|
||||||
Slice tombstone_start_key = ExtractUserKey(ikey);
|
Slice tombstone_start_key = ExtractUserKey(ikey);
|
||||||
SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey);
|
SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey);
|
||||||
|
if (!unfragmented_tombstones->IsKeyPinned()) {
|
||||||
|
pinned_slices_.emplace_back(tombstone_start_key.data(),
|
||||||
|
tombstone_start_key.size());
|
||||||
|
tombstone_start_key = pinned_slices_.back();
|
||||||
|
}
|
||||||
no_tombstones = false;
|
no_tombstones = false;
|
||||||
|
|
||||||
Slice tombstone_end_key = unfragmented_tombstones->value();
|
Slice tombstone_end_key = unfragmented_tombstones->value();
|
||||||
@ -188,13 +193,7 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
|
|||||||
// this new start key.
|
// this new start key.
|
||||||
flush_current_tombstones(tombstone_start_key);
|
flush_current_tombstones(tombstone_start_key);
|
||||||
}
|
}
|
||||||
if (unfragmented_tombstones->IsKeyPinned()) {
|
cur_start_key = tombstone_start_key;
|
||||||
cur_start_key = tombstone_start_key;
|
|
||||||
} else {
|
|
||||||
pinned_slices_.emplace_back(tombstone_start_key.data(),
|
|
||||||
tombstone_start_key.size());
|
|
||||||
cur_start_key = pinned_slices_.back();
|
|
||||||
}
|
|
||||||
|
|
||||||
cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion);
|
cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion);
|
||||||
}
|
}
|
||||||
|
@ -146,6 +146,9 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
|
|||||||
seq_pos_ = tombstones_->seq_end();
|
seq_pos_ = tombstones_->seq_end();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RangeTombstone Tombstone() const {
|
||||||
|
return RangeTombstone(start_key(), end_key(), seq());
|
||||||
|
}
|
||||||
Slice start_key() const { return pos_->start_key; }
|
Slice start_key() const { return pos_->start_key; }
|
||||||
Slice end_key() const { return pos_->end_key; }
|
Slice end_key() const { return pos_->end_key; }
|
||||||
SequenceNumber seq() const { return *seq_pos_; }
|
SequenceNumber seq() const { return *seq_pos_; }
|
||||||
|
11
db/repair.cc
11
db/repair.cc
@ -417,11 +417,16 @@ class Repairer {
|
|||||||
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
|
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
|
||||||
|
|
||||||
auto write_hint = cfd->CalculateSSTWriteHint(0);
|
auto write_hint = cfd->CalculateSSTWriteHint(0);
|
||||||
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||||
|
range_del_iters;
|
||||||
|
auto range_del_iter =
|
||||||
|
mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
|
||||||
|
if (range_del_iter != nullptr) {
|
||||||
|
range_del_iters.emplace_back(range_del_iter);
|
||||||
|
}
|
||||||
status = BuildTable(
|
status = BuildTable(
|
||||||
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
|
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
|
||||||
env_options_, table_cache_, iter.get(),
|
env_options_, table_cache_, iter.get(), std::move(range_del_iters),
|
||||||
std::unique_ptr<InternalIterator>(
|
|
||||||
mem->NewRangeTombstoneIterator(ro, vset_.LastSequence())),
|
|
||||||
&meta, cfd->internal_comparator(),
|
&meta, cfd->internal_comparator(),
|
||||||
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
||||||
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
|
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
|
||||||
|
@ -1057,8 +1057,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
|
|||||||
|
|
||||||
Arena arena;
|
Arena arena;
|
||||||
Status status;
|
Status status;
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(&icmp,
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
|
|
||||||
*overlap = false;
|
*overlap = false;
|
||||||
|
|
||||||
|
@ -92,9 +92,7 @@ class BinaryHeap {
|
|||||||
reset_root_cmp_cache();
|
reset_root_cmp_cache();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool empty() const {
|
bool empty() const { return data_.empty(); }
|
||||||
return data_.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t size() const { return data_.size(); }
|
size_t size() const { return data_.size(); }
|
||||||
|
|
||||||
|
@ -19,8 +19,8 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
|
|||||||
|
|
||||||
DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB());
|
DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB());
|
||||||
auto icmp = InternalKeyComparator(idb->GetOptions().comparator);
|
auto icmp = InternalKeyComparator(idb->GetOptions().comparator);
|
||||||
RangeDelAggregatorV2 range_del_agg(&icmp,
|
ReadRangeDelAggregatorV2 range_del_agg(&icmp,
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
Arena arena;
|
Arena arena;
|
||||||
ScopedArenaIterator iter(
|
ScopedArenaIterator iter(
|
||||||
idb->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber));
|
idb->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user