From 96de211f4cadd547a3662f4ac1595888d3d05b0c Mon Sep 17 00:00:00 2001 From: Abhishek Madan Date: Mon, 17 Dec 2018 13:12:22 -0800 Subject: [PATCH] Add compaction logic to RangeDelAggregatorV2 (#4758) Summary: RangeDelAggregatorV2 now supports ShouldDelete calls on snapshot stripes and creation of range tombstone compaction iterators. RangeDelAggregator is no longer used on any non-test code path, and will be removed in a future commit. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4758 Differential Revision: D13439254 Pulled By: abhimadan fbshipit-source-id: fe105bcf8e3d4a2df37a622d5510843cd71b0401 --- db/builder.cc | 22 +- db/builder.h | 6 +- db/column_family.cc | 2 +- db/compaction_iterator.cc | 4 +- db/compaction_iterator.h | 8 +- db/compaction_iterator_test.cc | 14 +- db/compaction_job.cc | 39 ++-- db/compaction_job.h | 6 +- db/db_compaction_filter_test.cc | 12 +- db/db_impl_open.cc | 54 ++--- db/db_iter.cc | 6 +- db/db_iter.h | 2 +- db/db_test_util.cc | 12 +- db/flush_job.cc | 16 +- db/forward_iterator.cc | 12 +- db/merge_helper.cc | 5 +- db/merge_helper.h | 4 +- db/range_del_aggregator_bench.cc | 2 +- db/range_del_aggregator_v2.cc | 311 +++++++++++++++++++++++------ db/range_del_aggregator_v2.h | 231 ++++++++++++++++----- db/range_del_aggregator_v2_test.cc | 254 ++++++++++++++++++++++- db/range_tombstone_fragmenter.cc | 13 +- db/range_tombstone_fragmenter.h | 3 + db/repair.cc | 11 +- db/version_set.cc | 4 +- util/heap.h | 4 +- utilities/debug.cc | 4 +- 27 files changed, 830 insertions(+), 231 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 0d896846f..60067c425 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -18,6 +18,7 @@ #include "db/event_helpers.h" #include "db/internal_stats.h" #include "db/merge_helper.h" +#include "db/range_del_aggregator_v2.h" #include "db/table_cache.h" #include "db/version_edit.h" #include "monitoring/iostats_context_imp.h" @@ -65,8 +66,9 @@ Status BuildTable( const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, TableCache* table_cache, InternalIterator* iter, - std::unique_ptr range_del_iter, FileMetaData* meta, - const InternalKeyComparator& internal_comparator, + std::vector> + range_del_iters, + FileMetaData* meta, const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, @@ -86,12 +88,10 @@ Status BuildTable( Status s; meta->fd.file_size = 0; iter->SeekToFirst(); - std::unique_ptr range_del_agg( - new RangeDelAggregator(internal_comparator, snapshots)); - s = range_del_agg->AddTombstones(std::move(range_del_iter)); - if (!s.ok()) { - // may be non-ok if a range tombstone key is unparsable - return s; + std::unique_ptr range_del_agg( + new CompactionRangeDelAggregatorV2(&internal_comparator, snapshots)); + for (auto& range_del_iter : range_del_iters) { + range_del_agg->AddTombstones(std::move(range_del_iter)); } std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(), @@ -158,8 +158,10 @@ Status BuildTable( } } - for (auto it = range_del_agg->NewIterator(); it->Valid(); it->Next()) { - auto tombstone = it->Tombstone(); + auto range_del_it = range_del_agg->NewIterator(); + for (range_del_it->SeekToFirst(); range_del_it->Valid(); + range_del_it->Next()) { + auto tombstone = range_del_it->Tombstone(); auto kv = tombstone.Serialize(); builder->Add(kv.first.Encode(), kv.second); meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(), diff --git a/db/builder.h b/db/builder.h index 9995723df..b81355703 100644 --- a/db/builder.h +++ b/db/builder.h @@ -9,6 +9,7 @@ #include #include #include +#include "db/range_tombstone_fragmenter.h" #include "db/table_properties_collector.h" #include "options/cf_options.h" #include "rocksdb/comparator.h" @@ -65,8 +66,9 @@ extern Status BuildTable( const std::string& dbname, Env* env, const ImmutableCFOptions& options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, TableCache* table_cache, InternalIterator* iter, - std::unique_ptr range_del_iter, FileMetaData* meta, - const InternalKeyComparator& internal_comparator, + std::vector> + range_del_iters, + FileMetaData* meta, const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, diff --git a/db/column_family.cc b/db/column_family.cc index 29298e62a..c1a85a341 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -945,7 +945,7 @@ Status ColumnFamilyData::RangesOverlapWithMemtables( ScopedArenaIterator memtable_iter(merge_iter_builder.Finish()); auto read_seq = super_version->current->version_set()->LastSequence(); - RangeDelAggregatorV2 range_del_agg(&internal_comparator_, read_seq); + ReadRangeDelAggregatorV2 range_del_agg(&internal_comparator_, read_seq); auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq); range_del_agg.AddTombstones( diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index d81b630f3..ad45602cc 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -18,7 +18,7 @@ CompactionIterator::CompactionIterator( SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, - RangeDelAggregator* range_del_agg, const Compaction* compaction, + CompactionRangeDelAggregatorV2* range_del_agg, const Compaction* compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum) @@ -36,7 +36,7 @@ CompactionIterator::CompactionIterator( SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, - RangeDelAggregator* range_del_agg, + CompactionRangeDelAggregatorV2* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 71359169c..1f6a135b8 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -13,7 +13,7 @@ #include "db/compaction_iteration_stats.h" #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" -#include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" #include "db/snapshot_checker.h" #include "options/cf_options.h" #include "rocksdb/compaction_filter.h" @@ -64,7 +64,7 @@ class CompactionIterator { SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, - RangeDelAggregator* range_del_agg, + CompactionRangeDelAggregatorV2* range_del_agg, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -77,7 +77,7 @@ class CompactionIterator { SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, - RangeDelAggregator* range_del_agg, + CompactionRangeDelAggregatorV2* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -141,7 +141,7 @@ class CompactionIterator { Env* env_; bool report_detailed_time_; bool expect_valid_internal_key_; - RangeDelAggregator* range_del_agg_; + CompactionRangeDelAggregatorV2* range_del_agg_; std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 03c5a9c62..a81efafaa 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -221,10 +221,16 @@ class CompactionIteratorTest : public testing::TestWithParam { MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr, bool bottommost_level = false, SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { - std::unique_ptr range_del_iter( + std::unique_ptr unfragmented_range_del_iter( new test::VectorIterator(range_del_ks, range_del_vs)); - range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_)); - ASSERT_OK(range_del_agg_->AddTombstones(std::move(range_del_iter))); + auto tombstone_list = std::make_shared( + std::move(unfragmented_range_del_iter), icmp_); + std::unique_ptr range_del_iter( + new FragmentedRangeTombstoneIterator(tombstone_list, icmp_, + kMaxSequenceNumber)); + range_del_agg_.reset( + new CompactionRangeDelAggregatorV2(&icmp_, snapshots_)); + range_del_agg_->AddTombstones(std::move(range_del_iter)); std::unique_ptr compaction; if (filter || bottommost_level) { @@ -292,7 +298,7 @@ class CompactionIteratorTest : public testing::TestWithParam { std::unique_ptr merge_helper_; std::unique_ptr iter_; std::unique_ptr c_iter_; - std::unique_ptr range_del_agg_; + std::unique_ptr range_del_agg_; std::unique_ptr snapshot_checker_; std::atomic shutting_down_{false}; FakeCompaction* compaction_proxy_; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 8a878fe72..17be3156b 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -805,15 +805,13 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact != nullptr); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); - RangeDelAggregatorV2 range_del_agg_v2(&cfd->internal_comparator(), - kMaxSequenceNumber /* upper_bound */); - auto* range_del_agg = - range_del_agg_v2.DelegateToRangeDelAggregator(existing_snapshots_); + CompactionRangeDelAggregatorV2 range_del_agg(&cfd->internal_comparator(), + existing_snapshots_); // Although the v2 aggregator is what the level iterator(s) know about, // the AddTombstones calls will be propagated down to the v1 aggregator. std::unique_ptr input(versions_->MakeInputIterator( - sub_compact->compaction, &range_del_agg_v2, env_optiosn_for_read_)); + sub_compact->compaction, &range_del_agg, env_optiosn_for_read_)); AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); @@ -902,8 +900,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false, - range_del_agg, sub_compact->compaction, compaction_filter, shutting_down_, - preserve_deletes_seqnum_)); + &range_del_agg, sub_compact->compaction, compaction_filter, + shutting_down_, preserve_deletes_seqnum_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { @@ -1041,7 +1039,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } CompactionIterationStats range_del_out_stats; status = - FinishCompactionOutputFile(input_status, sub_compact, range_del_agg, + FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg, &range_del_out_stats, next_key); RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); @@ -1092,8 +1090,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } if (status.ok() && sub_compact->builder == nullptr && - sub_compact->outputs.size() == 0 && - !range_del_agg->IsEmpty()) { + sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) { // handle subcompaction containing only range deletions status = OpenCompactionOutputFile(sub_compact); } @@ -1102,7 +1099,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // close the output file. if (sub_compact->builder != nullptr) { CompactionIterationStats range_del_out_stats; - Status s = FinishCompactionOutputFile(status, sub_compact, range_del_agg, + Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg, &range_del_out_stats); if (status.ok()) { status = s; @@ -1168,7 +1165,7 @@ void CompactionJob::RecordDroppedKeys( Status CompactionJob::FinishCompactionOutputFile( const Status& input_status, SubcompactionState* sub_compact, - RangeDelAggregator* range_del_agg, + CompactionRangeDelAggregatorV2* range_del_agg, CompactionIterationStats* range_del_out_stats, const Slice* next_table_min_key /* = nullptr */) { AutoThreadOperationStageUpdater stage_updater( @@ -1220,11 +1217,6 @@ Status CompactionJob::FinishCompactionOutputFile( if (existing_snapshots_.size() > 0) { earliest_snapshot = existing_snapshots_[0]; } - auto it = range_del_agg->NewIterator(); - if (lower_bound != nullptr) { - it->Seek(*lower_bound); - } - bool has_overlapping_endpoints; if (upper_bound != nullptr && meta->largest.size() > 0) { has_overlapping_endpoints = @@ -1232,6 +1224,17 @@ Status CompactionJob::FinishCompactionOutputFile( } else { has_overlapping_endpoints = false; } + + auto it = range_del_agg->NewIterator(lower_bound, upper_bound, + has_overlapping_endpoints); + // Position the range tombstone output iterator. There may be tombstone + // fragments that are entirely out of range, so make sure that we do not + // include those. + if (lower_bound != nullptr) { + it->Seek(*lower_bound); + } else { + it->SeekToFirst(); + } for (; it->Valid(); it->Next()) { auto tombstone = it->Tombstone(); if (upper_bound != nullptr) { @@ -1257,6 +1260,8 @@ Status CompactionJob::FinishCompactionOutputFile( } auto kv = tombstone.Serialize(); + assert(lower_bound == nullptr || + ucmp->Compare(*lower_bound, kv.second) < 0); sub_compact->builder->Add(kv.first.Encode(), kv.second); InternalKey smallest_candidate = std::move(kv.first); if (lower_bound != nullptr && diff --git a/db/compaction_job.h b/db/compaction_job.h index a31e8c142..86d97e1db 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -25,12 +25,12 @@ #include "db/job_context.h" #include "db/log_writer.h" #include "db/memtable_list.h" -#include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" #include "db/version_edit.h" #include "db/write_controller.h" #include "db/write_thread.h" -#include "options/db_options.h" #include "options/cf_options.h" +#include "options/db_options.h" #include "port/port.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_job_stats.h" @@ -104,7 +104,7 @@ class CompactionJob { Status FinishCompactionOutputFile( const Status& input_status, SubcompactionState* sub_compact, - RangeDelAggregator* range_del_agg, + CompactionRangeDelAggregatorV2* range_del_agg, CompactionIterationStats* range_del_out_stats, const Slice* next_table_min_key = nullptr); Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options); diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index 8dc2ce32c..63d2829d5 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -340,8 +340,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) { Arena arena; { InternalKeyComparator icmp(options.comparator); - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg( + &icmp, kMaxSequenceNumber /* upper_bound */); ScopedArenaIterator iter(dbfull()->NewInternalIterator( &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); iter->SeekToFirst(); @@ -430,8 +430,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) { count = 0; { InternalKeyComparator icmp(options.comparator); - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg( + &icmp, kMaxSequenceNumber /* upper_bound */); ScopedArenaIterator iter(dbfull()->NewInternalIterator( &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); iter->SeekToFirst(); @@ -648,8 +648,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { int total = 0; Arena arena; InternalKeyComparator icmp(options.comparator); - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* snapshots */); + ReadRangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* snapshots */); ScopedArenaIterator iter(dbfull()->NewInternalIterator( &arena, &range_del_agg, kMaxSequenceNumber)); iter->SeekToFirst(); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 24e649973..5ea8c61b5 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -23,8 +23,7 @@ #include "util/sync_point.h" namespace rocksdb { -Options SanitizeOptions(const std::string& dbname, - const Options& src) { +Options SanitizeOptions(const std::string& dbname, const Options& src) { auto db_options = SanitizeOptions(dbname, DBOptions(src)); ImmutableDBOptions immutable_db_options(db_options); auto cf_options = @@ -56,10 +55,9 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.write_buffer_manager.reset( new WriteBufferManager(result.db_write_buffer_size)); } - auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes, - result.max_background_compactions, - result.max_background_jobs, - true /* parallelize_compactions */); + auto bg_job_limits = DBImpl::GetBGJobLimits( + result.max_background_flushes, result.max_background_compactions, + result.max_background_jobs, true /* parallelize_compactions */); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes, @@ -107,14 +105,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.db_paths.emplace_back(dbname, std::numeric_limits::max()); } - if (result.use_direct_reads && - result.compaction_readahead_size == 0) { + if (result.use_direct_reads && result.compaction_readahead_size == 0) { TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr); result.compaction_readahead_size = 1024 * 1024 * 2; } - if (result.compaction_readahead_size > 0 || - result.use_direct_reads) { + if (result.compaction_readahead_size > 0 || result.use_direct_reads) { result.new_table_reader_for_compaction_inputs = true; } @@ -218,7 +214,7 @@ static Status ValidateOptions( return Status::OK(); } -} // namespace +} // namespace Status DBImpl::NewDB() { VersionEdit new_db; new_db.SetLogNumber(0); @@ -258,9 +254,8 @@ Status DBImpl::NewDB() { return s; } -Status DBImpl::CreateAndNewDirectory( - Env* env, const std::string& dirname, - std::unique_ptr* directory) { +Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname, + std::unique_ptr* directory) { // We call CreateDirIfMissing() as the directory may already exist (if we // are reopening a DB), when this happens we don't want creating the // directory to cause an error. However, we need to check if creating the @@ -341,8 +336,8 @@ Status DBImpl::Recover( } } else if (s.ok()) { if (immutable_db_options_.error_if_exists) { - return Status::InvalidArgument( - dbname_, "exists (error_if_exists is true)"); + return Status::InvalidArgument(dbname_, + "exists (error_if_exists is true)"); } } else { // Unexpected error reading file @@ -527,10 +522,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, std::map cf_name_id_map; std::map cf_lognumber_map; for (auto cfd : *versions_->GetColumnFamilySet()) { - cf_name_id_map.insert( - std::make_pair(cfd->GetName(), cfd->GetID())); + cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID())); cf_lognumber_map.insert( - std::make_pair(cfd->GetID(), cfd->GetLogNumber())); + std::make_pair(cfd->GetID(), cfd->GetLogNumber())); } immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map, @@ -880,8 +874,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // VersionSet::next_file_number_ always to be strictly greater than any // log number versions_->MarkFileNumberUsed(max_log_number + 1); - status = versions_->LogAndApply( - cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_); + status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + edit, &mutex_); if (!status.ok()) { // Recovery failed break; @@ -994,12 +988,17 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (use_custom_gc_ && snapshot_checker == nullptr) { snapshot_checker = DisableGCSnapshotChecker::Instance(); } + std::vector> + range_del_iters; + auto range_del_iter = + mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber); + if (range_del_iter != nullptr) { + range_del_iters.emplace_back(range_del_iter); + } s = BuildTable( dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_for_compaction_, cfd->table_cache(), iter.get(), - std::unique_ptr( - mem->NewRangeTombstoneIterator(ro, versions_->LastSequence())), - &meta, cfd->internal_comparator(), + std::move(range_del_iters), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), @@ -1033,8 +1032,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, stats.bytes_written = meta.fd.GetFileSize(); stats.num_output_files = 1; cfd->internal_stats()->AddCompactionStats(level, stats); - cfd->internal_stats()->AddCFStats( - InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); + cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, + meta.fd.GetFileSize()); RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); return s; } @@ -1227,7 +1226,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, !cfd->mem()->IsMergeOperatorSupported()) { s = Status::InvalidArgument( "The memtable of column family %s does not support merge operator " - "its options.merge_operator is non-null", cfd->GetName().c_str()); + "its options.merge_operator is non-null", + cfd->GetName().c_str()); } if (!s.ok()) { break; diff --git a/db/db_iter.cc b/db/db_iter.cc index 78a6bf47c..cc4a0d5f4 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -171,7 +171,7 @@ class DBIter final: public Iterator { iter_ = iter; iter_->SetPinnedItersMgr(&pinned_iters_mgr_); } - virtual RangeDelAggregatorV2* GetRangeDelAggregator() { + virtual ReadRangeDelAggregatorV2* GetRangeDelAggregator() { return &range_del_agg_; } @@ -341,7 +341,7 @@ class DBIter final: public Iterator { const bool total_order_seek_; // List of operands for merge operator. MergeContext merge_context_; - RangeDelAggregatorV2 range_del_agg_; + ReadRangeDelAggregatorV2 range_del_agg_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; ReadCallback* read_callback_; @@ -1479,7 +1479,7 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } -RangeDelAggregatorV2* ArenaWrappedDBIter::GetRangeDelAggregator() { +ReadRangeDelAggregatorV2* ArenaWrappedDBIter::GetRangeDelAggregator() { return db_iter_->GetRangeDelAggregator(); } diff --git a/db/db_iter.h b/db/db_iter.h index 3d359bbb1..6ee869135 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -48,7 +48,7 @@ class ArenaWrappedDBIter : public Iterator { // Get the arena to be used to allocate memory for DBIter to be wrapped, // as well as child iterators in it. virtual Arena* GetArena() { return &arena_; } - virtual RangeDelAggregatorV2* GetRangeDelAggregator(); + virtual ReadRangeDelAggregatorV2* GetRangeDelAggregator(); // Set the internal iterator wrapped inside the DB Iterator. Usually it is // a merging iterator. diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 50092653b..eeff7be51 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -814,8 +814,8 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) { Arena arena; auto options = CurrentOptions(); InternalKeyComparator icmp(options.comparator); - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); ScopedArenaIterator iter; if (cf == 0) { iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, @@ -1227,8 +1227,8 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) { Arena arena; auto options = CurrentOptions(); InternalKeyComparator icmp(options.comparator); - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); // This should be defined after range_del_agg so that it destructs the // assigned iterator before it range_del_agg is already destructed. ScopedArenaIterator iter; @@ -1437,8 +1437,8 @@ void DBTestBase::VerifyDBInternal( std::vector> true_data) { Arena arena; InternalKeyComparator icmp(last_options_.comparator); - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber); iter->SeekToFirst(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 17ec22ed9..8769c849e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -24,14 +24,15 @@ #include "db/event_helpers.h" #include "db/log_reader.h" #include "db/log_writer.h" +#include "db/memtable.h" #include "db/memtable_list.h" #include "db/merge_context.h" +#include "db/range_tombstone_fragmenter.h" #include "db/version_set.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_util.h" #include "port/port.h" -#include "db/memtable.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/statistics.h" @@ -295,7 +296,8 @@ Status FlushJob::WriteLevel0Table() { // memtable and its associated range deletion memtable, respectively, at // corresponding indexes. std::vector memtables; - std::vector range_del_iters; + std::vector> + range_del_iters; ReadOptions ro; ro.total_order_seek = true; Arena arena; @@ -308,9 +310,9 @@ Status FlushJob::WriteLevel0Table() { cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); memtables.push_back(m->NewIterator(ro, &arena)); auto* range_del_iter = - m->NewRangeTombstoneIterator(ro, versions_->LastSequence()); + m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber); if (range_del_iter != nullptr) { - range_del_iters.push_back(range_del_iter); + range_del_iters.emplace_back(range_del_iter); } total_num_entries += m->num_entries(); total_num_deletes += m->num_deletes(); @@ -329,10 +331,6 @@ Status FlushJob::WriteLevel0Table() { ScopedArenaIterator iter( NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], static_cast(memtables.size()), &arena)); - std::unique_ptr range_del_iter(NewMergingIterator( - &cfd_->internal_comparator(), - range_del_iters.empty() ? nullptr : &range_del_iters[0], - static_cast(range_del_iters.size()))); ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", cfd_->GetName().c_str(), job_context_->job_id, @@ -358,7 +356,7 @@ Status FlushJob::WriteLevel0Table() { s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, env_options_, cfd_->table_cache(), iter.get(), - std::move(range_del_iter), &meta_, cfd_->internal_comparator(), + std::move(range_del_iters), &meta_, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index cdf9a07f6..226d56d5f 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -73,8 +73,8 @@ class ForwardLevelIterator : public InternalIterator { delete file_iter_; } - RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg( + &cfd_->internal_comparator(), kMaxSequenceNumber /* upper_bound */); file_iter_ = cfd_->table_cache()->NewIterator( read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), *files_[file_index_], @@ -610,8 +610,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { // New sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); } - RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), + kMaxSequenceNumber /* upper_bound */); mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); if (!read_options_.ignore_range_deletions) { @@ -669,8 +669,8 @@ void ForwardIterator::RenewIterators() { mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_); svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_); - RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), + kMaxSequenceNumber /* upper_bound */); if (!read_options_.ignore_range_deletions) { std::unique_ptr range_del_iter( svnew->mem->NewRangeTombstoneIterator( diff --git a/db/merge_helper.cc b/db/merge_helper.cc index dc6baa963..6f7e760ec 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -110,8 +110,11 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, // keys_ stores the list of keys encountered while merging. // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. +// +// TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator +// and just pass the StripeRep corresponding to the stripe being merged. Status MergeHelper::MergeUntil(InternalIterator* iter, - RangeDelAggregator* range_del_agg, + CompactionRangeDelAggregatorV2* range_del_agg, const SequenceNumber stop_before, const bool at_bottom) { // Get a copy of the internal key, before it's invalidated by iter->Next() diff --git a/db/merge_helper.h b/db/merge_helper.h index 993bbe3e9..1c92a3492 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -11,7 +11,7 @@ #include "db/dbformat.h" #include "db/merge_context.h" -#include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" #include "db/snapshot_checker.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" @@ -78,7 +78,7 @@ class MergeHelper { // // REQUIRED: The first key in the input is not corrupted. Status MergeUntil(InternalIterator* iter, - RangeDelAggregator* range_del_agg = nullptr, + CompactionRangeDelAggregatorV2* range_del_agg = nullptr, const SequenceNumber stop_before = 0, const bool at_bottom = false); diff --git a/db/range_del_aggregator_bench.cc b/db/range_del_aggregator_bench.cc index 9fdcefc39..0b8260960 100644 --- a/db/range_del_aggregator_bench.cc +++ b/db/range_del_aggregator_bench.cc @@ -194,7 +194,7 @@ int main(int argc, char** argv) { for (int i = 0; i < FLAGS_num_runs; i++) { rocksdb::RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, FLAGS_use_collapsed); - rocksdb::RangeDelAggregatorV2 range_del_agg_v2( + rocksdb::ReadRangeDelAggregatorV2 range_del_agg_v2( &icmp, rocksdb::kMaxSequenceNumber /* upper_bound */); std::vector > diff --git a/db/range_del_aggregator_v2.cc b/db/range_del_aggregator_v2.cc index a2ae4f7a3..b0667f6fd 100644 --- a/db/range_del_aggregator_v2.cc +++ b/db/range_del_aggregator_v2.cc @@ -26,7 +26,10 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator( std::unique_ptr iter, const InternalKeyComparator* icmp, const InternalKey* smallest, const InternalKey* largest) - : iter_(std::move(iter)), icmp_(icmp) { + : iter_(std::move(iter)), + icmp_(icmp), + smallest_ikey_(smallest), + largest_ikey_(largest) { if (smallest != nullptr) { pinned_bounds_.emplace_back(); auto& parsed_smallest = pinned_bounds_.back(); @@ -78,6 +81,8 @@ void TruncatedRangeDelIterator::Next() { iter_->TopNext(); } void TruncatedRangeDelIterator::Prev() { iter_->TopPrev(); } +void TruncatedRangeDelIterator::InternalNext() { iter_->Next(); } + // NOTE: target is a user key void TruncatedRangeDelIterator::Seek(const Slice& target) { if (largest_ != nullptr && @@ -86,6 +91,11 @@ void TruncatedRangeDelIterator::Seek(const Slice& target) { iter_->Invalidate(); return; } + if (smallest_ != nullptr && + icmp_->user_comparator()->Compare(target, smallest_->user_key) < 0) { + iter_->Seek(smallest_->user_key); + return; + } iter_->Seek(target); } @@ -97,12 +107,51 @@ void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) { iter_->Invalidate(); return; } + if (largest_ != nullptr && + icmp_->user_comparator()->Compare(largest_->user_key, target) < 0) { + iter_->SeekForPrev(largest_->user_key); + return; + } iter_->SeekForPrev(target); } -void TruncatedRangeDelIterator::SeekToFirst() { iter_->SeekToTopFirst(); } +void TruncatedRangeDelIterator::SeekToFirst() { + if (smallest_ != nullptr) { + iter_->Seek(smallest_->user_key); + return; + } + iter_->SeekToTopFirst(); +} -void TruncatedRangeDelIterator::SeekToLast() { iter_->SeekToTopLast(); } +void TruncatedRangeDelIterator::SeekToLast() { + if (largest_ != nullptr) { + iter_->SeekForPrev(largest_->user_key); + return; + } + iter_->SeekToTopLast(); +} + +std::map> +TruncatedRangeDelIterator::SplitBySnapshot( + const std::vector& snapshots) { + using FragmentedIterPair = + std::pair>; + + auto split_untruncated_iters = iter_->SplitBySnapshot(snapshots); + std::map> + split_truncated_iters; + std::for_each( + split_untruncated_iters.begin(), split_untruncated_iters.end(), + [&](FragmentedIterPair& iter_pair) { + std::unique_ptr truncated_iter( + new TruncatedRangeDelIterator(std::move(iter_pair.second), icmp_, + smallest_ikey_, largest_ikey_)); + split_truncated_iters.emplace(iter_pair.first, + std::move(truncated_iter)); + }); + return split_truncated_iters; +} ForwardRangeDelIterator::ForwardRangeDelIterator( const InternalKeyComparator* icmp, @@ -116,15 +165,6 @@ ForwardRangeDelIterator::ForwardRangeDelIterator( bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) { assert(iters_ != nullptr); - // Pick up previously unseen iterators. - for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end(); - ++it, ++unused_idx_) { - auto& iter = *it; - iter->Seek(parsed.user_key); - PushIter(iter.get(), parsed); - assert(active_iters_.size() == active_seqnums_.size()); - } - // Move active iterators that end before parsed. while (!active_iters_.empty() && icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) { @@ -171,15 +211,6 @@ ReverseRangeDelIterator::ReverseRangeDelIterator( bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) { assert(iters_ != nullptr); - // Pick up previously unseen iterators. - for (auto it = std::next(iters_->begin(), unused_idx_); it != iters_->end(); - ++it, ++unused_idx_) { - auto& iter = *it; - iter->SeekForPrev(parsed.user_key); - PushIter(iter.get(), parsed); - assert(active_iters_.size() == active_seqnums_.size()); - } - // Move active iterators that start after parsed. while (!active_iters_.empty() && icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) { @@ -214,38 +245,33 @@ void ReverseRangeDelIterator::Invalidate() { inactive_iters_.clear(); } -RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp, - SequenceNumber /* upper_bound */) - : icmp_(icmp), forward_iter_(icmp, &iters_), reverse_iter_(icmp, &iters_) {} - -void RangeDelAggregatorV2::AddTombstones( - std::unique_ptr input_iter, - const InternalKey* smallest, const InternalKey* largest) { - if (input_iter == nullptr || input_iter->empty()) { - return; +bool RangeDelAggregatorV2::StripeRep::ShouldDelete( + const ParsedInternalKey& parsed, RangeDelPositioningMode mode) { + if (!InStripe(parsed.sequence) || IsEmpty()) { + return false; } - if (wrapped_range_del_agg != nullptr) { - wrapped_range_del_agg->AddTombstones(std::move(input_iter), smallest, - largest); - // TODO: this eats the status of the wrapped call; may want to propagate it - return; - } - iters_.emplace_back(new TruncatedRangeDelIterator(std::move(input_iter), - icmp_, smallest, largest)); -} - -bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed, - RangeDelPositioningMode mode) { - if (wrapped_range_del_agg != nullptr) { - return wrapped_range_del_agg->ShouldDelete(parsed, mode); - } - switch (mode) { case RangeDelPositioningMode::kForwardTraversal: - reverse_iter_.Invalidate(); + InvalidateReverseIter(); + + // Pick up previously unseen iterators. + for (auto it = std::next(iters_.begin(), forward_iter_.UnusedIdx()); + it != iters_.end(); ++it, forward_iter_.IncUnusedIdx()) { + auto& iter = *it; + forward_iter_.AddNewIter(iter.get(), parsed); + } + return forward_iter_.ShouldDelete(parsed); case RangeDelPositioningMode::kBackwardTraversal: - forward_iter_.Invalidate(); + InvalidateForwardIter(); + + // Pick up previously unseen iterators. + for (auto it = std::next(iters_.begin(), reverse_iter_.UnusedIdx()); + it != iters_.end(); ++it, reverse_iter_.IncUnusedIdx()) { + auto& iter = *it; + reverse_iter_.AddNewIter(iter.get(), parsed); + } + return reverse_iter_.ShouldDelete(parsed); default: assert(false); @@ -253,14 +279,13 @@ bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed, } } -bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start, - const Slice& end) { - assert(wrapped_range_del_agg == nullptr); - InvalidateRangeDelMapPositions(); +bool RangeDelAggregatorV2::StripeRep::IsRangeOverlapped(const Slice& start, + const Slice& end) { + Invalidate(); // Set the internal start/end keys so that: - // - if start_ikey has the same user key and sequence number as the current - // end key, start_ikey will be considered greater; and + // - if start_ikey has the same user key and sequence number as the + // current end key, start_ikey will be considered greater; and // - if end_ikey has the same user key and sequence number as the current // start key, end_ikey will be considered greater. ParsedInternalKey start_ikey(start, kMaxSequenceNumber, @@ -279,9 +304,9 @@ bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start, } if (!checked_candidate_tombstones) { - // Do an additional check for when the end of the range is the begin key - // of a tombstone, which we missed earlier since SeekForPrev'ing to the - // start was invalid. + // Do an additional check for when the end of the range is the begin + // key of a tombstone, which we missed earlier since SeekForPrev'ing + // to the start was invalid. iter->SeekForPrev(end); if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 && icmp_->Compare(iter->start_key(), end_ikey) <= 0) { @@ -292,4 +317,176 @@ bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start, return false; } +void ReadRangeDelAggregatorV2::AddTombstones( + std::unique_ptr input_iter, + const InternalKey* smallest, const InternalKey* largest) { + if (input_iter == nullptr || input_iter->empty()) { + return; + } + rep_.AddTombstones( + std::unique_ptr(new TruncatedRangeDelIterator( + std::move(input_iter), icmp_, smallest, largest))); +} + +bool ReadRangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) { + return rep_.ShouldDelete(parsed, mode); +} + +bool ReadRangeDelAggregatorV2::IsRangeOverlapped(const Slice& start, + const Slice& end) { + InvalidateRangeDelMapPositions(); + return rep_.IsRangeOverlapped(start, end); +} + +void CompactionRangeDelAggregatorV2::AddTombstones( + std::unique_ptr input_iter, + const InternalKey* smallest, const InternalKey* largest) { + if (input_iter == nullptr || input_iter->empty()) { + return; + } + assert(input_iter->lower_bound() == 0); + assert(input_iter->upper_bound() == kMaxSequenceNumber); + parent_iters_.emplace_back(new TruncatedRangeDelIterator( + std::move(input_iter), icmp_, smallest, largest)); + + auto split_iters = parent_iters_.back()->SplitBySnapshot(*snapshots_); + for (auto& split_iter : split_iters) { + auto it = reps_.find(split_iter.first); + if (it == reps_.end()) { + bool inserted; + SequenceNumber upper_bound = split_iter.second->upper_bound(); + SequenceNumber lower_bound = split_iter.second->lower_bound(); + std::tie(it, inserted) = reps_.emplace( + split_iter.first, StripeRep(icmp_, upper_bound, lower_bound)); + assert(inserted); + } + assert(it != reps_.end()); + it->second.AddTombstones(std::move(split_iter.second)); + } +} + +bool CompactionRangeDelAggregatorV2::ShouldDelete( + const ParsedInternalKey& parsed, RangeDelPositioningMode mode) { + auto it = reps_.lower_bound(parsed.sequence); + if (it == reps_.end()) { + return false; + } + return it->second.ShouldDelete(parsed, mode); +} + +namespace { + +class TruncatedRangeDelMergingIter : public InternalIterator { + public: + TruncatedRangeDelMergingIter( + const InternalKeyComparator* icmp, const Slice* lower_bound, + const Slice* upper_bound, bool upper_bound_inclusive, + const std::vector>& children) + : icmp_(icmp), + lower_bound_(lower_bound), + upper_bound_(upper_bound), + upper_bound_inclusive_(upper_bound_inclusive), + heap_(StartKeyMinComparator(icmp)) { + for (auto& child : children) { + if (child != nullptr) { + assert(child->lower_bound() == 0); + assert(child->upper_bound() == kMaxSequenceNumber); + children_.push_back(child.get()); + } + } + } + + bool Valid() const override { + return !heap_.empty() && BeforeEndKey(heap_.top()); + } + Status status() const override { return Status::OK(); } + + void SeekToFirst() override { + heap_.clear(); + for (auto& child : children_) { + if (lower_bound_ != nullptr) { + child->Seek(*lower_bound_); + } else { + child->SeekToFirst(); + } + if (child->Valid()) { + heap_.push(child); + } + } + } + + void Next() override { + auto* top = heap_.top(); + top->InternalNext(); + if (top->Valid()) { + heap_.replace_top(top); + } else { + heap_.pop(); + } + } + + Slice key() const override { + auto* top = heap_.top(); + cur_start_key_.Set(top->start_key().user_key, top->seq(), + kTypeRangeDeletion); + return cur_start_key_.Encode(); + } + + Slice value() const override { + auto* top = heap_.top(); + assert(top->end_key().sequence == kMaxSequenceNumber); + return top->end_key().user_key; + } + + // Unused InternalIterator methods + void Prev() override { assert(false); } + void Seek(const Slice& /* target */) override { assert(false); } + void SeekForPrev(const Slice& /* target */) override { assert(false); } + void SeekToLast() override { assert(false); } + + private: + bool BeforeEndKey(const TruncatedRangeDelIterator* iter) const { + if (upper_bound_ == nullptr) { + return true; + } + int cmp = icmp_->user_comparator()->Compare(iter->start_key().user_key, + *upper_bound_); + return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0; + } + + const InternalKeyComparator* icmp_; + const Slice* lower_bound_; + const Slice* upper_bound_; + bool upper_bound_inclusive_; + BinaryHeap heap_; + std::vector children_; + + mutable InternalKey cur_start_key_; +}; + +} // namespace + +std::unique_ptr +CompactionRangeDelAggregatorV2::NewIterator(const Slice* lower_bound, + const Slice* upper_bound, + bool upper_bound_inclusive) { + InvalidateRangeDelMapPositions(); + std::unique_ptr merging_iter( + new TruncatedRangeDelMergingIter(icmp_, lower_bound, upper_bound, + upper_bound_inclusive, parent_iters_)); + + // TODO: add tests where tombstone fragments can be outside of upper and lower + // bound range + auto fragmented_tombstone_list = + std::make_shared( + std::move(merging_iter), *icmp_, true /* for_compaction */, + *snapshots_); + + return std::unique_ptr( + new FragmentedRangeTombstoneIterator( + fragmented_tombstone_list, *icmp_, + kMaxSequenceNumber /* upper_bound */)); +} + } // namespace rocksdb diff --git a/db/range_del_aggregator_v2.h b/db/range_del_aggregator_v2.h index 8413bb9cb..306dbf249 100644 --- a/db/range_del_aggregator_v2.h +++ b/db/range_del_aggregator_v2.h @@ -5,6 +5,8 @@ #pragma once +#include +#include #include #include #include @@ -27,8 +29,6 @@ namespace rocksdb { -class RangeDelAggregatorV2; - class TruncatedRangeDelIterator { public: TruncatedRangeDelIterator( @@ -41,6 +41,8 @@ class TruncatedRangeDelIterator { void Next(); void Prev(); + void InternalNext(); + // Seeks to the tombstone with the highest viisble sequence number that covers // target (a user key). If no such tombstone exists, the position will be at // the earliest tombstone that ends after target. @@ -70,12 +72,22 @@ class TruncatedRangeDelIterator { SequenceNumber seq() const { return iter_->seq(); } + std::map> + SplitBySnapshot(const std::vector& snapshots); + + SequenceNumber upper_bound() const { return iter_->upper_bound(); } + + SequenceNumber lower_bound() const { return iter_->lower_bound(); } + private: std::unique_ptr iter_; const InternalKeyComparator* icmp_; const ParsedInternalKey* smallest_ = nullptr; const ParsedInternalKey* largest_ = nullptr; std::list pinned_bounds_; + + const InternalKey* smallest_ikey_; + const InternalKey* largest_ikey_; }; struct SeqMaxComparator { @@ -85,6 +97,17 @@ struct SeqMaxComparator { } }; +struct StartKeyMinComparator { + explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {} + + bool operator()(const TruncatedRangeDelIterator* a, + const TruncatedRangeDelIterator* b) const { + return icmp->Compare(a->start_key(), b->start_key()) > 0; + } + + const InternalKeyComparator* icmp; +}; + class ForwardRangeDelIterator { public: ForwardRangeDelIterator( @@ -94,20 +117,20 @@ class ForwardRangeDelIterator { bool ShouldDelete(const ParsedInternalKey& parsed); void Invalidate(); + void AddNewIter(TruncatedRangeDelIterator* iter, + const ParsedInternalKey& parsed) { + iter->Seek(parsed.user_key); + PushIter(iter, parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + size_t UnusedIdx() const { return unused_idx_; } + void IncUnusedIdx() { unused_idx_++; } + private: using ActiveSeqSet = std::multiset; - struct StartKeyMinComparator { - explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {} - - bool operator()(const TruncatedRangeDelIterator* a, - const TruncatedRangeDelIterator* b) const { - return icmp->Compare(a->start_key(), b->start_key()) > 0; - } - - const InternalKeyComparator* icmp; - }; struct EndKeyMinComparator { explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {} @@ -124,7 +147,10 @@ class ForwardRangeDelIterator { if (!iter->Valid()) { // The iterator has been fully consumed, so we don't need to add it to // either of the heaps. - } else if (icmp_->Compare(parsed, iter->start_key()) < 0) { + return; + } + int cmp = icmp_->Compare(parsed, iter->start_key()); + if (cmp < 0) { PushInactiveIter(iter); } else { PushActiveIter(iter); @@ -171,6 +197,16 @@ class ReverseRangeDelIterator { bool ShouldDelete(const ParsedInternalKey& parsed); void Invalidate(); + void AddNewIter(TruncatedRangeDelIterator* iter, + const ParsedInternalKey& parsed) { + iter->SeekForPrev(parsed.user_key); + PushIter(iter, parsed); + assert(active_iters_.size() == active_seqnums_.size()); + } + + size_t UnusedIdx() const { return unused_idx_; } + void IncUnusedIdx() { unused_idx_++; } + private: using ActiveSeqSet = std::multiset; @@ -241,55 +277,160 @@ class ReverseRangeDelIterator { class RangeDelAggregatorV2 { public: - RangeDelAggregatorV2(const InternalKeyComparator* icmp, - SequenceNumber upper_bound); + explicit RangeDelAggregatorV2(const InternalKeyComparator* icmp) + : icmp_(icmp) {} + virtual ~RangeDelAggregatorV2() {} - void AddTombstones( + virtual void AddTombstones( std::unique_ptr input_iter, const InternalKey* smallest = nullptr, - const InternalKey* largest = nullptr); + const InternalKey* largest = nullptr) = 0; - bool ShouldDelete(const ParsedInternalKey& parsed, - RangeDelPositioningMode mode); - - bool IsRangeOverlapped(const Slice& start, const Slice& end); - - void InvalidateRangeDelMapPositions() { - forward_iter_.Invalidate(); - reverse_iter_.Invalidate(); + bool ShouldDelete(const Slice& key, RangeDelPositioningMode mode) { + ParsedInternalKey parsed; + if (!ParseInternalKey(key, &parsed)) { + return false; + } + return ShouldDelete(parsed, mode); } + virtual bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) = 0; + + virtual void InvalidateRangeDelMapPositions() = 0; + + virtual bool IsEmpty() const = 0; - bool IsEmpty() const { return iters_.empty(); } bool AddFile(uint64_t file_number) { return files_seen_.insert(file_number).second; } - // Adaptor method to pass calls through to an old-style RangeDelAggregator. - // Will be removed once this new version supports an iterator that can be used - // during flush/compaction. - RangeDelAggregator* DelegateToRangeDelAggregator( - const std::vector& snapshots) { - wrapped_range_del_agg.reset(new RangeDelAggregator( - *icmp_, snapshots, true /* collapse_deletions */)); - return wrapped_range_del_agg.get(); - } + protected: + class StripeRep { + public: + StripeRep(const InternalKeyComparator* icmp, SequenceNumber upper_bound, + SequenceNumber lower_bound) + : icmp_(icmp), + forward_iter_(icmp, &iters_), + reverse_iter_(icmp, &iters_), + upper_bound_(upper_bound), + lower_bound_(lower_bound) {} - std::unique_ptr NewIterator() { - assert(wrapped_range_del_agg != nullptr); - return wrapped_range_del_agg->NewIterator(); - } + void AddTombstones(std::unique_ptr input_iter) { + iters_.push_back(std::move(input_iter)); + } + + bool IsEmpty() const { return iters_.empty(); } + + bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode); + + void Invalidate() { + InvalidateForwardIter(); + InvalidateReverseIter(); + } + + bool IsRangeOverlapped(const Slice& start, const Slice& end); + + private: + bool InStripe(SequenceNumber seq) const { + return lower_bound_ <= seq && seq <= upper_bound_; + } + + void InvalidateForwardIter() { forward_iter_.Invalidate(); } + + void InvalidateReverseIter() { reverse_iter_.Invalidate(); } + + const InternalKeyComparator* icmp_; + std::vector> iters_; + ForwardRangeDelIterator forward_iter_; + ReverseRangeDelIterator reverse_iter_; + SequenceNumber upper_bound_; + SequenceNumber lower_bound_; + }; - private: const InternalKeyComparator* icmp_; - std::vector> iters_; + private: std::set files_seen_; +}; - ForwardRangeDelIterator forward_iter_; - ReverseRangeDelIterator reverse_iter_; +class ReadRangeDelAggregatorV2 : public RangeDelAggregatorV2 { + public: + ReadRangeDelAggregatorV2(const InternalKeyComparator* icmp, + SequenceNumber upper_bound) + : RangeDelAggregatorV2(icmp), + rep_(icmp, upper_bound, 0 /* lower_bound */) {} + ~ReadRangeDelAggregatorV2() override {} - // TODO: remove once V2 supports exposing tombstone iterators - std::unique_ptr wrapped_range_del_agg; + using RangeDelAggregatorV2::ShouldDelete; + void AddTombstones( + std::unique_ptr input_iter, + const InternalKey* smallest = nullptr, + const InternalKey* largest = nullptr) override; + + bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) override; + + bool IsRangeOverlapped(const Slice& start, const Slice& end); + + void InvalidateRangeDelMapPositions() override { rep_.Invalidate(); } + + bool IsEmpty() const override { return rep_.IsEmpty(); } + + private: + StripeRep rep_; +}; + +class CompactionRangeDelAggregatorV2 : public RangeDelAggregatorV2 { + public: + CompactionRangeDelAggregatorV2(const InternalKeyComparator* icmp, + const std::vector& snapshots) + : RangeDelAggregatorV2(icmp), snapshots_(&snapshots) {} + ~CompactionRangeDelAggregatorV2() override {} + + void AddTombstones( + std::unique_ptr input_iter, + const InternalKey* smallest = nullptr, + const InternalKey* largest = nullptr) override; + + using RangeDelAggregatorV2::ShouldDelete; + bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) override; + + bool IsRangeOverlapped(const Slice& start, const Slice& end); + + void InvalidateRangeDelMapPositions() override { + for (auto& rep : reps_) { + rep.second.Invalidate(); + } + } + + bool IsEmpty() const override { + for (const auto& rep : reps_) { + if (!rep.second.IsEmpty()) { + return false; + } + } + return true; + } + + // Creates an iterator over all the range tombstones in the aggregator, for + // use in compaction. Nullptr arguments indicate that the iterator range is + // unbounded. + // NOTE: the boundaries are used for optimization purposes to reduce the + // number of tombstones that are passed to the fragmenter; they do not + // guarantee that the resulting iterator only contains range tombstones that + // cover keys in the provided range. If required, these bounds must be + // enforced during iteration. + std::unique_ptr NewIterator( + const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr, + bool upper_bound_inclusive = false); + + private: + std::vector> parent_iters_; + std::map reps_; + + const std::vector* snapshots_; }; } // namespace rocksdb diff --git a/db/range_del_aggregator_v2_test.cc b/db/range_del_aggregator_v2_test.cc index 79cb548b1..64f8ed079 100644 --- a/db/range_del_aggregator_v2_test.cc +++ b/db/range_del_aggregator_v2_test.cc @@ -158,7 +158,7 @@ void VerifyShouldDelete(RangeDelAggregatorV2* range_del_agg, } void VerifyIsRangeOverlapped( - RangeDelAggregatorV2* range_del_agg, + ReadRangeDelAggregatorV2* range_del_agg, const std::vector& test_cases) { for (const auto& test_case : test_cases) { EXPECT_EQ(test_case.result, @@ -166,6 +166,30 @@ void VerifyIsRangeOverlapped( } } +void CheckIterPosition(const RangeTombstone& tombstone, + const FragmentedRangeTombstoneIterator* iter) { + // Test InternalIterator interface. + EXPECT_EQ(tombstone.start_key_, ExtractUserKey(iter->key())); + EXPECT_EQ(tombstone.end_key_, iter->value()); + EXPECT_EQ(tombstone.seq_, iter->seq()); + + // Test FragmentedRangeTombstoneIterator interface. + EXPECT_EQ(tombstone.start_key_, iter->start_key()); + EXPECT_EQ(tombstone.end_key_, iter->end_key()); + EXPECT_EQ(tombstone.seq_, GetInternalKeySeqno(iter->key())); +} + +void VerifyFragmentedRangeDels( + FragmentedRangeTombstoneIterator* iter, + const std::vector& expected_tombstones) { + iter->SeekToFirst(); + for (size_t i = 0; i < expected_tombstones.size(); i++, iter->Next()) { + ASSERT_TRUE(iter->Valid()); + CheckIterPosition(expected_tombstones[i], iter); + } + EXPECT_FALSE(iter->Valid()); +} + } // namespace TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) { @@ -253,7 +277,7 @@ TEST_F(RangeDelAggregatorV2Test, UntruncatedIterWithSnapshot) { {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); } -TEST_F(RangeDelAggregatorV2Test, TruncatedIter) { +TEST_F(RangeDelAggregatorV2Test, TruncatedIterPartiallyCutTombstones) { auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}}); FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter), @@ -289,6 +313,36 @@ TEST_F(RangeDelAggregatorV2Test, TruncatedIter) { {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); } +TEST_F(RangeDelAggregatorV2Test, TruncatedIterFullyCutTombstones) { + auto range_del_iter = + MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}}); + FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter), + bytewise_icmp); + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp, + kMaxSequenceNumber)); + + InternalKey smallest("f", 7, kTypeValue); + InternalKey largest("i", 9, kTypeValue); + TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, + &smallest, &largest); + + VerifyIterator(&iter, bytewise_icmp, + {{InternalValue("f", 7), UncutEndpoint("g"), 8}}); + + VerifySeek( + &iter, bytewise_icmp, + {{"d", InternalValue("f", 7), UncutEndpoint("g"), 8}, + {"f", InternalValue("f", 7), UncutEndpoint("g"), 8}, + {"j", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); + + VerifySeekForPrev( + &iter, bytewise_icmp, + {{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, + {"f", InternalValue("f", 7), UncutEndpoint("g"), 8}, + {"j", InternalValue("f", 7), UncutEndpoint("g"), 8}}); +} + TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) { auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 8}}); FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter), @@ -297,7 +351,7 @@ TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) { new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp, kMaxSequenceNumber)); - RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber); + ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber); range_del_agg.AddTombstones(std::move(input_iter)); VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), false}, @@ -318,7 +372,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregator) { {{{"a", "e", 10}, {"c", "g", 8}}, {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); - RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber); + ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber); for (const auto& fragment_list : fragment_lists) { std::unique_ptr input_iter( new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, @@ -350,7 +404,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregatorWithUpperBound) { {{{"a", "e", 10}, {"c", "g", 8}}, {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); - RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); + ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); for (const auto& fragment_list : fragment_lists) { std::unique_ptr input_iter( new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, @@ -387,7 +441,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregator) { InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)}, {InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}}; - RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); + ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); for (size_t i = 0; i < fragment_lists.size(); i++) { const auto& fragment_list = fragment_lists[i]; const auto& bounds = iter_bounds[i]; @@ -427,7 +481,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregatorSameLevel) { InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)}, {InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}}; - RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); + ReadRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); auto add_iter_to_agg = [&](size_t i) { std::unique_ptr input_iter( @@ -461,6 +515,192 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregatorSameLevel) { {"zz", "zzz", false}}); } +TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorNoSnapshots) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "e", 10}, {"c", "g", 8}}, + {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); + + std::vector snapshots; + CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, + kMaxSequenceNumber)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), true}, + {InternalValue("b", 19), false}, + {InternalValue("b", 9), true}, + {InternalValue("d", 9), true}, + {InternalValue("e", 7), true}, + {InternalValue("g", 7), false}, + {InternalValue("h", 24), true}, + {InternalValue("i", 24), false}, + {InternalValue("ii", 14), true}, + {InternalValue("j", 14), false}}); + + auto range_del_compaction_iter = range_del_agg.NewIterator(); + VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {{"a", "b", 20}, + {"b", "c", 10}, + {"c", "e", 10}, + {"e", "g", 8}, + {"h", "i", 25}, + {"ii", "j", 15}}); +} + +TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorWithSnapshots) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "e", 10}, {"c", "g", 8}}, + {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); + + std::vector snapshots{9, 19}; + CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, + kMaxSequenceNumber)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + VerifyShouldDelete( + &range_del_agg, + { + {InternalValue("a", 19), false}, // [10, 19] + {InternalValue("a", 9), false}, // [0, 9] + {InternalValue("b", 9), false}, // [0, 9] + {InternalValue("d", 9), false}, // [0, 9] + {InternalValue("d", 7), true}, // [0, 9] + {InternalValue("e", 7), true}, // [0, 9] + {InternalValue("g", 7), false}, // [0, 9] + {InternalValue("h", 24), true}, // [20, kMaxSequenceNumber] + {InternalValue("i", 24), false}, // [20, kMaxSequenceNumber] + {InternalValue("ii", 14), true}, // [10, 19] + {InternalValue("j", 14), false} // [10, 19] + }); + + auto range_del_compaction_iter = range_del_agg.NewIterator(); + VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {{"a", "b", 20}, + {"a", "b", 10}, + {"b", "c", 10}, + {"c", "e", 10}, + {"c", "e", 8}, + {"e", "g", 8}, + {"h", "i", 25}, + {"ii", "j", 15}}); +} + +TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorEmptyIteratorLeft) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "e", 10}, {"c", "g", 8}}, + {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); + + std::vector snapshots{9, 19}; + CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, + kMaxSequenceNumber)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + Slice start("_"); + Slice end("__"); +} + +TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorEmptyIteratorRight) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "e", 10}, {"c", "g", 8}}, + {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); + + std::vector snapshots{9, 19}; + CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, + kMaxSequenceNumber)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + Slice start("p"); + Slice end("q"); + auto range_del_compaction_iter1 = + range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); + VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {}); + + auto range_del_compaction_iter2 = + range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */); + VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {}); +} + +TEST_F(RangeDelAggregatorV2Test, CompactionAggregatorBoundedIterator) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "e", 10}, {"c", "g", 8}}, + {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); + + std::vector snapshots{9, 19}; + CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, + kMaxSequenceNumber)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + Slice start("bb"); + Slice end("e"); + auto range_del_compaction_iter1 = + range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); + VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), + {{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}}); + + auto range_del_compaction_iter2 = + range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */); + VerifyFragmentedRangeDels( + range_del_compaction_iter2.get(), + {{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}, {"e", "g", 8}}); +} + +TEST_F(RangeDelAggregatorV2Test, + CompactionAggregatorBoundedIteratorExtraFragments) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "d", 10}, {"c", "g", 8}}, + {{"b", "c", 20}, {"d", "f", 30}, {"h", "i", 25}, {"ii", "j", 15}}}); + + std::vector snapshots{9, 19}; + CompactionRangeDelAggregatorV2 range_del_agg(&bytewise_icmp, snapshots); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp, + kMaxSequenceNumber)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + Slice start("bb"); + Slice end("e"); + auto range_del_compaction_iter1 = + range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); + VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {{"a", "b", 10}, + {"b", "c", 20}, + {"b", "c", 10}, + {"c", "d", 10}, + {"c", "d", 8}, + {"d", "f", 30}, + {"d", "f", 8}, + {"f", "g", 8}}); + + auto range_del_compaction_iter2 = + range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */); + VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {{"a", "b", 10}, + {"b", "c", 20}, + {"b", "c", 10}, + {"c", "d", 10}, + {"c", "d", 8}, + {"d", "f", 30}, + {"d", "f", 8}, + {"f", "g", 8}}); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/range_tombstone_fragmenter.cc b/db/range_tombstone_fragmenter.cc index 1748c5430..f9d9f2feb 100644 --- a/db/range_tombstone_fragmenter.cc +++ b/db/range_tombstone_fragmenter.cc @@ -174,6 +174,11 @@ void FragmentedRangeTombstoneList::FragmentTombstones( const Slice& ikey = unfragmented_tombstones->key(); Slice tombstone_start_key = ExtractUserKey(ikey); SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey); + if (!unfragmented_tombstones->IsKeyPinned()) { + pinned_slices_.emplace_back(tombstone_start_key.data(), + tombstone_start_key.size()); + tombstone_start_key = pinned_slices_.back(); + } no_tombstones = false; Slice tombstone_end_key = unfragmented_tombstones->value(); @@ -188,13 +193,7 @@ void FragmentedRangeTombstoneList::FragmentTombstones( // this new start key. flush_current_tombstones(tombstone_start_key); } - if (unfragmented_tombstones->IsKeyPinned()) { - cur_start_key = tombstone_start_key; - } else { - pinned_slices_.emplace_back(tombstone_start_key.data(), - tombstone_start_key.size()); - cur_start_key = pinned_slices_.back(); - } + cur_start_key = tombstone_start_key; cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion); } diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h index 306a0347b..a0b77b677 100644 --- a/db/range_tombstone_fragmenter.h +++ b/db/range_tombstone_fragmenter.h @@ -146,6 +146,9 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { seq_pos_ = tombstones_->seq_end(); } + RangeTombstone Tombstone() const { + return RangeTombstone(start_key(), end_key(), seq()); + } Slice start_key() const { return pos_->start_key; } Slice end_key() const { return pos_->end_key; } SequenceNumber seq() const { return *seq_pos_; } diff --git a/db/repair.cc b/db/repair.cc index e7d72f917..4e93a161c 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -417,11 +417,16 @@ class Repairer { SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance(); auto write_hint = cfd->CalculateSSTWriteHint(0); + std::vector> + range_del_iters; + auto range_del_iter = + mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber); + if (range_del_iter != nullptr) { + range_del_iters.emplace_back(range_del_iter); + } status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), - env_options_, table_cache_, iter.get(), - std::unique_ptr( - mem->NewRangeTombstoneIterator(ro, vset_.LastSequence())), + env_options_, table_cache_, iter.get(), std::move(range_del_iters), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, snapshot_checker, kNoCompression, diff --git a/db/version_set.cc b/db/version_set.cc index 8349f2857..ad5f898d0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1057,8 +1057,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, Arena arena; Status status; - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); *overlap = false; diff --git a/util/heap.h b/util/heap.h index 8f253d27c..6093c20e2 100644 --- a/util/heap.h +++ b/util/heap.h @@ -92,9 +92,7 @@ class BinaryHeap { reset_root_cmp_cache(); } - bool empty() const { - return data_.empty(); - } + bool empty() const { return data_.empty(); } size_t size() const { return data_.size(); } diff --git a/utilities/debug.cc b/utilities/debug.cc index 3e4912fe7..3dfde980e 100644 --- a/utilities/debug.cc +++ b/utilities/debug.cc @@ -19,8 +19,8 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key, DBImpl* idb = static_cast(db->GetRootDB()); auto icmp = InternalKeyComparator(idb->GetOptions().comparator); - RangeDelAggregatorV2 range_del_agg(&icmp, - kMaxSequenceNumber /* upper_bound */); + ReadRangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); Arena arena; ScopedArenaIterator iter( idb->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber));