diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index 9053acb1e..f73333c61 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -2557,6 +2557,81 @@ TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
   delete iter;
 }
 
+// When an old prepared entry gets committed, there is a gap between the time
+// that it is published and when it is cleaned up from delayed_prepared_. This
+// test stresses such cases.
+TEST_P(WritePreparedTransactionTest, CommitOfOldPrepared) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  for (const size_t commit_cache_bits : {0, 2, 3}) {
+    for (const size_t sub_batch_cnt : {1, 2, 3}) {
+      DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
+      std::atomic<const Snapshot*> snap = {nullptr};
+      std::atomic<SequenceNumber> exp_prepare = {0};
+      // Value is synchronized via snap
+      PinnableSlice value;
+      // Take a snapshot after publish and before RemovePrepared:Start
+      auto callback = [&](void* param) {
+        SequenceNumber prep_seq = *((SequenceNumber*)param);
+        if (prep_seq == exp_prepare.load()) {  // only for write_thread
+          ASSERT_EQ(nullptr, snap.load());
+          snap.store(db->GetSnapshot());
+          ReadOptions roptions;
+          roptions.snapshot = snap.load();
+          auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
+          ASSERT_OK(s);
+        }
+      };
+      SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
+      SyncPoint::GetInstance()->EnableProcessing();
+      // Thread to cause frequent evictions
+      rocksdb::port::Thread eviction_thread([&]() {
+        // Too many txns might cause commit_seq - prepare_seq in another thread
+        // to go beyond DELTA_UPPERBOUND
+        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
+          db->Put(WriteOptions(), Slice("key1"), Slice("value1"));
+        }
+      });
+      rocksdb::port::Thread write_thread([&]() {
+        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
+          Transaction* txn =
+              db->BeginTransaction(WriteOptions(), TransactionOptions());
+          ASSERT_OK(txn->SetName("xid"));
+          std::string val_str = "value" + ToString(i);
+          for (size_t b = 0; b < sub_batch_cnt; b++) {
+            ASSERT_OK(txn->Put(Slice("key"), val_str));
+          }
+          ASSERT_OK(txn->Prepare());
+          // Let an eviction kick in
+          std::this_thread::yield();
+
+          exp_prepare.store(txn->GetId());
+          ASSERT_OK(txn->Commit());
+          delete txn;
+
+          // Read with the snapshot taken before delayed_prepared_ cleanup
+          ReadOptions roptions;
+          roptions.snapshot = snap.load();
+          ASSERT_NE(nullptr, roptions.snapshot);
+          PinnableSlice value2;
+          auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value2);
+          ASSERT_OK(s);
+          // It should see its own write
+          ASSERT_TRUE(val_str == value2);
+          // The value read by the snapshot should not change
+          ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
+
+          db->ReleaseSnapshot(roptions.snapshot);
+          snap.store(nullptr);
+        }
+      });
+      write_thread.join();
+      eviction_thread.join();
+    }
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+}
+
 // Test that updating the commit map will not affect the existing snapshots
 TEST_P(WritePreparedTransactionTest, AtomicCommit) {
   for (bool skip_prepare : {true, false}) {
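The test above hinges on the "RemovePrepared:Start" sync point firing in the window between commit publication and prepared-list cleanup. As a rough illustration of that handoff pattern, here is a minimal standalone sketch; it is not RocksDB's actual SyncPoint machinery, and `published`, `exp_prepare`, and `RemovePreparedStartHook` are hypothetical stand-ins:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

using SequenceNumber = uint64_t;

std::atomic<SequenceNumber> published{0};    // stands in for the published commit seq
std::atomic<SequenceNumber> exp_prepare{0};  // the prepare seq the test watches
std::atomic<SequenceNumber> snap{0};         // stands in for db->GetSnapshot()

// Models the "RemovePrepared:Start" callback: it runs after the commit is
// published but before the prepared entry is cleaned up.
void RemovePreparedStartHook(SequenceNumber prep_seq) {
  if (prep_seq == exp_prepare.load()) {  // only react to the watched txn
    snap.store(published.load());        // the snapshot must see the commit
  }
}

void Commit(SequenceNumber prep_seq, SequenceNumber commit_seq) {
  published.store(commit_seq);        // 1) publish the commit
  RemovePreparedStartHook(prep_seq);  // 2) sync point fires inside the gap
  // 3) cleanup of the prepared entry would happen here
}

int main() {
  exp_prepare.store(42);
  std::thread t([] { Commit(42, 43); });
  t.join();
  assert(snap.load() == 43);  // a snapshot taken in the gap saw the commit
  return 0;
}
```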
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index 893a2e011..c0cf42550 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -394,15 +394,21 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
 }
 
 void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
-  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
-  assert(seq > max_evicted_seq_);
-  if (seq <= max_evicted_seq_) {
-    throw std::runtime_error(
-        "Added prepare_seq is larger than max_evicted_seq_: " + ToString(seq) +
-        " <= " + ToString(max_evicted_seq_.load()));
-  }
+  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
+                    seq, max_evicted_seq_.load());
   WriteLock wl(&prepared_mutex_);
-  prepared_txns_.push(seq);
+  if (UNLIKELY(seq <= max_evicted_seq_)) {
+    // This should not happen in normal cases
+    ROCKS_LOG_ERROR(
+        info_log_,
+        "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
+        " <= %" PRIu64,
+        seq, max_evicted_seq_.load());
+    delayed_prepared_.insert(seq);
+    delayed_prepared_empty_.store(false, std::memory_order_release);
+  } else {
+    prepared_txns_.push(seq);
+  }
 }
 
 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
@@ -443,6 +449,21 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
     // After each eviction from commit cache, check if the commit entry should
     // be kept around because it overlaps with a live snapshot.
     CheckAgainstSnapshots(evicted);
+    if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
+      WriteLock wl(&prepared_mutex_);
+      for (auto dp : delayed_prepared_) {
+        if (dp == evicted.prep_seq) {
+          // This is a rare case in which the txn is committed but
+          // prepared_txns_ is not cleaned up yet. Refer to the
+          // delayed_prepared_commits_ definition for why it must stay updated.
+          delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
+          ROCKS_LOG_DEBUG(info_log_,
+                          "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
+                          evicted.prep_seq, evicted.commit_seq);
+          break;
+        }
+      }
+    }
   }
   bool succ =
       ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
@@ -465,12 +486,24 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
 
 void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                         const size_t batch_cnt) {
+  TEST_SYNC_POINT_CALLBACK(
+      "RemovePrepared:Start",
+      const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
+  ROCKS_LOG_DETAILS(info_log_,
+                    "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
+                    prepare_seq, batch_cnt);
   WriteLock wl(&prepared_mutex_);
   for (size_t i = 0; i < batch_cnt; i++) {
     prepared_txns_.erase(prepare_seq + i);
     bool was_empty = delayed_prepared_.empty();
     if (!was_empty) {
       delayed_prepared_.erase(prepare_seq + i);
+      auto it = delayed_prepared_commits_.find(prepare_seq + i);
+      if (it != delayed_prepared_commits_.end()) {
+        ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
+                          prepare_seq + i);
+        delayed_prepared_commits_.erase(it);
+      }
       bool is_empty = delayed_prepared_.empty();
       if (was_empty != is_empty) {
         delayed_prepared_empty_.store(is_empty, std::memory_order_release);
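Taken together, the AddCommitted and RemovePrepared changes implement a small bookkeeping protocol: when the commit entry of a still-delayed prepared txn is evicted, its commit seq is parked in delayed_prepared_commits_ until RemovePrepared erases both. A condensed standalone model of that protocol, under stated assumptions: `DelayedPreparedBook` is a hypothetical name, and a plain std::mutex stands in for prepared_mutex_:

```cpp
#include <cstdint>
#include <mutex>
#include <set>
#include <unordered_map>

using SequenceNumber = uint64_t;

struct DelayedPreparedBook {
  std::mutex mu;  // stands in for prepared_mutex_
  std::set<SequenceNumber> delayed_prepared;
  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits;

  // Mirrors the AddCommitted change: if the evicted entry belongs to a
  // delayed prepared txn, park its commit seq before it leaves the cache.
  void OnEviction(SequenceNumber prep_seq, SequenceNumber commit_seq) {
    std::lock_guard<std::mutex> l(mu);
    if (delayed_prepared.count(prep_seq) != 0) {
      delayed_prepared_commits[prep_seq] = commit_seq;
    }
  }

  // Mirrors RemovePrepared: drop the delayed entry and its parked commit.
  void Cleanup(SequenceNumber prep_seq) {
    std::lock_guard<std::mutex> l(mu);
    delayed_prepared.erase(prep_seq);
    delayed_prepared_commits.erase(prep_seq);
  }

  // Mirrors the IsInSnapshot fallback: a delayed entry is visible iff its
  // parked commit seq exists and is at or below the snapshot.
  bool IsVisible(SequenceNumber prep_seq, SequenceNumber snapshot_seq) {
    std::lock_guard<std::mutex> l(mu);
    if (delayed_prepared.count(prep_seq) != 0) {
      auto it = delayed_prepared_commits.find(prep_seq);
      return it != delayed_prepared_commits.end() &&
             it->second <= snapshot_seq;
    }
    return true;  // not delayed: the commit cache path would decide
  }
};

int main() {
  DelayedPreparedBook book;
  {
    std::lock_guard<std::mutex> l(book.mu);
    book.delayed_prepared.insert(7);  // txn 7 is an old prepared entry
  }
  book.OnEviction(7, 9);              // its commit entry (seq 9) is evicted
  bool ok = book.IsVisible(7, 9) && !book.IsVisible(7, 8);
  book.Cleanup(7);                    // RemovePrepared-style cleanup
  return ok ? 0 : 1;
}
```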
@@ -511,8 +544,19 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
 void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                               const SequenceNumber& new_max) {
   ROCKS_LOG_DETAILS(info_log_,
-                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
-                    prev_max, new_max);
+                    "AdvanceMaxEvictedSeq overhead %" PRIu64
+                    " => %" PRIu64,
+                    prev_max, new_max);
+  // Declare the intention before getting snapshot from the DB. This helps a
+  // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
+  // it has not already. Otherwise a new snapshot could be taken below the
+  // future max while we are still asking the DB for snapshots smaller than it.
+  auto updated_future_max = prev_max;
+  while (updated_future_max < new_max &&
+         !future_max_evicted_seq_.compare_exchange_weak(
+             updated_future_max, new_max, std::memory_order_acq_rel,
+             std::memory_order_relaxed)) {
+  }
   // When max_evicted_seq_ advances, move older entries from prepared_txns_
   // to delayed_prepared_. This guarantees that if a seq is lower than max,
   // then it is not in prepared_txns_ and saves an expensive, synchronized
@@ -520,6 +564,11 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
   // normal cases.
   {
     WriteLock wl(&prepared_mutex_);
+    ROCKS_LOG_DETAILS(
+        info_log_,
+        "AdvanceMaxEvictedSeq prepared_txns_.empty() %d top: %" PRIu64,
+        prepared_txns_.empty(),
+        prepared_txns_.empty() ? 0 : prepared_txns_.top());
     while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
       auto to_be_popped = prepared_txns_.top();
       delayed_prepared_.insert(to_be_popped);
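The loop added at the top of AdvanceMaxEvictedSeq is the usual "advance an atomic to a monotonic maximum" idiom: compare_exchange_weak reloads the expected value on failure, so the loop ends as soon as the CAS wins or another thread has already pushed the value past new_max. A minimal standalone version, with hypothetical `future_max`/`AdvanceTo` names:

```cpp
#include <atomic>
#include <cstdint>

std::atomic<uint64_t> future_max{0};  // stands in for future_max_evicted_seq_

void AdvanceTo(uint64_t new_max) {
  uint64_t cur = future_max.load(std::memory_order_relaxed);
  // compare_exchange_weak reloads `cur` on failure, so there is no explicit
  // re-read in the loop. Spurious failures just cost one harmless iteration.
  while (cur < new_max &&
         !future_max.compare_exchange_weak(cur, new_max,
                                           std::memory_order_acq_rel,
                                           std::memory_order_relaxed)) {
  }
}

int main() {
  AdvanceTo(10);
  AdvanceTo(7);  // no-op: the maximum never moves backwards
  return future_max.load() == 10 ? 0 : 1;
}
```

The diff seeds the expected value with prev_max instead of an initial load, which saves one atomic read when the caller already knows the previous maximum.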
@@ -587,15 +636,21 @@ SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
   SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
   assert(snap_impl);
   SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
-  if (UNLIKELY(snap_seq != 0 && snap_seq <= max_evicted_seq_)) {
+  // Note: Check against future_max_evicted_seq_ (in contrast with
+  // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
+  if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
     // There is a very rare case in which the commit entry evicts another commit
     // entry that is not published yet thus advancing max evicted seq beyond the
     // last published seq. This case is not likely in real-world setup so we
     // handle it with a few retries.
     size_t retry = 0;
-    while (snap_impl->GetSequenceNumber() <= max_evicted_seq_ && retry < 100) {
-      ROCKS_LOG_WARN(info_log_, "GetSnapshot retry %" PRIu64,
-                     snap_impl->GetSequenceNumber());
+    SequenceNumber max;
+    while ((max = future_max_evicted_seq_.load()) != 0 &&
+           snap_impl->GetSequenceNumber() <= max && retry < 100) {
+      ROCKS_LOG_WARN(info_log_,
+                     "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
+                     " retry %" ROCKSDB_PRIszt,
+                     snap_impl->GetSequenceNumber(), max, retry);
       ReleaseSnapshot(snap_impl);
       // Wait for last visible seq to catch up with max, and also go beyond it
       // by one.
@@ -604,20 +659,19 @@ SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
       assert(snap_impl);
       retry++;
     }
-    assert(snap_impl->GetSequenceNumber() > max_evicted_seq_);
-    if (snap_impl->GetSequenceNumber() <= max_evicted_seq_) {
-      throw std::runtime_error("Snapshot seq " +
-                               ToString(snap_impl->GetSequenceNumber()) +
-                               " after " + ToString(retry) +
-                               " retries is still less than max_evicted_seq_" +
-                               ToString(max_evicted_seq_.load()));
+    assert(snap_impl->GetSequenceNumber() > max);
+    if (snap_impl->GetSequenceNumber() <= max) {
+      throw std::runtime_error(
+          "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
+          " after " + ToString(retry) +
+          " retries is still less than future_max_evicted_seq_" + ToString(max));
     }
   }
   EnhanceSnapshot(snap_impl, min_uncommitted);
   ROCKS_LOG_DETAILS(
       db_impl_->immutable_db_options().info_log,
       "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
-      for_ww_conflict_check, snap_impl->GetSequenceNumber(), min_uncommitted);
+      snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
   return snap_impl;
 }
 
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index c6eca8c47..4c0fd8ccd 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -153,21 +153,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                         prep_seq, snapshot_seq, 1, min_uncommitted);
       return true;
     }
-    if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
-      // We should not normally reach here
-      WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
-      ReadLock rl(&prepared_mutex_);
-      ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
-                     static_cast<uint64_t>(delayed_prepared_.size()));
-      if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
-        // Then it is not committed yet
-        ROCKS_LOG_DETAILS(info_log_,
-                          "IsInSnapshot %" PRIu64 " in %" PRIu64
-                          " returns %" PRId32,
-                          prep_seq, snapshot_seq, 0);
-        return false;
-      }
-    }
     auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
     CommitEntry64b dont_care;
     CommitEntry cached;
@@ -193,6 +178,34 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                         prep_seq, snapshot_seq, 0);
       return false;
     }
+    if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
+      // We should not normally reach here
+      WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
+      ReadLock rl(&prepared_mutex_);
+      ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
+                     static_cast<uint64_t>(delayed_prepared_.size()));
+      if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
+        // This is the order: 1) delayed_prepared_commits_ update, 2) publish,
+        // 3) delayed_prepared_ clean up. So check if it is the case of a late
+        // cleanup.
+        auto it = delayed_prepared_commits_.find(prep_seq);
+        if (it == delayed_prepared_commits_.end()) {
+          // Then it is not committed yet
+          ROCKS_LOG_DETAILS(info_log_,
+                            "IsInSnapshot %" PRIu64 " in %" PRIu64
+                            " returns %" PRId32,
+                            prep_seq, snapshot_seq, 0);
+          return false;
+        } else {
+          ROCKS_LOG_DETAILS(info_log_,
+                            "IsInSnapshot %" PRIu64 " in %" PRIu64
+                            " commit: %" PRIu64 " returns %" PRId32,
+                            prep_seq, snapshot_seq, it->second,
+                            it->second <= snapshot_seq);
+          return it->second <= snapshot_seq;
+        }
+      }
+    }
     // When advancing max_evicted_seq_, we move older entries from prepared to
     // delayed_prepared_. Also we move evicted entries from commit cache to
     // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
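GetSnapshotInternal now spins against future_max_evicted_seq_ rather than max_evicted_seq_, reloading the bound on every iteration since a concurrent AdvanceMaxEvictedSeq may keep moving it. A stripped-down sketch of that bounded-retry shape, with hypothetical `TakeSnapshot`/`GetSnapshotAboveMax` stand-ins (the real code re-acquires a snapshot from the DB and throws only after 100 failed retries):

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <thread>

std::atomic<uint64_t> future_max{0};  // moved concurrently by other threads
std::atomic<uint64_t> last_seq{0};    // last visible sequence number

uint64_t TakeSnapshot() { return last_seq.load(); }  // stand-in for GetSnapshotImpl

uint64_t GetSnapshotAboveMax() {
  uint64_t snap = TakeSnapshot();
  size_t retry = 0;
  uint64_t max;
  // Reload the bound on every attempt: a concurrent advance may move it.
  while ((max = future_max.load()) != 0 && snap <= max && retry < 100) {
    std::this_thread::yield();  // wait for the visible seq to catch up
    snap = TakeSnapshot();
    retry++;
  }
  if (max != 0 && snap <= max) {
    throw std::runtime_error("snapshot " + std::to_string(snap) +
                             " still below max after " +
                             std::to_string(retry) + " retries");
  }
  return snap;
}

int main() {
  future_max.store(5);
  last_seq.store(6);  // visible seq already past the bound: no retries needed
  return GetSnapshotAboveMax() == 6 ? 0 : 1;
}
```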
Since prep_seq <= @@ -267,7 +280,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { void AddPrepared(uint64_t seq); // Remove the transaction with prepare sequence seq from the prepared list void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1); - // Add the transaction with prepare sequence prepare_seq and commit sequence + // Add the transaction with prepare sequence prepare_seq and comtit sequence // commit_seq to the commit map. loop_cnt is to detect infinite loops. void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, uint8_t loop_cnt = 0); @@ -383,6 +396,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { const ColumnFamilyOptions& cf_options) override; private: + friend class WritePreparedCommitEntryPreReleaseCallback; friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; friend class WritePreparedTransactionTest_CommitMapTest_Test; @@ -614,6 +628,12 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // commit_cache_. So commit_cache_ must first be checked before consulting // with max_evicted_seq_. std::atomic max_evicted_seq_ = {}; + // Order: 1) update future_max_evicted_seq_ = new_max, 2) + // GetSnapshotListFromDB(new_max), max_evicted_seq_ = new_max. Since + // GetSnapshotInternal guarantess that the snapshot seq is larger than + // future_max_evicted_seq_, this guarantes that if a snapshot is not larger + // than max has already being looked at via a GetSnapshotListFromDB(new_max). + std::atomic future_max_evicted_seq_ = {}; // Advance max_evicted_seq_ by this value each time it needs an update. The // larger the value, the less frequent advances we would have. We do not want // it to be too large either as it would cause stalls by doing too much @@ -631,6 +651,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // time max_evicted_seq_ advances their sequence number. This is expected to // be empty normally. Thread-safety is provided with prepared_mutex_. std::set delayed_prepared_; + // Commit of a delayed prepared: 1) update commit cache, 2) update + // delayed_prepared_commits_, 3) publish seq, 3) clean up delayed_prepared_. + // delayed_prepared_commits_ will help us tell apart the unprepared txns from + // the ones that are committed but not cleaned up yet. + std::unordered_map delayed_prepared_commits_; // Update when delayed_prepared_.empty() changes. Expected to be true // normally. std::atomic delayed_prepared_empty_ = {true};