Allow compaction iterator to perform garbage collection (#7556)

Summary:
Add a threshold timestamp, full_history_ts_low_ of type `std::string*` to
`CompactionIterator`, so that RocksDB can also perform garbage collection during
compaction.
* If full_history_ts_low_ is nullptr, then compaction iterator does not perform
  GC, preserving all timestamp history for all keys. Compaction iterator will
treat user key with different timestamps as different user keys.
* If full_history_ts_low_ is not nullptr, then compaction iterator performs
  GC. GC will look at keys older than `*full_history_ts_low_` and determine their
  eligibility based on factors including snapshots.

Current rules of GC:
 * If an internal key is in the same snapshot as a previous counterpart
    with the same user key, and this key is eligible for GC, and the key is
    not single-delete or merge operand, then this key can be dropped. Note
    that the previous internal key cannot be a merge operand either.
 * If a tombstone is the most recent one in the earliest snapshot and it
    is eligible for GC, and keyNotExistsBeyondLevel() is true, then this
    tombstone can be dropped.
 * If a tombstone is the most recent one in a snapshot and it is eligible
    for GC, and the compaction is at bottommost level, then all other older
    internal keys of the same user key must also be eligible for GC, thus
    can be dropped
* Single-delete, delete-range and merge are not currently supported.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7556

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D24507728

Pulled By: riversand963

fbshipit-source-id: 3c09c7301f41eed76dfcf4d1527e68cf6e0a8bb3
This commit is contained in:
Yanqin Jin 2020-10-23 22:58:05 -07:00 committed by Facebook GitHub Bot
parent 1b224324b5
commit 6595267980
8 changed files with 348 additions and 31 deletions

View File

@ -383,7 +383,13 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(
auto* f = files[level_ptrs->at(lvl)]; auto* f = files[level_ptrs->at(lvl)];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
// We've advanced far enough // We've advanced far enough
if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) { // In the presence of user-defined timestamp, we may need to handle
// the case in which f->smallest.user_key() (including ts) has the
// same user key, but the ts part is smaller. If so,
// Compare(user_key, f->smallest.user_key()) returns -1.
// That's why we need CompareWithoutTimestamp().
if (user_cmp->CompareWithoutTimestamp(user_key,
f->smallest.user_key()) >= 0) {
// Key falls in this file's range, so it may // Key falls in this file's range, so it may
// exist beyond output level // exist beyond output level
return false; return false;

View File

@ -44,7 +44,8 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, const SequenceNumber preserve_deletes_seqnum,
const std::atomic<int>* manual_compaction_paused, const std::atomic<int>* manual_compaction_paused,
const std::shared_ptr<Logger> info_log) const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
: CompactionIterator( : CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots, input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env, earliest_write_conflict_snapshot, snapshot_checker, env,
@ -53,7 +54,7 @@ CompactionIterator::CompactionIterator(
std::unique_ptr<CompactionProxy>( std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr), compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, shutting_down, preserve_deletes_seqnum, compaction_filter, shutting_down, preserve_deletes_seqnum,
manual_compaction_paused, info_log) {} manual_compaction_paused, info_log, full_history_ts_low) {}
CompactionIterator::CompactionIterator( CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@ -68,7 +69,8 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, const SequenceNumber preserve_deletes_seqnum,
const std::atomic<int>* manual_compaction_paused, const std::atomic<int>* manual_compaction_paused,
const std::shared_ptr<Logger> info_log) const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
: input_(input), : input_(input),
cmp_(cmp), cmp_(cmp),
merge_helper_(merge_helper), merge_helper_(merge_helper),
@ -90,7 +92,10 @@ CompactionIterator::CompactionIterator(
merge_out_iter_(merge_helper_), merge_out_iter_(merge_helper_),
current_key_committed_(false), current_key_committed_(false),
info_log_(info_log), info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors) { allow_data_in_errors_(allow_data_in_errors),
timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
full_history_ts_low_(full_history_ts_low),
cmp_with_history_ts_low_(0) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr); assert(compaction_filter_ == nullptr || compaction_ != nullptr);
assert(snapshots_ != nullptr); assert(snapshots_ != nullptr);
bottommost_level_ = compaction_ == nullptr bottommost_level_ = compaction_ == nullptr
@ -117,6 +122,8 @@ CompactionIterator::CompactionIterator(
for (size_t i = 1; i < snapshots_->size(); ++i) { for (size_t i = 1; i < snapshots_->size(); ++i) {
assert(snapshots_->at(i - 1) < snapshots_->at(i)); assert(snapshots_->at(i - 1) < snapshots_->at(i));
} }
assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
timestamp_size_ == full_history_ts_low_->size());
#endif #endif
input_->SetPinnedItersMgr(&pinned_iters_mgr_); input_->SetPinnedItersMgr(&pinned_iters_mgr_);
TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get()); TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
@ -298,7 +305,8 @@ void CompactionIterator::NextFromInput() {
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
// Update input statistics // Update input statistics
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
ikey_.type == kTypeDeletionWithTimestamp) {
iter_stats_.num_input_deletion_records++; iter_stats_.num_input_deletion_records++;
} }
iter_stats_.total_input_raw_key_bytes += key_.size(); iter_stats_.total_input_raw_key_bytes += key_.size();
@ -319,11 +327,33 @@ void CompactionIterator::NextFromInput() {
// First occurrence of this user key // First occurrence of this user key
// Copy key for output // Copy key for output
key_ = current_key_.SetInternalKey(key_, &ikey_); key_ = current_key_.SetInternalKey(key_, &ikey_);
UpdateTimestampAndCompareWithFullHistoryLow();
// If
// (1) !has_current_user_key_, OR
// (2) timestamp is disabled, OR
// (3) all history will be preserved, OR
// (4) user key (excluding timestamp) is different from previous key, OR
// (5) timestamp is NO older than *full_history_ts_low_
// then current_user_key_ must be treated as a different user key.
// This means, if a user key (excluding ts) is the same as the previous
// user key, and its ts is older than *full_history_ts_low_, then we
// consider this key for GC, e.g. it may be dropped if certain conditions
// match.
if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
0 != cmp_->CompareWithoutTimestamp(ikey_.user_key,
current_user_key_) ||
cmp_with_history_ts_low_ >= 0) {
// Initialize for future comparison for rule (A) and etc.
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
has_current_user_key_ = true;
}
current_user_key_ = ikey_.user_key; current_user_key_ = ikey_.user_key;
has_current_user_key_ = true;
has_outputted_key_ = false; has_outputted_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
current_key_committed_ = KeyCommitted(ikey_.sequence); current_key_committed_ = KeyCommitted(ikey_.sequence);
// Apply the compaction filter to the first committed version of the user // Apply the compaction filter to the first committed version of the user
@ -543,9 +573,12 @@ void CompactionIterator::NextFromInput() {
last_sequence, current_user_key_sequence_); last_sequence, current_user_key_sequence_);
} }
++iter_stats_.num_record_drop_hidden; // (A) ++iter_stats_.num_record_drop_hidden; // rule (A)
input_->Next(); input_->Next();
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && } else if (compaction_ != nullptr &&
(ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeDeletionWithTimestamp &&
cmp_with_history_ts_low_ < 0)) &&
IN_EARLIEST_SNAPSHOT(ikey_.sequence) && IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
ikeyNotNeededForIncrementalSnapshot() && ikeyNotNeededForIncrementalSnapshot() &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
@ -569,13 +602,19 @@ void CompactionIterator::NextFromInput() {
// given that: // given that:
// (1) The deletion is earlier than earliest_write_conflict_snapshot, and // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
// (2) No value exist earlier than the deletion. // (2) No value exist earlier than the deletion.
//
// Note also that a deletion marker of type kTypeDeletionWithTimestamp
// will be treated as a different user key unless the timestamp is older
// than *full_history_ts_low_.
++iter_stats_.num_record_drop_obsolete; ++iter_stats_.num_record_drop_obsolete;
if (!bottommost_level_) { if (!bottommost_level_) {
++iter_stats_.num_optimized_del_drop_obsolete; ++iter_stats_.num_optimized_del_drop_obsolete;
} }
input_->Next(); input_->Next();
} else if ((ikey_.type == kTypeDeletion) && bottommost_level_ && } else if ((ikey_.type == kTypeDeletion ||
ikeyNotNeededForIncrementalSnapshot()) { (ikey_.type == kTypeDeletionWithTimestamp &&
cmp_with_history_ts_low_ < 0)) &&
bottommost_level_ && ikeyNotNeededForIncrementalSnapshot()) {
// Handle the case where we have a delete key at the bottom most level // Handle the case where we have a delete key at the bottom most level
// We can skip outputting the key iff there are no subsequent puts for this // We can skip outputting the key iff there are no subsequent puts for this
// key // key
@ -583,12 +622,17 @@ void CompactionIterator::NextFromInput() {
ikey_.user_key, &level_ptrs_)); ikey_.user_key, &level_ptrs_));
ParsedInternalKey next_ikey; ParsedInternalKey next_ikey;
input_->Next(); input_->Next();
// Skip over all versions of this key that happen to occur in the same snapshot // Skip over all versions of this key that happen to occur in the same
// range as the delete // snapshot range as the delete.
//
// Note that a deletion marker of type kTypeDeletionWithTimestamp will be
// considered to have a different user key unless the timestamp is older
// than *full_history_ts_low_.
while (!IsPausingManualCompaction() && !IsShuttingDown() && while (!IsPausingManualCompaction() && !IsShuttingDown() &&
input_->Valid() && input_->Valid() &&
(ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) && (ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key) && 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
next_ikey.user_key) &&
(prev_snapshot == 0 || (prev_snapshot == 0 ||
DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) { DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
input_->Next(); input_->Next();
@ -597,7 +641,8 @@ void CompactionIterator::NextFromInput() {
// delete too // delete too
if (input_->Valid() && if (input_->Valid() &&
(ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) && (ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
next_ikey.user_key)) {
valid_ = true; valid_ = true;
at_next_ = true; at_next_ = true;
} }
@ -735,7 +780,18 @@ void CompactionIterator::PrepareOutput() {
ikey_.type); ikey_.type);
} }
ikey_.sequence = 0; ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type); if (!timestamp_size_) {
current_key_.UpdateInternalKey(0, ikey_.type);
} else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
// We can also zero out timestamp for better compression.
// For the same user key (excluding timestamp), the timestamp-based
// history can be collapsed to save some space if the timestamp is
// older than *full_history_ts_low_.
const std::string kTsMin(timestamp_size_, static_cast<char>(0));
const Slice ts_slice = kTsMin;
ikey_.SetTimestamp(ts_slice);
current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
}
} }
} }
} }

