diff --git a/HISTORY.md b/HISTORY.md
index 03a36ba7d..4de77d6f7 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,7 @@
 * Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file.
 * Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
 * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
+* Fixed potential incorrect results in opt mode, and assertion failures, caused by releasing snapshot(s) during compaction.
 
 ### New Features
 * Made the EventListener extend the Customizable class.
diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index c84f03d8a..3647af1d7 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -495,11 +495,11 @@ void CompactionIterator::NextFromInput() {
                         "Unexpected key type %d for compaction output",
                         ikey_.type);
       }
-      assert(current_user_key_snapshot_ == last_snapshot);
-      if (current_user_key_snapshot_ != last_snapshot) {
+      assert(current_user_key_snapshot_ >= last_snapshot);
+      if (current_user_key_snapshot_ < last_snapshot) {
         ROCKS_LOG_FATAL(info_log_,
                         "current_user_key_snapshot_ (%" PRIu64
-                        ") != last_snapshot (%" PRIu64 ")",
+                        ") < last_snapshot (%" PRIu64 ")",
                         current_user_key_snapshot_, last_snapshot);
       }
 
@@ -555,10 +555,20 @@ void CompactionIterator::NextFromInput() {
                ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
                    .ok() &&
                cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
+#ifndef NDEBUG
+      const Compaction* c =
+          compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+      TEST_SYNC_POINT_CALLBACK(
+          "CompactionIterator::NextFromInput:SingleDelete:1",
+          const_cast<Compaction*>(c));
+
       // Check whether the next key belongs to the same snapshot as the
       // SingleDelete.
       if (prev_snapshot == 0 ||
           DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
+        TEST_SYNC_POINT_CALLBACK(
+            "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
         if (next_ikey.type == kTypeSingleDeletion) {
           // We encountered two SingleDeletes in a row. This could be due to
           // unexpected user input.
@@ -604,6 +614,9 @@ void CompactionIterator::NextFromInput() {
           // Set up the Put to be outputted in the next iteration.
           // (Optimization 3).
           clear_and_output_next_key_ = true;
+          TEST_SYNC_POINT_CALLBACK(
+              "CompactionIterator::NextFromInput:KeepSDForWW",
+              /*arg=*/nullptr);
         }
       } else {
         // We hit the next snapshot without hitting a put, so the iterator
@@ -619,7 +632,8 @@ void CompactionIterator::NextFromInput() {
       // iteration. If the next key is corrupt, we return before the
       // comparison, so the value of has_current_user_key does not matter.
       has_current_user_key_ = false;
-      if (compaction_ != nullptr && InEarliestSnapshot(ikey_.sequence) &&
+      if (compaction_ != nullptr &&
+          DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
          compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                     &level_ptrs_)) {
        // Key doesn't exist outside of this range.
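Note on the hunks above: the keep/drop decision no longer goes through the old `InEarliestSnapshot` helper, whose answer depends on the current state of the snapshot list and the snapshot checker and can therefore change underneath a running compaction when a snapshot is released. The call sites now ask `DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_)`, where `earliest_snapshot_` was captured when the compaction iterator was set up, so a mid-compaction release can only push the decision toward keeping data. The sketch below only illustrates the general hazard; `live_earliest_snapshot`, `VisibleToAllNow`, and `FrozenBound` are hypothetical names, not RocksDB APIs, and the real checks additionally consult the snapshot checker.

    // Illustrative sketch only; hypothetical names, not RocksDB code.
    #include <atomic>
    #include <cstdint>

    using SequenceNumber = uint64_t;

    // A "live" earliest-snapshot bound that another thread may bump when the
    // user releases a snapshot while a compaction is in flight.
    std::atomic<SequenceNumber> live_earliest_snapshot{100};

    // Hazardous pattern: re-reads the live bound, so two checks made for the
    // same user key within one compaction run can return different answers.
    bool VisibleToAllNow(SequenceNumber seq) {
      return seq <= live_earliest_snapshot.load(std::memory_order_acquire);
    }

    // Pattern the fix follows: freeze the bound once, when the compaction
    // iterator is constructed, and use that frozen value for the whole run.
    struct FrozenBound {
      const SequenceNumber earliest_snapshot;  // captured at construction
      bool VisibleToAll(SequenceNumber seq) const {
        return seq <= earliest_snapshot;
      }
    };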
@@ -663,7 +677,7 @@ void CompactionIterator::NextFromInput() {
                (ikey_.type == kTypeDeletion ||
                 (ikey_.type == kTypeDeletionWithTimestamp &&
                  cmp_with_history_ts_low_ < 0)) &&
-               InEarliestSnapshot(ikey_.sequence) &&
+               DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
                ikeyNotNeededForIncrementalSnapshot() &&
                compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                           &level_ptrs_)) {
@@ -706,6 +720,13 @@ void CompactionIterator::NextFromInput() {
                                                      ikey_.user_key, &level_ptrs_));
       ParsedInternalKey next_ikey;
       AdvanceInputIter();
+#ifndef NDEBUG
+      const Compaction* c =
+          compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+      TEST_SYNC_POINT_CALLBACK(
+          "CompactionIterator::NextFromInput:BottommostDelete:1",
+          const_cast<Compaction*>(c));
       // Skip over all versions of this key that happen to occur in the same
       // snapshot range as the delete.
       //
@@ -964,7 +985,8 @@ void CompactionIterator::PrepareOutput() {
   if (valid_ && compaction_ != nullptr &&
       !compaction_->allow_ingest_behind() &&
       ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
-      InEarliestSnapshot(ikey_.sequence) && ikey_.type != kTypeMerge) {
+      DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
+      ikey_.type != kTypeMerge) {
     assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
     if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
       ROCKS_LOG_FATAL(info_log_,
@@ -1040,7 +1062,7 @@ inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
          (ikey_.sequence < preserve_deletes_seqnum_);
 }
 
-bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
+bool CompactionIterator::IsInCurrentEarliestSnapshot(SequenceNumber sequence) {
   assert(snapshot_checker_ != nullptr);
   bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
                         (earliest_snapshot_iter_ != snapshots_->end() &&
diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h
index 9680fdc0d..f550c8e36 100644
--- a/db/compaction/compaction_iterator.h
+++ b/db/compaction/compaction_iterator.h
@@ -99,6 +99,8 @@ class CompactionIterator {
     virtual Version* input_version() const = 0;
 
     virtual bool DoesInputReferenceBlobFiles() const = 0;
+
+    virtual const Compaction* real_compaction() const = 0;
   };
 
   class RealCompaction : public CompactionProxy {
@@ -152,6 +154,8 @@ class CompactionIterator {
       return compaction_->DoesInputReferenceBlobFiles();
     }
 
+    const Compaction* real_compaction() const override { return compaction_; }
+
    private:
     const Compaction* compaction_;
   };
@@ -267,13 +271,13 @@ class CompactionIterator {
                SnapshotCheckerResult::kInSnapshot;
   }
 
-  bool IsInEarliestSnapshot(SequenceNumber sequence);
+  bool IsInCurrentEarliestSnapshot(SequenceNumber sequence);
 
   bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
 
   bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
 
-  bool InEarliestSnapshot(SequenceNumber seq);
+  bool InCurrentEarliestSnapshot(SequenceNumber seq);
 
   // Extract user-defined timestamp from user key if possible and compare it
   // with *full_history_ts_low_ if applicable.
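The renames above (`IsInEarliestSnapshot` to `IsInCurrentEarliestSnapshot`, `InEarliestSnapshot` to `InCurrentEarliestSnapshot`) make explicit that these helpers answer the visibility question against the current snapshot state, which is exactly what becomes unreliable once the earliest snapshot can be released mid-compaction; the drop and sequence-zero-out decisions therefore moved to `DefinitelyInSnapshot`/`DefinitelyNotInSnapshot` against the captured `earliest_snapshot_`. The fragment below is a hedged paraphrase of how those two `Definitely*` helpers treat the snapshot checker's three-way answer (in snapshot, not in snapshot, snapshot released, as in RocksDB's `SnapshotCheckerResult`); it is not the exact implementation, which also handles the no-checker case and wraps the comparisons in LIKELY/UNLIKELY hints.

    // Paraphrase of the decision logic, not the exact RocksDB implementation.
    #include <cstdint>

    using SequenceNumber = uint64_t;

    enum class SnapshotCheckerResult { kInSnapshot, kNotInSnapshot, kSnapshotReleased };

    // "Definitely in": requires a positive confirmation from the checker.
    // kSnapshotReleased is not a confirmation, so a released snapshot pushes
    // the compaction toward keeping data rather than dropping it.
    bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot,
                              SnapshotCheckerResult checker_result) {
      return seq <= snapshot &&
             checker_result == SnapshotCheckerResult::kInSnapshot;
    }

    // "Definitely not in": requires being above the snapshot or a positive
    // refutation; kSnapshotReleased again falls back to the conservative side.
    bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot,
                                 SnapshotCheckerResult checker_result) {
      return seq > snapshot ||
             checker_result == SnapshotCheckerResult::kNotInSnapshot;
    }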
@@ -435,9 +439,10 @@ inline bool CompactionIterator::DefinitelyNotInSnapshot(
                     SnapshotCheckerResult::kNotInSnapshot)));
 }
 
-inline bool CompactionIterator::InEarliestSnapshot(SequenceNumber seq) {
+inline bool CompactionIterator::InCurrentEarliestSnapshot(SequenceNumber seq) {
   return ((seq) <= earliest_snapshot_ &&
-          (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq))));
+          (snapshot_checker_ == nullptr ||
+           LIKELY(IsInCurrentEarliestSnapshot(seq))));
 }
 
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc
index 168fb45b1..e16786619 100644
--- a/db/compaction/compaction_iterator_test.cc
+++ b/db/compaction/compaction_iterator_test.cc
@@ -184,6 +184,8 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
 
   bool DoesInputReferenceBlobFiles() const override { return false; }
 
+  const Compaction* real_compaction() const override { return nullptr; }
+
   bool key_not_exists_beyond_output_level = false;
   bool is_bottommost_level = false;
 
diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h
index 492137cb8..ecc8ea1f9 100644
--- a/utilities/transactions/transaction_test.h
+++ b/utilities/transactions/transaction_test.h
@@ -95,8 +95,12 @@ class TransactionTestBase : public ::testing::Test {
     // seems to be a bug in btrfs that the makes readdir return recently
     // unlink-ed files. By using the default fs we simply ignore errors resulted
     // from attempting to delete such files in DestroyDB.
-    options.env = Env::Default();
-    EXPECT_OK(DestroyDB(dbname, options));
+    if (getenv("KEEP_DB") == nullptr) {
+      options.env = Env::Default();
+      EXPECT_OK(DestroyDB(dbname, options));
+    } else {
+      fprintf(stdout, "db is still in %s\n", dbname.c_str());
+    }
     delete env;
   }
 
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index fd30c1491..f5d77611a 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -2758,6 +2758,7 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
   ASSERT_OK(ReOpen());
 
   ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+  SequenceNumber put_seq = db->GetLatestSequenceNumber();
   auto* transaction =
       db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
   ASSERT_OK(transaction->SetName("txn"));
@@ -2799,11 +2800,202 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
   // Since the delete tombstone is not visible to snapshot2, we need to keep
   // at least one version of the key, for write-conflict check.
   VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
-                      {"key1", "value1", 0, kTypeValue}});
+                      {"key1", "value1", put_seq, kTypeValue}});
   db->ReleaseSnapshot(snapshot2);
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
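The tests added below drive these code paths through the new sync points. The pointer forwarded to a callback registered on `CompactionIterator::NextFromInput:SingleDelete:1` or `...:BottommostDelete:1` is the real `Compaction*` only when the iterator is running an actual compaction, and only in non-NDEBUG builds, since `TEST_SYNC_POINT_CALLBACK` compiles away otherwise; it is null when there is no compaction proxy, as during flush, or when the proxy is the `FakeCompaction` stub patched above. Hence the `if (!arg) return;` guards in the tests. A minimal sketch of that consumption pattern, assuming a test fixture where `db` and a `snapshot` are in scope; the `bottommost_level()` filter is only an example of an extra check a test might add.

    // Sketch only; assumes `db` and `snapshot` exist in the enclosing test.
    SyncPoint::GetInstance()->SetCallBack(
        "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
          if (!arg) {
            // Null when there is no real compaction (flush, FakeCompaction).
            return;
          }
          auto* c = static_cast<const Compaction*>(arg);
          // Example of an extra filter a test could apply before reacting.
          if (!c->bottommost_level()) {
            return;
          }
          db->ReleaseSnapshot(snapshot);
          // One more write so max_evicted_seq advances past the released
          // snapshot.
          ASSERT_OK(db->Put(WriteOptions(), "extra", "value"));
        });
    SyncPoint::GetInstance()->EnableProcessing();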
+TEST_P(WritePreparedTransactionTest,
+       ReleaseEarliestSnapshotDuringCompaction_WithSD) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                   /*old_txn=*/nullptr);
+  ASSERT_OK(txn->SingleDelete("key"));
+  ASSERT_OK(txn->Put("wow", "value"));
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Prepare());
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  const bool two_write_queues = std::get<1>(GetParam());
+  if (two_write_queues) {
+    // In the case of two queues, commit another txn just to bump
+    // last_published_seq so that a subsequent GetSnapshot() call can return
+    // a snapshot with higher sequence.
+    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                           /*old_txn=*/nullptr);
+    ASSERT_OK(dummy_txn->Put("haha", "value"));
+    ASSERT_OK(dummy_txn->Commit());
+    delete dummy_txn;
+  }
+  auto* snapshot = db->GetSnapshot();
+
+  ASSERT_OK(txn->Commit());
+  delete txn;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
+        if (!arg) {
+          return;
+        }
+        db->ReleaseSnapshot(snapshot);
+
+        // Advance max_evicted_seq
+        ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                             /*end=*/nullptr));
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+       ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                   /*old_txn=*/nullptr);
+  ASSERT_OK(txn->Put("bar", "value"));
+  ASSERT_OK(txn->SingleDelete("key"));
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Prepare());
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  ASSERT_OK(txn->Commit());
+  delete txn;
+
+  ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));
+
+  // Create a dummy transaction to take a snapshot for ww-conflict detection.
+  TransactionOptions txn_opts;
+  txn_opts.set_snapshot = true;
+  auto* dummy_txn =
+      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
+        ASSERT_OK(dummy_txn->Rollback());
+        delete dummy_txn;
+
+        ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
+  auto* snapshot = db->GetSnapshot();
+
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  db->ReleaseSnapshot(snapshot);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+       ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
+  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                   /*old_txn=*/nullptr);
+  ASSERT_OK(txn->Delete("b"));
+  ASSERT_OK(txn->SetName("txn"));
+  ASSERT_OK(txn->Prepare());
+
+  const bool two_write_queues = std::get<1>(GetParam());
+  if (two_write_queues) {
+    // In the case of two queues, commit another txn just to bump
+    // last_published_seq so that a subsequent GetSnapshot() call can return
+    // a snapshot with higher sequence.
+    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
+                                           /*old_txn=*/nullptr);
+    ASSERT_OK(dummy_txn->Put("haha", "value"));
+    ASSERT_OK(dummy_txn->Commit());
+    delete dummy_txn;
+  }
+  auto* snapshot1 = db->GetSnapshot();
+  ASSERT_OK(txn->Commit());
+  delete txn;
+  auto* snapshot2 = db->GetSnapshot();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
+        if (!arg) {
+          return;
+        }
+        db->ReleaseSnapshot(snapshot1);
+
+        // Advance max_evicted_seq
+        ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                             /*end=*/nullptr));
+  db->ReleaseSnapshot(snapshot2);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(WritePreparedTransactionTest,
+       ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
+  constexpr size_t kSnapshotCacheBits = 7;  // same as default
+  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
+  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(ReOpen());
+
+  // Create a dummy transaction to take a snapshot for ww-conflict detection.
+  TransactionOptions txn_opts;
+  txn_opts.set_snapshot = true;
+  auto* dummy_txn =
+      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
+  // Increment seq
+  ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
+
+  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+  ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
+  auto* snapshot1 = db->GetSnapshot();
+  // Increment seq
+  ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
+  auto* snapshot2 = db->GetSnapshot();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
+        db->ReleaseSnapshot(snapshot1);
+
+        ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db->Flush(FlushOptions()));
+  db->ReleaseSnapshot(snapshot2);
+  ASSERT_OK(dummy_txn->Commit());
+  delete dummy_txn;
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 // A more complex test to verify compaction/flush should keep keys visible
 // to snapshots.
 TEST_P(WritePreparedTransactionTest,