diff --git a/Makefile b/Makefile
index e73489f2f..d768647b9 100644
--- a/Makefile
+++ b/Makefile
@@ -418,6 +418,7 @@ TESTS = \
 	persistent_cache_test \
 	statistics_test \
 	lua_test \
+	range_del_aggregator_test \
 	lru_cache_test \
 
 PARALLEL_TEST = \
@@ -1308,6 +1309,9 @@ lru_cache_test: util/lru_cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
 lua_test: utilities/lua/rocks_lua_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+range_del_aggregator_test: db/range_del_aggregator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 #-------------------------------------------------
 # make install related stuff
 INSTALL_PATH ?= /usr/local
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 64409c060..260eb2c5f 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -123,7 +123,8 @@ class DBIter: public Iterator {
         prefix_same_as_start_(prefix_same_as_start),
         pin_thru_lifetime_(pin_data),
         total_order_seek_(total_order_seek),
-        range_del_agg_(ioptions.internal_comparator, s) {
+        range_del_agg_(ioptions.internal_comparator, s,
+                       true /* collapse_deletions */) {
     RecordTick(statistics_, NO_ITERATORS);
     prefix_extractor_ = ioptions.prefix_extractor;
     max_skip_ = max_sequential_skip_in_iterations;
diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc
index b8e7b1c89..2c959ba0e 100644
--- a/db/db_range_del_test.cc
+++ b/db/db_range_del_test.cc
@@ -41,7 +41,7 @@ TEST_F(DBRangeDelTest, NonBlockBasedTableNotSupported) {
 
 TEST_F(DBRangeDelTest, FlushOutputHasOnlyRangeTombstones) {
   ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
-                             "dr1"));
+                             "dr2"));
   ASSERT_OK(db_->Flush(FlushOptions()));
   ASSERT_EQ(1, NumTableFilesAtLevel(0));
 }
diff --git a/db/dbformat.h b/db/dbformat.h
index 06939d923..e4fc29f60 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -528,10 +528,11 @@ struct RangeTombstone {
   Slice start_key_;
   Slice end_key_;
   SequenceNumber seq_;
-  explicit RangeTombstone(Slice sk, Slice ek, SequenceNumber sn)
+  RangeTombstone() = default;
+  RangeTombstone(Slice sk, Slice ek, SequenceNumber sn)
       : start_key_(sk), end_key_(ek), seq_(sn) {}
 
-  explicit RangeTombstone(ParsedInternalKey parsed_key, Slice value) {
+  RangeTombstone(ParsedInternalKey parsed_key, Slice value) {
     start_key_ = parsed_key.user_key;
     seq_ = parsed_key.sequence;
     end_key_ = value;
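Two details in the dbformat.h hunk matter later in the patch: the new default constructor lets AddToBuilder() assemble a RangeTombstone field-by-field from adjacent map entries, and dropping `explicit` permits the brace-initialized tombstone lists used in the new unit test. For readers new to this code, a range tombstone travels through the system as an ordinary key-value pair. A minimal sketch of the round trip, assuming only the declarations in db/dbformat.h (Serialize() is the helper the new test exercises below):

```cpp
#include <cassert>

#include "db/dbformat.h"

int main() {
  using namespace rocksdb;
  // A tombstone deleting user keys in ["dr1", "dr2") at sequence number 5.
  RangeTombstone tombstone("dr1", "dr2", 5);
  // Serialize() packs (start_key_, seq_, kTypeRangeDeletion) into an internal
  // key and returns end_key_ as the value slice.
  auto kv = tombstone.Serialize();
  ParsedInternalKey parsed;
  assert(ParseInternalKey(kv.first.Encode(), &parsed));
  // The (ParsedInternalKey, Slice) constructor reverses the process.
  RangeTombstone roundtripped(parsed, kv.second);
  assert(roundtripped.start_key_ == Slice("dr1"));
  assert(roundtripped.end_key_ == Slice("dr2"));
  assert(roundtripped.seq_ == 5);
  return 0;
}
```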
diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc
index b97c426b8..6e6574837 100644
--- a/db/range_del_aggregator.cc
+++ b/db/range_del_aggregator.cc
@@ -11,25 +11,33 @@ namespace rocksdb {
 
 RangeDelAggregator::RangeDelAggregator(
     const InternalKeyComparator& icmp,
-    const std::vector<SequenceNumber>& snapshots)
-    : upper_bound_(kMaxSequenceNumber), icmp_(icmp) {
+    const std::vector<SequenceNumber>& snapshots,
+    bool collapse_deletions /* = true */)
+    : upper_bound_(kMaxSequenceNumber),
+      icmp_(icmp),
+      collapse_deletions_(collapse_deletions) {
   InitRep(snapshots);
 }
 
 RangeDelAggregator::RangeDelAggregator(const InternalKeyComparator& icmp,
-                                       SequenceNumber snapshot)
-    : upper_bound_(snapshot), icmp_(icmp) {}
+                                       SequenceNumber snapshot,
+                                       bool collapse_deletions /* = false */)
+    : upper_bound_(snapshot),
+      icmp_(icmp),
+      collapse_deletions_(collapse_deletions) {}
 
 void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) {
   assert(rep_ == nullptr);
   rep_.reset(new Rep());
   for (auto snapshot : snapshots) {
     rep_->stripe_map_.emplace(
-        snapshot, TombstoneMap(stl_wrappers::LessOfComparator(&icmp_)));
+        snapshot,
+        TombstoneMap(stl_wrappers::LessOfComparator(icmp_.user_comparator())));
   }
   // Data newer than any snapshot falls in this catch-all stripe
   rep_->stripe_map_.emplace(
-      kMaxSequenceNumber, TombstoneMap(stl_wrappers::LessOfComparator(&icmp_)));
+      kMaxSequenceNumber,
+      TombstoneMap(stl_wrappers::LessOfComparator(icmp_.user_comparator())));
   rep_->pinned_iters_mgr_.StartPinning();
 }
@@ -50,6 +58,14 @@ bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) {
     return false;
   }
   const auto& tombstone_map = GetTombstoneMap(parsed.sequence);
+  if (collapse_deletions_) {
+    auto iter = tombstone_map.upper_bound(parsed.user_key);
+    if (iter == tombstone_map.begin()) {
+      return false;
+    }
+    --iter;
+    return parsed.sequence < iter->second.seq_;
+  }
   for (const auto& start_key_and_tombstone : tombstone_map) {
     const auto& tombstone = start_key_and_tombstone.second;
     if (icmp_.user_comparator()->Compare(parsed.user_key,
@@ -67,6 +83,9 @@
 
 bool RangeDelAggregator::ShouldAddTombstones(
     bool bottommost_level /* = false */) {
+  // TODO(andrewkr): can we just open a file and throw it away if it ends up
+  // empty after AddToBuilder()? This function doesn't take subcompaction
+  // boundaries into account, so it isn't completely accurate.
   if (rep_ == nullptr) {
     return false;
   }
@@ -105,8 +124,7 @@ Status RangeDelAggregator::AddTombstones(
       return Status::Corruption("Unable to parse range tombstone InternalKey");
     }
     RangeTombstone tombstone(parsed_key, input->value());
-    auto& tombstone_map = GetTombstoneMap(tombstone.seq_);
-    tombstone_map.emplace(input->key(), std::move(tombstone));
+    AddTombstone(std::move(tombstone));
     input->Next();
   }
   if (!first_iter) {
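The collapsed representation is the heart of this change: instead of keeping overlapping tombstones as-is, the map stores non-overlapping boundary points, each carrying the sequence number that applies from that key until the next boundary (sequence number zero marks a gap). A point lookup then becomes one binary search rather than a linear scan, which is what the new `collapse_deletions_` branch in ShouldDelete() above does. A standalone model of that lookup, assuming a plain `std::map` keyed by user key (the real code uses TombstoneMap with the column family's user comparator):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using SequenceNumber = uint64_t;

// Collapsed tombstone map: key -> seqnum covering [key, next key).
// A value of 0 means "no tombstone" from that boundary onward.
std::map<std::string, SequenceNumber> collapsed{
    {"a", 10}, {"c", 5}, {"d", 0}};  // [a,c)@10, [c,d)@5, nothing after d

bool ShouldDeleteModel(const std::string& user_key, SequenceNumber seq) {
  auto iter = collapsed.upper_bound(user_key);  // first boundary > user_key
  if (iter == collapsed.begin()) {
    return false;  // key precedes all tombstones
  }
  --iter;  // boundary whose segment contains user_key
  return seq < iter->second;
}

int main() {
  assert(ShouldDeleteModel("b", 9));    // covered by [a,c)@10
  assert(!ShouldDeleteModel("b", 10));  // equal seqnum is not covered
  assert(!ShouldDeleteModel("e", 3));   // past the last boundary
  return 0;
}
```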
@@ -115,6 +133,159 @@ Status RangeDelAggregator::AddTombstones(
   return Status::OK();
 }
 
+Status RangeDelAggregator::AddTombstone(RangeTombstone tombstone) {
+  auto& tombstone_map = GetTombstoneMap(tombstone.seq_);
+  if (collapse_deletions_) {
+    // In collapsed mode, we only fill the seq_ field in the TombstoneMap's
+    // values. The end_key is unneeded because we assume the tombstone extends
+    // until the next tombstone starts. For gaps between real tombstones and
+    // for the last real tombstone, we denote end keys by inserting fake
+    // tombstones with sequence number zero.
+    std::vector<RangeTombstone> new_range_dels{
+        tombstone, RangeTombstone(tombstone.end_key_, Slice(), 0)};
+    auto new_range_dels_iter = new_range_dels.begin();
+    // Position at the first overlapping existing tombstone; if none exists,
+    // insert until we find an existing one overlapping a new point
+    const Slice* tombstone_map_begin = nullptr;
+    if (!tombstone_map.empty()) {
+      tombstone_map_begin = &tombstone_map.begin()->first;
+    }
+    auto last_range_dels_iter = new_range_dels_iter;
+    while (new_range_dels_iter != new_range_dels.end() &&
+           (tombstone_map_begin == nullptr ||
+            icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_,
+                                             *tombstone_map_begin) < 0)) {
+      tombstone_map.emplace(
+          new_range_dels_iter->start_key_,
+          RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_));
+      last_range_dels_iter = new_range_dels_iter;
+      ++new_range_dels_iter;
+    }
+    if (new_range_dels_iter == new_range_dels.end()) {
+      return Status::OK();
+    }
+    // above loop advances one too far
+    new_range_dels_iter = last_range_dels_iter;
+    auto tombstone_map_iter =
+        tombstone_map.upper_bound(new_range_dels_iter->start_key_);
+    // if nothing overlapped we would've already inserted all the new points
+    // and returned early
+    assert(tombstone_map_iter != tombstone_map.begin());
+    tombstone_map_iter--;
+
+    // untermed_seq is non-kMaxSequenceNumber when we covered an existing point
+    // but haven't seen its corresponding endpoint. It's used for (1) deciding
+    // whether to forcibly insert the new interval's endpoint; and (2) possibly
+    // raising the seqnum for the to-be-inserted element (we insert the max
+    // seqnum between the next new interval and the unterminated interval).
+    SequenceNumber untermed_seq = kMaxSequenceNumber;
+    while (tombstone_map_iter != tombstone_map.end() &&
+           new_range_dels_iter != new_range_dels.end()) {
+      const Slice *tombstone_map_iter_end = nullptr,
+                  *new_range_dels_iter_end = nullptr;
+      if (tombstone_map_iter != tombstone_map.end()) {
+        auto next_tombstone_map_iter = std::next(tombstone_map_iter);
+        if (next_tombstone_map_iter != tombstone_map.end()) {
+          tombstone_map_iter_end = &next_tombstone_map_iter->first;
+        }
+      }
+      if (new_range_dels_iter != new_range_dels.end()) {
+        auto next_new_range_dels_iter = std::next(new_range_dels_iter);
+        if (next_new_range_dels_iter != new_range_dels.end()) {
+          new_range_dels_iter_end = &next_new_range_dels_iter->start_key_;
+        }
+      }
+
+      // our positions in existing/new tombstone collections should always
+      // overlap. The non-overlapping cases are handled above and below this
+      // loop.
+      assert(new_range_dels_iter_end == nullptr ||
+             icmp_.user_comparator()->Compare(tombstone_map_iter->first,
+                                              *new_range_dels_iter_end) < 0);
+      assert(tombstone_map_iter_end == nullptr ||
+             icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_,
+                                              *tombstone_map_iter_end) < 0);
+
+      int new_to_old_start_cmp = icmp_.user_comparator()->Compare(
+          new_range_dels_iter->start_key_, tombstone_map_iter->first);
+      // a nullptr end means the tombstone extends infinitely rightwards; set
+      // new_to_old_end_cmp accordingly so we can use common code paths later.
+      int new_to_old_end_cmp;
+      if (new_range_dels_iter_end == nullptr &&
+          tombstone_map_iter_end == nullptr) {
+        new_to_old_end_cmp = 0;
+      } else if (new_range_dels_iter_end == nullptr) {
+        new_to_old_end_cmp = 1;
+      } else if (tombstone_map_iter_end == nullptr) {
+        new_to_old_end_cmp = -1;
+      } else {
+        new_to_old_end_cmp = icmp_.user_comparator()->Compare(
+            *new_range_dels_iter_end, *tombstone_map_iter_end);
+      }
+
+      if (new_to_old_start_cmp < 0) {
+        // the existing tombstone's left endpoint comes after, so raise/delete
+        // it if it's covered.
+        if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
+          untermed_seq = tombstone_map_iter->second.seq_;
+          if (tombstone_map_iter != tombstone_map.begin() &&
+              std::prev(tombstone_map_iter)->second.seq_ ==
+                  new_range_dels_iter->seq_) {
+            tombstone_map_iter = tombstone_map.erase(tombstone_map_iter);
+            --tombstone_map_iter;
+          } else {
+            tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_;
+          }
+        }
+      } else if (new_to_old_start_cmp > 0) {
+        if (untermed_seq != kMaxSequenceNumber ||
+            tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
+          auto seq = tombstone_map_iter->second.seq_;
+          // need to adjust this element if not intended to span beyond the new
+          // element (i.e., was_tombstone_map_iter_raised == true), or if it
+          // can be raised
+          tombstone_map_iter = tombstone_map.emplace(
+              new_range_dels_iter->start_key_,
+              RangeTombstone(
+                  Slice(), Slice(),
+                  std::max(
+                      untermed_seq == kMaxSequenceNumber ? 0 : untermed_seq,
+                      new_range_dels_iter->seq_)));
+          untermed_seq = seq;
+        }
+      } else {
+        // their left endpoints coincide, so raise the existing one if needed
+        if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
+          untermed_seq = tombstone_map_iter->second.seq_;
+          tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_;
+        }
+      }
+
+      // advance whichever one ends earlier, or both if their right endpoints
+      // coincide
+      if (new_to_old_end_cmp < 0) {
+        ++new_range_dels_iter;
+      } else if (new_to_old_end_cmp > 0) {
+        ++tombstone_map_iter;
+        untermed_seq = kMaxSequenceNumber;
+      } else {
+        ++new_range_dels_iter;
+        ++tombstone_map_iter;
+        untermed_seq = kMaxSequenceNumber;
+      }
+    }
+    while (new_range_dels_iter != new_range_dels.end()) {
+      tombstone_map.emplace(
+          new_range_dels_iter->start_key_,
+          RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_));
+      ++new_range_dels_iter;
+    }
+  } else {
+    tombstone_map.emplace(tombstone.start_key_, std::move(tombstone));
+  }
+  return Status::OK();
+}
+
 RangeDelAggregator::TombstoneMap& RangeDelAggregator::GetTombstoneMap(
     SequenceNumber seq) {
   assert(rep_ != nullptr);
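To see what the merge loop above is maintaining, it helps to trace a small case: adding [a,c)@10 into an empty map yields boundaries {a→10, c→0}; then adding [b,d)@5 must not lower the already-covered [b,c) segment, so the result is {a→10, c→5, d→0}. Below is a deliberately simplified model of that invariant, not the production merge loop (which does this in one pass over both sequences and also erases redundant boundaries); it exists only to make the invariant checkable in isolation:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>
#include <string>

using SequenceNumber = uint64_t;
using BoundaryMap = std::map<std::string, SequenceNumber>;

// Seqnum covering `key`: the value of the nearest boundary at or before it.
SequenceNumber SeqAt(const BoundaryMap& m, const std::string& key) {
  auto it = m.upper_bound(key);
  return it == m.begin() ? 0 : std::prev(it)->second;
}

// Invariant AddTombstone maintains: every boundary's value is the max seqnum
// among tombstones covering [boundary, next boundary).
void AddCollapsed(BoundaryMap& m, const std::string& start,
                  const std::string& end, SequenceNumber seq) {
  m.emplace(end, SeqAt(m, end));      // terminate the new range
  m.emplace(start, SeqAt(m, start));  // open a segment at the new left edge
  for (auto it = m.find(start); it != m.end() && it->first < end; ++it) {
    it->second = std::max(it->second, seq);  // raise covered segments
  }
}

int main() {
  BoundaryMap m;
  AddCollapsed(m, "a", "c", 10);
  AddCollapsed(m, "b", "d", 5);  // must not lower [b,c), still covered at 10
  assert(SeqAt(m, "a") == 10 && SeqAt(m, "b") == 10);
  assert(SeqAt(m, "c") == 5 && SeqAt(m, "e") == 0);
  return 0;
}
```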
@@ -148,10 +319,16 @@ void RangeDelAggregator::AddToBuilder(
   auto stripe_map_iter = rep_->stripe_map_.begin();
   assert(stripe_map_iter != rep_->stripe_map_.end());
   if (bottommost_level) {
-    range_del_out_stats->num_range_del_drop_obsolete +=
-        static_cast<int64_t>(stripe_map_iter->second.size());
-    range_del_out_stats->num_record_drop_obsolete +=
-        static_cast<int64_t>(stripe_map_iter->second.size());
+    // TODO(andrewkr): these are counted for each compaction output file, so
+    // lots of double-counting.
+    if (!stripe_map_iter->second.empty()) {
+      range_del_out_stats->num_range_del_drop_obsolete +=
+          static_cast<int64_t>(stripe_map_iter->second.size()) -
+          (collapse_deletions_ ? 1 : 0);
+      range_del_out_stats->num_record_drop_obsolete +=
+          static_cast<int64_t>(stripe_map_iter->second.size()) -
+          (collapse_deletions_ ? 1 : 0);
+    }
     // For the bottommost level, keys covered by tombstones in the first
     // (oldest) stripe have been compacted away, so the tombstones are obsolete.
     ++stripe_map_iter;
@@ -161,8 +338,22 @@ void RangeDelAggregator::AddToBuilder(
   // insert them into a std::map on the read path.
   bool first_added = false;
   while (stripe_map_iter != rep_->stripe_map_.end()) {
-    for (const auto& start_key_and_tombstone : stripe_map_iter->second) {
-      const auto& tombstone = start_key_and_tombstone.second;
+    for (auto tombstone_map_iter = stripe_map_iter->second.begin();
+         tombstone_map_iter != stripe_map_iter->second.end();
+         ++tombstone_map_iter) {
+      RangeTombstone tombstone;
+      if (collapse_deletions_) {
+        auto next_tombstone_map_iter = std::next(tombstone_map_iter);
+        if (next_tombstone_map_iter == stripe_map_iter->second.end()) {
+          // it's the sentinel tombstone
+          break;
+        }
+        tombstone.start_key_ = tombstone_map_iter->first;
+        tombstone.end_key_ = next_tombstone_map_iter->first;
+        tombstone.seq_ = tombstone_map_iter->second.seq_;
+      } else {
+        tombstone = tombstone_map_iter->second;
+      }
       if (upper_bound != nullptr &&
           icmp_.user_comparator()->Compare(*upper_bound,
                                            tombstone.start_key_) <= 0) {
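On the write-out path, collapsed entries are rehydrated into explicit (start, end, seq) tombstones by pairing each boundary with its successor; the final entry is only a sentinel that terminates the last range, which is why the loop above breaks on it. A compact model of that walk over the same boundary-map shape as earlier (illustrative; note this sketch also skips zero-seqnum gap entries for clarity, which the production per-stripe loop does not need to do):

```cpp
#include <cstdint>
#include <iterator>
#include <map>
#include <string>
#include <vector>

using SequenceNumber = uint64_t;

struct Tombstone {
  std::string start, end;
  SequenceNumber seq;
};

// Rehydrate collapsed boundaries into explicit ranges, mirroring the
// AddToBuilder loop: each entry spans until the next boundary, and the
// trailing sentinel never starts a range of its own.
std::vector<Tombstone> Rehydrate(
    const std::map<std::string, SequenceNumber>& collapsed) {
  std::vector<Tombstone> out;
  for (auto it = collapsed.begin(); it != collapsed.end(); ++it) {
    auto next = std::next(it);
    if (next == collapsed.end()) {
      break;  // sentinel: terminates the previous range only
    }
    if (it->second > 0) {  // gap entries (seqnum zero) cover nothing
      out.push_back({it->first, next->first, it->second});
    }
  }
  return out;
}
```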
diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h
index 47c29a043..be5b3fa61 100644
--- a/db/range_del_aggregator.h
+++ b/db/range_del_aggregator.h
@@ -32,22 +32,25 @@ class RangeDelAggregator {
   // stripes, which is the seqnum range between consecutive snapshots,
   // including the higher snapshot and excluding the lower one. Currently,
   // this is used by ShouldDelete() to prevent deletion of keys that are
-  // covered by range tombstones in other snapshot stripes. In case of writes
-  // (flush/compaction), all DB snapshots are provided such that no keys are
-  // removed that are uncovered according to any DB snapshot. In case of read
-  // (get/iterator), only the user snapshot is provided such that the seqnum
-  // space is divided into two stripes, where only tombstones in the older
-  // stripe are considered by ShouldDelete().
+  // covered by range tombstones in other snapshot stripes. This constructor
+  // is used for writes (flush/compaction). All DB snapshots are provided
+  // such that no keys are removed that are uncovered according to any DB
+  // snapshot.
   // Note this overload does not lazily initialize Rep.
   RangeDelAggregator(const InternalKeyComparator& icmp,
-                     const std::vector<SequenceNumber>& snapshots);
+                     const std::vector<SequenceNumber>& snapshots,
+                     bool collapse_deletions = true);
 
   // @param upper_bound Similar to snapshots above, except with a single
   // snapshot, which allows us to store the snapshot on the stack and defer
   // initialization of heap-allocating members (in Rep) until the first range
-  // deletion is encountered.
+  // deletion is encountered. This constructor is used for reads (get/
+  // iterator), where only the user snapshot (upper_bound) is provided, such
+  // that the seqnum space is divided into two stripes. Only the older
+  // stripe will be used by ShouldDelete().
   RangeDelAggregator(const InternalKeyComparator& icmp,
-                     SequenceNumber upper_bound);
+                     SequenceNumber upper_bound,
+                     bool collapse_deletions = false);
 
   // Returns whether the key should be deleted, which is the case when it is
   // covered by a range tombstone residing in the same snapshot stripe.
@@ -87,8 +90,8 @@
   bool IsEmpty();
 
  private:
-  // Maps tombstone internal start key -> tombstone object
-  typedef std::map<Slice, RangeTombstone, stl_wrappers::LessOfComparator>
+  // Maps tombstone user start key -> tombstone object
+  typedef std::multimap<Slice, RangeTombstone, stl_wrappers::LessOfComparator>
       TombstoneMap;
   // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e.,
   // their seqnums are greater than the next smaller snapshot's seqnum.
@@ -104,10 +107,13 @@ class RangeDelAggregator {
   void InitRep(const std::vector<SequenceNumber>& snapshots);
 
   TombstoneMap& GetTombstoneMap(SequenceNumber seq);
+  Status AddTombstone(RangeTombstone tombstone);
 
   SequenceNumber upper_bound_;
   std::unique_ptr<Rep> rep_;
   const InternalKeyComparator& icmp_;
+  // collapse range deletions so they're binary searchable
+  const bool collapse_deletions_;
 };
 
 }  // namespace rocksdb
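The two constructors encode the two usage patterns: the write path sees all snapshots and collapses by default, while the read path defaults to not collapsing, except DBIter, which opts in because a scan probes ShouldDelete() for every key it passes. A hypothetical caller for illustration (`snapshots` and `read_seq` are assumed inputs, not part of this patch):

```cpp
#include <vector>

#include "db/dbformat.h"
#include "db/range_del_aggregator.h"

void Example(const rocksdb::InternalKeyComparator& icmp,
             const std::vector<rocksdb::SequenceNumber>& snapshots,
             rocksdb::SequenceNumber read_seq) {
  using rocksdb::RangeDelAggregator;

  // Write path (flush/compaction): all snapshots, collapsed by default,
  // since the aggregator both answers point queries and re-emits tombstones.
  RangeDelAggregator compaction_agg(icmp, snapshots);

  // Point-read path: a single upper bound; collapsing off by default since
  // a Get() issues only a handful of ShouldDelete() probes.
  RangeDelAggregator get_agg(icmp, read_seq);

  // Iterator path: opts into collapsing, as in the db_iter.cc hunk above.
  RangeDelAggregator iter_agg(icmp, read_seq, true /* collapse_deletions */);
}
```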
diff --git a/db/range_del_aggregator_test.cc b/db/range_del_aggregator_test.cc
new file mode 100644
index 000000000..975418946
--- /dev/null
+++ b/db/range_del_aggregator_test.cc
@@ -0,0 +1,153 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include <algorithm>
+
+#include "db/db_test_util.h"
+#include "db/range_del_aggregator.h"
+#include "rocksdb/comparator.h"
+#include "util/testutil.h"
+
+namespace rocksdb {
+
+class RangeDelAggregatorTest : public testing::Test {};
+
+namespace {
+
+struct ExpectedPoint {
+  Slice begin;
+  SequenceNumber seq;
+};
+
+enum Direction {
+  kForward,
+  kReverse,
+};
+
+void VerifyRangeDels(const std::vector<RangeTombstone>& range_dels,
+                     const std::vector<ExpectedPoint>& expected_points) {
+  // Test same result regardless of which order the range deletions are added.
+  for (Direction dir : {kForward, kReverse}) {
+    auto icmp = InternalKeyComparator(BytewiseComparator());
+    RangeDelAggregator range_del_agg(icmp, {} /* snapshots */,
+                                     true /* collapse_deletions */);
+    std::vector<std::string> keys, values;
+    for (const auto& range_del : range_dels) {
+      auto key_and_value = range_del.Serialize();
+      keys.push_back(key_and_value.first.Encode().ToString());
+      values.push_back(key_and_value.second.ToString());
+    }
+    if (dir == kReverse) {
+      std::reverse(keys.begin(), keys.end());
+      std::reverse(values.begin(), values.end());
+    }
+    std::unique_ptr<test::VectorIterator> range_del_iter(
+        new test::VectorIterator(keys, values));
+    range_del_agg.AddTombstones(std::move(range_del_iter));
+
+    for (const auto& expected_point : expected_points) {
+      ParsedInternalKey parsed_key;
+      parsed_key.user_key = expected_point.begin;
+      parsed_key.sequence = expected_point.seq;
+      parsed_key.type = kTypeValue;
+      ASSERT_FALSE(range_del_agg.ShouldDelete(parsed_key));
+      if (parsed_key.sequence > 0) {
+        --parsed_key.sequence;
+        ASSERT_TRUE(range_del_agg.ShouldDelete(parsed_key));
+      }
+    }
+  }
+}
+
+}  // anonymous namespace
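A reading aid for the cases below: each `ExpectedPoint{begin, seq}` asserts two things at the boundary `begin`: a key there with sequence number `seq` is *not* deleted, and (when `seq > 0`) the same key at `seq - 1` *is*. So `{"a", 10}` pins the covering tombstone's seqnum at exactly 10 starting at `a`, while `{"b", 0}` asserts no coverage from `b` onward. Taken together, the points spell out the collapsed boundary map each test expects, and running each case in both insertion orders checks that AddTombstone() is order-independent.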
+TEST_F(RangeDelAggregatorTest, Empty) { VerifyRangeDels({}, {{"a", 0}}); }
+
+TEST_F(RangeDelAggregatorTest, SameStartAndEnd) {
+  VerifyRangeDels({{"a", "a", 5}}, {{" ", 0}, {"a", 0}, {"b", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, Single) {
+  VerifyRangeDels({{"a", "b", 10}}, {{" ", 0}, {"a", 10}, {"b", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, OverlapAboveLeft) {
+  VerifyRangeDels({{"a", "c", 10}, {"b", "d", 5}},
+                  {{" ", 0}, {"a", 10}, {"c", 5}, {"d", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, OverlapAboveRight) {
+  VerifyRangeDels({{"a", "c", 5}, {"b", "d", 10}},
+                  {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, OverlapAboveMiddle) {
+  VerifyRangeDels({{"a", "d", 5}, {"b", "c", 10}},
+                  {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 5}, {"d", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, OverlapFully) {
+  VerifyRangeDels({{"a", "d", 10}, {"b", "c", 5}},
+                  {{" ", 0}, {"a", 10}, {"d", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, OverlapPoint) {
+  VerifyRangeDels({{"a", "b", 5}, {"b", "c", 10}},
+                  {{" ", 0}, {"a", 5}, {"b", 10}, {"c", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, SameStartKey) {
+  VerifyRangeDels({{"a", "c", 5}, {"a", "b", 10}},
+                  {{" ", 0}, {"a", 10}, {"b", 5}, {"c", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, SameEndKey) {
+  VerifyRangeDels({{"a", "d", 5}, {"b", "d", 10}},
+                  {{" ", 0}, {"a", 5}, {"b", 10}, {"d", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, GapsBetweenRanges) {
+  VerifyRangeDels(
+      {{"a", "b", 5}, {"c", "d", 10}, {"e", "f", 15}},
+      {{" ", 0}, {"a", 5}, {"b", 0}, {"c", 10}, {"d", 0}, {"e", 15}, {"f", 0}});
+}
+
+// Note the Cover* tests also test cases where tombstones are inserted under a
+// larger one when VerifyRangeDels() runs them in reverse
+TEST_F(RangeDelAggregatorTest, CoverMultipleFromLeft) {
+  VerifyRangeDels(
+      {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "f", 20}},
+      {{" ", 0}, {"a", 20}, {"f", 15}, {"g", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, CoverMultipleFromRight) {
+  VerifyRangeDels(
+      {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"c", "h", 20}},
+      {{" ", 0}, {"b", 5}, {"c", 20}, {"h", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, CoverMultipleFully) {
+  VerifyRangeDels(
+      {{"b", "d", 5}, {"c", "f", 10}, {"e", "g", 15}, {"a", "h", 20}},
+      {{" ", 0}, {"a", 20}, {"h", 0}});
+}
+
+TEST_F(RangeDelAggregatorTest, AlternateMultipleAboveBelow) {
+  VerifyRangeDels(
+      {{"b", "d", 15}, {"c", "f", 10}, {"e", "g", 20}, {"a", "h", 5}},
+      {{" ", 0},
+       {"a", 5},
+       {"b", 15},
+       {"d", 10},
+       {"e", 20},
+       {"g", 5},
+       {"h", 0}});
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
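With the Makefile hook at the top of the patch, the new unit test builds as its own target; in a standard RocksDB checkout, `make range_del_aggregator_test && ./range_del_aggregator_test` should build and run just these cases.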