View File

@ -75,7 +75,8 @@ class CompactionIterator {
const std::atomic<bool>* shutting_down = nullptr, const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0, const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr, const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr); const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
// Constructor with custom CompactionProxy, used for tests. // Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp, CompactionIterator(InternalIterator* input, const Comparator* cmp,
@ -92,7 +93,8 @@ class CompactionIterator {
const std::atomic<bool>* shutting_down = nullptr, const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0, const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr, const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr); const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
~CompactionIterator(); ~CompactionIterator();
@ -152,6 +154,18 @@ class CompactionIterator {
bool IsInEarliestSnapshot(SequenceNumber sequence); bool IsInEarliestSnapshot(SequenceNumber sequence);
// Extract user-defined timestamp from user key if possible and compare it
// with *full_history_ts_low_ if applicable.
inline void UpdateTimestampAndCompareWithFullHistoryLow() {
if (full_history_ts_low_) {
assert(timestamp_size_ > 0);
current_ts_ =
ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
cmp_with_history_ts_low_ =
cmp_->CompareTimestamp(current_ts_, *full_history_ts_low_);
}
}
InternalIterator* input_; InternalIterator* input_;
const Comparator* cmp_; const Comparator* cmp_;
MergeHelper* merge_helper_; MergeHelper* merge_helper_;
@ -199,11 +213,13 @@ class CompactionIterator {
// Stores whether ikey_.user_key is valid. If set to false, the user key is // Stores whether ikey_.user_key is valid. If set to false, the user key is
// not compared against the current key in the underlying iterator. // not compared against the current key in the underlying iterator.
bool has_current_user_key_ = false; bool has_current_user_key_ = false;
bool at_next_ = false; // If false, the iterator // If false, the iterator holds a copy of the current compaction iterator
// Holds a copy of the current compaction iterator output (or current key in // output (or current key in the underlying iterator during NextFromInput()).
// the underlying iterator during NextFromInput()). bool at_next_ = false;
IterKey current_key_; IterKey current_key_;
Slice current_user_key_; Slice current_user_key_;
Slice current_ts_;
SequenceNumber current_user_key_sequence_; SequenceNumber current_user_key_sequence_;
SequenceNumber current_user_key_snapshot_; SequenceNumber current_user_key_snapshot_;
@ -237,6 +253,19 @@ class CompactionIterator {
bool allow_data_in_errors_; bool allow_data_in_errors_;
// Comes from comparator.
const size_t timestamp_size_;
// Lower bound timestamp to retain full history in terms of user-defined
// timestamp. If a key's timestamp is older than full_history_ts_low_, then
// the key *may* be eligible for garbage collection (GC). The skipping logic
// is in `NextFromInput()` and `PrepareOutput()`.
// If nullptr, NO GC will be performed and all history will be preserved.
const std::string* const full_history_ts_low_;
// Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
int cmp_with_history_ts_low_;
bool IsShuttingDown() { bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient. // This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);

View File

@ -216,6 +216,9 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
CompactionIteratorTest() CompactionIteratorTest()
: cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {} : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
explicit CompactionIteratorTest(const Comparator* ucmp)
: cmp_(ucmp), icmp_(cmp_), snapshots_({}) {}
void InitIterators( void InitIterators(
const std::vector<std::string>& ks, const std::vector<std::string>& vs, const std::vector<std::string>& ks, const std::vector<std::string>& vs,
const std::vector<std::string>& range_del_ks, const std::vector<std::string>& range_del_ks,
@ -224,7 +227,9 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
SequenceNumber last_committed_sequence = kMaxSequenceNumber, SequenceNumber last_committed_sequence = kMaxSequenceNumber,
MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr, MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
bool bottommost_level = false, bool bottommost_level = false,
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
bool key_not_exists_beyond_output_level = false,
const std::string* full_history_ts_low = nullptr) {
std::unique_ptr<InternalIterator> unfragmented_range_del_iter( std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
new test::VectorIterator(range_del_ks, range_del_vs)); new test::VectorIterator(range_del_ks, range_del_vs));
auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>( auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
@ -236,10 +241,12 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
range_del_agg_->AddTombstones(std::move(range_del_iter)); range_del_agg_->AddTombstones(std::move(range_del_iter));
std::unique_ptr<CompactionIterator::CompactionProxy> compaction; std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
if (filter || bottommost_level) { if (filter || bottommost_level || key_not_exists_beyond_output_level) {
compaction_proxy_ = new FakeCompaction(); compaction_proxy_ = new FakeCompaction();
compaction_proxy_->is_bottommost_level = bottommost_level; compaction_proxy_->is_bottommost_level = bottommost_level;
compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind(); compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind();
compaction_proxy_->key_not_exists_beyond_output_level =
key_not_exists_beyond_output_level;
compaction.reset(compaction_proxy_); compaction.reset(compaction_proxy_);
} }
bool use_snapshot_checker = UseSnapshotChecker() || GetParam(); bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
@ -252,6 +259,11 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
0 /*latest_snapshot*/, snapshot_checker_.get(), 0 /*latest_snapshot*/, snapshot_checker_.get(),
0 /*level*/, nullptr /*statistics*/, &shutting_down_)); 0 /*level*/, nullptr /*statistics*/, &shutting_down_));
if (c_iter_) {
// Since iter_ is still used in ~CompactionIterator(), we call
// ~CompactionIterator() first.
c_iter_.reset();
}
iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_.reset(new LoggingForwardVectorIterator(ks, vs));
iter_->SeekToFirst(); iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator( c_iter_.reset(new CompactionIterator(
@ -260,7 +272,9 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
Env::Default(), false /* report_detailed_time */, false, Env::Default(), false /* report_detailed_time */, false,
range_del_agg_.get(), nullptr /* blob_file_builder */, range_del_agg_.get(), nullptr /* blob_file_builder */,
false /*allow_data_in_errors*/, std::move(compaction), filter, false /*allow_data_in_errors*/, std::move(compaction), filter,
&shutting_down_)); &shutting_down_, /*preserve_deletes_seqnum=*/0,
/*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr,
full_history_ts_low));
} }
void AddSnapshot(SequenceNumber snapshot, void AddSnapshot(SequenceNumber snapshot,
@ -282,10 +296,13 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
MergeOperator* merge_operator = nullptr, MergeOperator* merge_operator = nullptr,
CompactionFilter* compaction_filter = nullptr, CompactionFilter* compaction_filter = nullptr,
bool bottommost_level = false, bool bottommost_level = false,
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
bool key_not_exists_beyond_output_level = false,
const std::string* full_history_ts_low = nullptr) {
InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber, InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
last_committed_seq, merge_operator, compaction_filter, last_committed_seq, merge_operator, compaction_filter,
bottommost_level, earliest_write_conflict_snapshot); bottommost_level, earliest_write_conflict_snapshot,
key_not_exists_beyond_output_level, full_history_ts_low);
c_iter_->SeekToFirst(); c_iter_->SeekToFirst();
for (size_t i = 0; i < expected_keys.size(); i++) { for (size_t i = 0; i < expected_keys.size(); i++) {
std::string info = "i = " + ToString(i); std::string info = "i = " + ToString(i);
@ -299,6 +316,11 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
ASSERT_FALSE(c_iter_->Valid()); ASSERT_FALSE(c_iter_->Valid());
} }
void ClearSnapshots() {
snapshots_.clear();
snapshot_map_.clear();
}
const Comparator* cmp_; const Comparator* cmp_;
const InternalKeyComparator icmp_; const InternalKeyComparator icmp_;
std::vector<SequenceNumber> snapshots_; std::vector<SequenceNumber> snapshots_;
@ -1033,6 +1055,185 @@ INSTANTIATE_TEST_CASE_P(CompactionIteratorWithAllowIngestBehindTestInstance,
CompactionIteratorWithAllowIngestBehindTest, CompactionIteratorWithAllowIngestBehindTest,
testing::Values(true, false)); testing::Values(true, false));
class CompactionIteratorTsGcTest : public CompactionIteratorTest {
public:
CompactionIteratorTsGcTest()
: CompactionIteratorTest(test::ComparatorWithU64Ts()) {}
};
TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeValue),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3,
kTypeDeletionWithTimestamp)};
const std::vector<std::string> input_values = {"a3", ""};
std::string full_history_ts_low;
// All keys' timestamps are newer than or equal to 102, thus none of them
// will be eligible for GC.
PutFixed64(&full_history_ts_low, 102);
const std::vector<std::string>& expected_keys = input_keys;
const std::vector<std::string>& expected_values = input_values;
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const std::pair<bool, bool>& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
bottommost_level,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "a1"};
std::string full_history_ts_low;
// All keys' timestamps are older than 104.
PutFixed64(&full_history_ts_low, 104);
{
// With a snapshot at seq 3, both the deletion marker and the key at 3 must
// be preserved.
AddSnapshot(3);
const std::vector<std::string> expected_keys = {input_keys[0],
input_keys[1]};
const std::vector<std::string> expected_values = {"", "a2"};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/false,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
ClearSnapshots();
}
{
// No snapshot, the deletion marker should be preserved because the user
// key may appear beyond output level.
const std::vector<std::string> expected_keys = {input_keys[0]};
const std::vector<std::string> expected_values = {""};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/false,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
}
{
// No snapshot, the deletion marker can be dropped because the user key
// does not appear in higher levels.
const std::vector<std::string> expected_keys = {};
const std::vector<std::string> expected_values = {};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/false,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/true, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeValue),
test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "a1", "a0"};
{
std::string full_history_ts_low;
// Keys whose timestamps larger than or equal to 102 will be preserved.
PutFixed64(&full_history_ts_low, 102);
const std::vector<std::string> expected_keys = {input_keys[0],
input_keys[1]};
const std::vector<std::string> expected_values = {"", "a2"};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/false,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, DropTombstones) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "", "a0"};
const std::vector<std::string> expected_keys = {input_keys[0], input_keys[1]};
const std::vector<std::string> expected_values = {"", "a2"};
// Take a snapshot at seq 2.
AddSnapshot(2);
{
// Non-bottommost level, but key does not exist beyond output level.
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_sequence=*/kMaxSequenceNumber,
/*merge_op=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/false,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/true, &full_history_ts_low);
}
{
// Bottommost level
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/true,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, RewriteTs) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "", "a0"};
const std::vector<std::string> expected_keys = {
input_keys[0], input_keys[1], input_keys[2],
test::KeyStr(/*ts=*/0, user_key, /*seq=*/0, kTypeValue)};
const std::vector<std::string> expected_values = {"", "a2", "", "a0"};
AddSnapshot(1);
AddSnapshot(2);
{
// Bottommost level and need to rewrite both ts and seq.
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/true,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/true, &full_history_ts_low);
}
}
INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance,
CompactionIteratorTsGcTest,
testing::Values(true, false));
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -436,11 +436,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
&last_key, &last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion)); ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
} else { } else {
std::string min_ts(timestamp_size_, static_cast<char>(0)); const std::string kTsMin(timestamp_size_, static_cast<char>(0));
AppendInternalKeyWithDifferentTimestamp( AppendInternalKeyWithDifferentTimestamp(
&last_key, &last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion), ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
min_ts); kTsMin);
} }
// Don't set skipping_saved_key = false because we may still see more // Don't set skipping_saved_key = false because we may still see more
// user-keys equal to saved_key_. // user-keys equal to saved_key_.

