diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 7f15a186c..10e3c7113 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -9,6 +9,23 @@ #include "port/likely.h" #include "rocksdb/listener.h" #include "table/internal_iterator.h" +#include "util/sync_point.h" + +#define DEFINITELY_IN_SNAPSHOT(seq, snapshot) \ + ((seq) <= (snapshot) && \ + (snapshot_checker_ == nullptr || \ + LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \ + SnapshotCheckerResult::kInSnapshot))) + +#define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot) \ + ((seq) > (snapshot) || \ + (snapshot_checker_ != nullptr && \ + UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \ + SnapshotCheckerResult::kNotInSnapshot))) + +#define IN_EARLIEST_SNAPSHOT(seq) \ + ((seq) <= earliest_snapshot_ && \ + (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq)))) namespace rocksdb { @@ -61,19 +78,21 @@ CompactionIterator::CompactionIterator( merge_out_iter_(merge_helper_), current_key_committed_(false) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); + assert(snapshots_ != nullptr); bottommost_level_ = compaction_ == nullptr ? 
false : compaction_->bottommost_level(); if (compaction_ != nullptr) { level_ptrs_ = std::vector(compaction_->number_levels(), 0); } - if (snapshots_->size() == 0) { // optimize for fast path if there are no snapshots visible_at_tip_ = true; + earliest_snapshot_iter_ = snapshots_->end(); earliest_snapshot_ = kMaxSequenceNumber; latest_snapshot_ = 0; } else { visible_at_tip_ = false; + earliest_snapshot_iter_ = snapshots_->begin(); earliest_snapshot_ = snapshots_->at(0); latest_snapshot_ = snapshots_->back(); } @@ -163,10 +182,7 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, if (compaction_filter_ != nullptr && (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) && (visible_at_tip_ || ignore_snapshots_ || - ikey_.sequence > latest_snapshot_ || - (snapshot_checker_ != nullptr && - UNLIKELY(!snapshot_checker_->IsInSnapshot(ikey_.sequence, - latest_snapshot_))))) { + DEFINITELY_NOT_IN_SNAPSHOT(ikey_.sequence, latest_snapshot_))) { // If the user has specified a compaction filter and the sequence // number is greater than any external snapshot, then invoke the // filter. If the return value of the compaction filter is true, @@ -270,9 +286,7 @@ void CompactionIterator::NextFromInput() { has_outputted_key_ = false; current_user_key_sequence_ = kMaxSequenceNumber; current_user_key_snapshot_ = 0; - current_key_committed_ = - (snapshot_checker_ == nullptr || - snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber)); + current_key_committed_ = KeyCommitted(ikey_.sequence); // Apply the compaction filter to the first committed version of the user // key. @@ -294,8 +308,7 @@ void CompactionIterator::NextFromInput() { // to query snapshot_checker_ in that case. 
if (UNLIKELY(!current_key_committed_)) { assert(snapshot_checker_ != nullptr); - current_key_committed_ = - snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber); + current_key_committed_ = KeyCommitted(ikey_.sequence); // Apply the compaction filter to the first committed version of the // user key. if (current_key_committed_) { @@ -379,10 +392,8 @@ void CompactionIterator::NextFromInput() { cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { // Check whether the next key belongs to the same snapshot as the // SingleDelete. - if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot || - (snapshot_checker_ != nullptr && - UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence, - prev_snapshot)))) { + if (prev_snapshot == 0 || + DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) { if (next_ikey.type == kTypeSingleDeletion) { // We encountered two SingleDeletes in a row. This could be due to // unexpected user input. @@ -394,11 +405,8 @@ void CompactionIterator::NextFromInput() { ++iter_stats_.num_record_drop_obsolete; ++iter_stats_.num_single_del_mismatch; } else if (has_outputted_key_ || - (ikey_.sequence <= earliest_write_conflict_snapshot_ && - (snapshot_checker_ == nullptr || - LIKELY(snapshot_checker_->IsInSnapshot( - ikey_.sequence, - earliest_write_conflict_snapshot_))))) { + DEFINITELY_IN_SNAPSHOT( + ikey_.sequence, earliest_write_conflict_snapshot_)) { // Found a matching value, we can drop the single delete and the // value. It is safe to drop both records since we've already // outputted a key in this snapshot, or there is no earlier @@ -446,10 +454,7 @@ void CompactionIterator::NextFromInput() { // iteration. If the next key is corrupt, we return before the // comparison, so the value of has_current_user_key does not matter. 
has_current_user_key_ = false; - if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ && - (snapshot_checker_ == nullptr || - LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence, - earliest_snapshot_))) && + if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, &level_ptrs_)) { // Key doesn't exist outside of this range. @@ -482,10 +487,7 @@ void CompactionIterator::NextFromInput() { ++iter_stats_.num_record_drop_hidden; // (A) input_->Next(); } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && - ikey_.sequence <= earliest_snapshot_ && - (snapshot_checker_ == nullptr || - LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence, - earliest_snapshot_))) && + IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikeyNotNeededForIncrementalSnapshot() && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, &level_ptrs_)) { @@ -522,13 +524,10 @@ void CompactionIterator::NextFromInput() { input_->Next(); // Skip over all versions of this key that happen to occur in the same snapshot // range as the delete - while (input_->Valid() && - ParseInternalKey(input_->key(), &next_ikey) && + while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) && cmp_->Equal(ikey_.user_key, next_ikey.user_key) && - (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot || - (snapshot_checker_ != nullptr && - UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence, - prev_snapshot))))) { + (prev_snapshot == 0 || + DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) { input_->Next(); } // If you find you still need to output a row with this key, we need to output the @@ -619,13 +618,9 @@ void CompactionIterator::PrepareOutput() { // // Can we do the same for levels above bottom level as long as // KeyNotExistsBeyondOutputLevel() return true? 
- if ((compaction_ != nullptr && - !compaction_->allow_ingest_behind()) && - ikeyNotNeededForIncrementalSnapshot() && - bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ && - (snapshot_checker_ == nullptr || LIKELY(snapshot_checker_->IsInSnapshot( - ikey_.sequence, earliest_snapshot_))) && - ikey_.type != kTypeMerge && + if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) && + ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ && + IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge && !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) { assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); ikey_.sequence = 0; @@ -648,7 +643,8 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( auto cur = *snapshots_iter; assert(in <= cur); if (snapshot_checker_ == nullptr || - snapshot_checker_->IsInSnapshot(in, cur)) { + snapshot_checker_->CheckInSnapshot(in, cur) == + SnapshotCheckerResult::kInSnapshot) { return cur; } *prev_snapshot = cur; @@ -663,4 +659,25 @@ inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() { (ikey_.sequence < preserve_deletes_seqnum_); } +bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) { + assert(snapshot_checker_ != nullptr); + assert(earliest_snapshot_ == kMaxSequenceNumber || + (earliest_snapshot_iter_ != snapshots_->end() && + *earliest_snapshot_iter_ == earliest_snapshot_)); + auto in_snapshot = + snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); + while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) { + earliest_snapshot_iter_++; + if (earliest_snapshot_iter_ == snapshots_->end()) { + earliest_snapshot_ = kMaxSequenceNumber; + } else { + earliest_snapshot_ = *earliest_snapshot_iter_; + } + in_snapshot = + snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); + } + assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased); + return 
in_snapshot == SnapshotCheckerResult::kInSnapshot; +} + } // namespace rocksdb diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 6fbd3d0ef..1e39564df 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -132,10 +132,19 @@ class CompactionIterator { // or seqnum be zero-ed out even if all other conditions for it are met. inline bool ikeyNotNeededForIncrementalSnapshot(); + inline bool KeyCommitted(SequenceNumber sequence) { + return snapshot_checker_ == nullptr || + snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) == + SnapshotCheckerResult::kInSnapshot; + } + + bool IsInEarliestSnapshot(SequenceNumber sequence); + InternalIterator* input_; const Comparator* cmp_; MergeHelper* merge_helper_; const std::vector* snapshots_; + std::vector::const_iterator earliest_snapshot_iter_; const SequenceNumber earliest_write_conflict_snapshot_; const SnapshotChecker* const snapshot_checker_; Env* env_; @@ -151,6 +160,7 @@ class CompactionIterator { bool visible_at_tip_; SequenceNumber earliest_snapshot_; SequenceNumber latest_snapshot_; + bool ignore_snapshots_; // State diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 07a9e6ef8..280b748b9 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -190,13 +190,17 @@ class TestSnapshotChecker : public SnapshotChecker { : last_committed_sequence_(last_committed_sequence), snapshots_(snapshots) {} - bool IsInSnapshot(SequenceNumber seq, - SequenceNumber snapshot_seq) const override { + SnapshotCheckerResult CheckInSnapshot( + SequenceNumber seq, SequenceNumber snapshot_seq) const override { if (snapshot_seq == kMaxSequenceNumber) { - return seq <= last_committed_sequence_; + return seq <= last_committed_sequence_ + ? 
SnapshotCheckerResult::kInSnapshot + : SnapshotCheckerResult::kNotInSnapshot; } assert(snapshots_.count(snapshot_seq) > 0); - return seq <= snapshots_.at(snapshot_seq); + return seq <= snapshots_.at(snapshot_seq) + ? SnapshotCheckerResult::kInSnapshot + : SnapshotCheckerResult::kNotInSnapshot; } private: diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 2c9634755..d81febb79 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -21,7 +21,8 @@ class TestReadCallback : public ReadCallback { : snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {} bool IsVisible(SequenceNumber seq) override { - return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_); + return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) == + SnapshotCheckerResult::kInSnapshot; } private: @@ -547,8 +548,15 @@ TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) { DestroyAndReopen(options); class TestSnapshotChecker : public SnapshotChecker { - bool IsInSnapshot(SequenceNumber seq, - SequenceNumber snapshot_seq) const override { + public: + SnapshotCheckerResult CheckInSnapshot( + SequenceNumber seq, SequenceNumber snapshot_seq) const override { + return IsInSnapshot(seq, snapshot_seq) + ? 
SnapshotCheckerResult::kInSnapshot + : SnapshotCheckerResult::kNotInSnapshot; + } + + bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const { switch (snapshot_seq) { case 0: return seq == 0; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index f33dafd8e..2dea15ff9 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -166,9 +166,11 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, break; } else if (stop_before > 0 && ikey.sequence <= stop_before && LIKELY(snapshot_checker_ == nullptr || - snapshot_checker_->IsInSnapshot(ikey.sequence, - stop_before))) { - // hit an entry that's visible by the previous snapshot, can't touch that + snapshot_checker_->CheckInSnapshot(ikey.sequence, + stop_before) != + SnapshotCheckerResult::kNotInSnapshot)) { + // hit an entry that's possibly visible to the previous snapshot, can't + // touch that break; } diff --git a/db/snapshot_checker.h b/db/snapshot_checker.h index 67d17bd6d..4d29b83c4 100644 --- a/db/snapshot_checker.h +++ b/db/snapshot_checker.h @@ -8,22 +8,30 @@ namespace rocksdb { -// Callback class that control GC of duplicate keys in flush/compaction +enum class SnapshotCheckerResult : int { + kInSnapshot = 0, + kNotInSnapshot = 1, + // The snapshot has been released, so the checker cannot tell whether + // the given sequence is visible to the snapshot. + kSnapshotReleased = 2, +}; + +// Callback class that controls GC of duplicate keys in flush/compaction.
class SnapshotChecker { public: virtual ~SnapshotChecker() {} - virtual bool IsInSnapshot(SequenceNumber sequence, - SequenceNumber snapshot_sequence) const = 0; + virtual SnapshotCheckerResult CheckInSnapshot( + SequenceNumber sequence, SequenceNumber snapshot_sequence) const = 0; }; class DisableGCSnapshotChecker : public SnapshotChecker { public: virtual ~DisableGCSnapshotChecker() {} - virtual bool IsInSnapshot( + virtual SnapshotCheckerResult CheckInSnapshot( SequenceNumber /*sequence*/, SequenceNumber /*snapshot_sequence*/) const override { - // By returning false, we prevent all the values from being GCed - return false; + // By returning kNotInSnapshot, we prevent all the values from being GCed + return SnapshotCheckerResult::kNotInSnapshot; } static DisableGCSnapshotChecker* Instance() { return &instance_; } @@ -41,8 +49,8 @@ class WritePreparedSnapshotChecker : public SnapshotChecker { explicit WritePreparedSnapshotChecker(WritePreparedTxnDB* txn_db); virtual ~WritePreparedSnapshotChecker() {} - virtual bool IsInSnapshot(SequenceNumber sequence, - SequenceNumber snapshot_sequence) const override; + virtual SnapshotCheckerResult CheckInSnapshot( + SequenceNumber sequence, SequenceNumber snapshot_sequence) const override; private: #ifndef ROCKSDB_LITE diff --git a/utilities/transactions/snapshot_checker.cc b/utilities/transactions/snapshot_checker.cc index 689502908..cfe51cb1b 100644 --- a/utilities/transactions/snapshot_checker.cc +++ b/utilities/transactions/snapshot_checker.cc @@ -17,11 +17,11 @@ namespace rocksdb { WritePreparedSnapshotChecker::WritePreparedSnapshotChecker( WritePreparedTxnDB* /*txn_db*/) {} -bool WritePreparedSnapshotChecker::IsInSnapshot( +SnapshotCheckerResult WritePreparedSnapshotChecker::CheckInSnapshot( SequenceNumber /*sequence*/, SequenceNumber /*snapshot_sequence*/) const { // Should never be called in LITE mode. 
assert(false); - return true; + return SnapshotCheckerResult::kInSnapshot; } #else @@ -30,9 +30,17 @@ WritePreparedSnapshotChecker::WritePreparedSnapshotChecker( WritePreparedTxnDB* txn_db) : txn_db_(txn_db){}; -bool WritePreparedSnapshotChecker::IsInSnapshot( +SnapshotCheckerResult WritePreparedSnapshotChecker::CheckInSnapshot( SequenceNumber sequence, SequenceNumber snapshot_sequence) const { - return txn_db_->IsInSnapshot(sequence, snapshot_sequence); + bool snapshot_released = false; + // TODO(myabandeh): set min_uncommitted + bool in_snapshot = txn_db_->IsInSnapshot( + sequence, snapshot_sequence, 0 /*min_uncommitted*/, &snapshot_released); + if (snapshot_released) { + return SnapshotCheckerResult::kSnapshotReleased; + } + return in_snapshot ? SnapshotCheckerResult::kInSnapshot + : SnapshotCheckerResult::kNotInSnapshot; } #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 2ddfb9758..56bc2ab0b 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -2243,6 +2243,101 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { } } +TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 0; // minimum commit cache + DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + + ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1")); + auto* transaction = + db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); + ASSERT_OK(transaction->SetName("txn")); + ASSERT_OK(transaction->Put("key1", "value1_2")); + ASSERT_OK(transaction->Prepare()); + auto snapshot1 = db->GetSnapshot(); + // Increment sequence number. 
+ ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); + auto snapshot2 = db->GetSnapshot(); + ASSERT_OK(transaction->Commit()); + delete transaction; + VerifyKeys({{"key1", "value1_2"}}); + VerifyKeys({{"key1", "value1_1"}}, snapshot1); + VerifyKeys({{"key1", "value1_1"}}, snapshot2); + // Add a flush so that compaction does not fall back to a trivial move. + + auto callback = [&](void*) { + // Release snapshot1 after CompactionIterator init. + // CompactionIterator needs to figure out that the earliest snapshot + // that can see key1:value1_2 is kMaxSequenceNumber, not + // snapshot1 or snapshot2. + db->ReleaseSnapshot(snapshot1); + // Add some keys to advance max_evicted_seq. + ASSERT_OK(db->Put(WriteOptions(), "key3", "value3")); + ASSERT_OK(db->Put(WriteOptions(), "key4", "value4")); + }; + SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", + callback); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->Flush(FlushOptions())); + VerifyKeys({{"key1", "value1_2"}}); + VerifyKeys({{"key1", "value1_1"}}, snapshot2); + db->ReleaseSnapshot(snapshot2); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 0; // minimum commit cache + DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + + ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); + auto* transaction = + db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); + ASSERT_OK(transaction->SetName("txn")); + ASSERT_OK(transaction->Delete("key1")); + ASSERT_OK(transaction->Prepare()); + SequenceNumber del_seq = db->GetLatestSequenceNumber(); + auto snapshot1 = db->GetSnapshot(); + // Increment sequence number.
+ ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); + auto snapshot2 = db->GetSnapshot(); + ASSERT_OK(transaction->Commit()); + delete transaction; + VerifyKeys({{"key1", "NOT_FOUND"}}); + VerifyKeys({{"key1", "value1"}}, snapshot1); + VerifyKeys({{"key1", "value1"}}, snapshot2); + ASSERT_OK(db->Flush(FlushOptions())); + + auto callback = [&](void* compaction) { + // Release snapshot1 after CompactionIterator init. + // CompactionIterator needs to double check and find out that snapshot2 is + // now the earliest existing snapshot. + if (compaction != nullptr) { + db->ReleaseSnapshot(snapshot1); + // Add some keys to advance max_evicted_seq. + ASSERT_OK(db->Put(WriteOptions(), "key3", "value3")); + ASSERT_OK(db->Put(WriteOptions(), "key4", "value4")); + } + }; + SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", + callback); + SyncPoint::GetInstance()->EnableProcessing(); + + // Dummy keys to keep the compaction from trivially moving files and + // skipping the actual compaction logic. + ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); + ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); + ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // Only verify for key1. Both the put and delete for the key should be kept. + // Since the delete tombstone is not visible to snapshot2, we need to keep + // at least one version of the key, for write-conflict check. + VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion}, + {"key1", "value1", 0, kTypeValue}}); + db->ReleaseSnapshot(snapshot2); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + // A more complex test to verify compaction/flush should keep keys visible // to snapshots.
TEST_P(WritePreparedTransactionTest, diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 7a134a9f5..c6eca8c47 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -124,6 +124,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { "IsInSnapshot %" PRIu64 " in %" PRIu64 " min_uncommitted %" PRIu64, prep_seq, snapshot_seq, min_uncommitted); + // The caller is responsible for initializing *snap_released to false. + assert(snap_released == nullptr || *snap_released == false); // Here we try to infer the return value without looking into prepare list. // This would help avoiding synchronization over a shared map. // TODO(myabandeh): optimize this. This sequence of checks must be correct