diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 70b639578..609bd0dd7 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -731,6 +731,52 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) { MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); } +// Reproduce the bug with two snapshots with the same seuqence number and test +// that the release of the first snapshot will not affect the reads by the other +// snapshot +TEST_P(WritePreparedTransactionTest, DoubleSnapshot) { + TransactionOptions txn_options; + Status s; + + // Insert initial value + ASSERT_OK(db->Put(WriteOptions(), "key", "value1")); + + WritePreparedTxnDB* wp_db = dynamic_cast(db); + Transaction* txn = + wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr); + ASSERT_OK(txn->SetName("txn")); + ASSERT_OK(txn->Put("key", "value2")); + ASSERT_OK(txn->Prepare()); + // Two snapshots with the same seq number + const Snapshot* snapshot1 = wp_db->GetSnapshot(); + const Snapshot* snapshot2 = wp_db->GetSnapshot(); + ASSERT_OK(txn->Commit()); + SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE; + SequenceNumber overlap_seq = txn->GetId() + cache_size; + delete txn; + + // Cause an eviction to advance max evicted seq number + wp_db->AddCommitted(overlap_seq, overlap_seq); + + ReadOptions ropt; + // It should see the value before commit + ropt.snapshot = snapshot2; + PinnableSlice pinnable_val; + s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == "value1"); + pinnable_val.Reset(); + + wp_db->ReleaseSnapshot(snapshot1); + + // It should still see the value before commit + s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == "value1"); + pinnable_val.Reset(); + wp_db->ReleaseSnapshot(snapshot2); +} + // Test that the entries in old_commit_map_ get garbage collected properly TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { const size_t snapshot_cache_bits = 0; diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index fed2cb35b..f61700e00 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -554,12 +554,6 @@ const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( return db_impl_->snapshots().GetAll(nullptr, max); } -void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) { - auto snap_seq = snapshot->GetSequenceNumber(); - ReleaseSnapshotInternal(snap_seq); - db_impl_->ReleaseSnapshot(snapshot); -} - void WritePreparedTxnDB::ReleaseSnapshotInternal( const SequenceNumber snap_seq) { // relax is enough since max increases monotonically, i.e., if snap_seq < @@ -590,6 +584,28 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal( } } +void WritePreparedTxnDB::CleanupReleasedSnapshots( + const std::vector& new_snapshots, + const std::vector& old_snapshots) { + auto newi = new_snapshots.begin(); + auto oldi = old_snapshots.begin(); + for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) { + assert(*newi >= *oldi); // cannot have new snapshots with lower seq + if (*newi == *oldi) { // still not released + newi++; + oldi++; + } else { + assert(*newi > *oldi); // *oldi is released + ReleaseSnapshotInternal(*oldi); + oldi++; + } + } + // Everything remained in old_snapshots is released and must be cleaned up + for (; oldi != old_snapshots.end(); oldi++) { + ReleaseSnapshotInternal(*oldi); + } +} + void WritePreparedTxnDB::UpdateSnapshots( const std::vector& snapshots, const SequenceNumber& version) { @@ -638,6 +654,12 @@ void WritePreparedTxnDB::UpdateSnapshots( // Update the size at the end. Otherwise a parallel reader might read // items that are not set yet. snapshots_total_.store(snapshots.size(), std::memory_order_release); + + // Note: this must be done after the snapshots data structures are updated + // with the new list of snapshots. + CleanupReleasedSnapshots(snapshots, snapshots_all_); + snapshots_all_ = snapshots; + TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end"); TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end"); } diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 83562c94d..e0263d4f7 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -112,8 +112,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { const std::vector& column_families, std::vector* iterators) override; - virtual void ReleaseSnapshot(const Snapshot* snapshot) override; - // Check whether the transaction that wrote the value with sequence number seq // is visible to the snapshot with sequence number snapshot_seq. // Returns true if commit_seq <= snapshot_seq @@ -379,6 +377,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; + friend class WritePreparedTransactionTest_DoubleSnapshot_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_RollbackTest_Test; @@ -518,6 +517,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // version value. void UpdateSnapshots(const std::vector& snapshots, const SequenceNumber& version); + // Check the new list of new snapshots against the old one to see if any of + // the snapshots are released and to do the cleanup for the released snapshot. + void CleanupReleasedSnapshots( + const std::vector& new_snapshots, + const std::vector& old_snapshots); // Check an evicted entry against live snapshots to see if it should be kept // around or it can be safely discarded (and hence assume committed for all @@ -552,6 +556,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // 2nd list for storing snapshots. The list sorted in ascending order. // Thread-safety is provided with snapshots_mutex_. std::vector snapshots_; + // The list of all snapshots: snapshots_ + snapshot_cache_. This list although + // redundant but simplifies CleanupOldSnapshots implementation. + // Thread-safety is provided with snapshots_mutex_. + std::vector snapshots_all_; // The version of the latest list of snapshots. This can be used to avoid // rewriting a list that is concurrently updated with a more recent version. SequenceNumber snapshots_version_ = 0;