View File

@ -119,6 +119,12 @@ struct ParsedInternalKey {
sequence = 0; sequence = 0;
type = kTypeDeletion; type = kTypeDeletion;
} }
void SetTimestamp(const Slice& ts) {
assert(ts.size() <= user_key.size());
const char* addr = user_key.data() - ts.size();
memcpy(const_cast<char*>(addr), ts.data(), ts.size());
}
}; };
// Return the length of the encoding of "key". // Return the length of the encoding of "key".
@ -475,9 +481,14 @@ class IterKey {
// Update the sequence number in the internal key. Guarantees not to // Update the sequence number in the internal key. Guarantees not to
// invalidate slices to the key (and the user key). // invalidate slices to the key (and the user key).
void UpdateInternalKey(uint64_t seq, ValueType t) { void UpdateInternalKey(uint64_t seq, ValueType t, const Slice* ts = nullptr) {
assert(!IsKeyPinned()); assert(!IsKeyPinned());
assert(key_size_ >= kNumInternalBytes); assert(key_size_ >= kNumInternalBytes);
if (ts) {
assert(key_size_ >= kNumInternalBytes + ts->size());
memcpy(&buf_[key_size_ - kNumInternalBytes - ts->size()], ts->data(),
ts->size());
}
uint64_t newval = (seq << 8) | t; uint64_t newval = (seq << 8) | t;
EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval); EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval);
} }

View File

@ -205,6 +205,16 @@ std::string KeyStr(const std::string& user_key, const SequenceNumber& seq,
return k.Encode().ToString(); return k.Encode().ToString();
} }
std::string KeyStr(uint64_t ts, const std::string& user_key,
const SequenceNumber& seq, const ValueType& t,
bool corrupt) {
std::string user_key_with_ts(user_key);
std::string ts_str;
PutFixed64(&ts_str, ts);
user_key_with_ts.append(ts_str);
return KeyStr(user_key_with_ts, seq, t, corrupt);
}
std::string RandomName(Random* rnd, const size_t len) { std::string RandomName(Random* rnd, const size_t len) {
std::stringstream ss; std::stringstream ss;
for (size_t i = 0; i < len; ++i) { for (size_t i = 0; i < len; ++i) {

View File

@ -394,6 +394,10 @@ extern std::string KeyStr(const std::string& user_key,
const SequenceNumber& seq, const ValueType& t, const SequenceNumber& seq, const ValueType& t,
bool corrupt = false); bool corrupt = false);
extern std::string KeyStr(uint64_t ts, const std::string& user_key,
const SequenceNumber& seq, const ValueType& t,
bool corrupt = false);
class SleepingBackgroundTask { class SleepingBackgroundTask {
public: public:
SleepingBackgroundTask() SleepingBackgroundTask()