diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 4d1401b3a..9265c3d4a 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -13,15 +13,13 @@ namespace rocksdb { bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) { - auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers(); - // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is // in unprep_seqs, we have to check if seq is equal to prep_seq or any of // the prepare_batch_cnt seq nums after it. // // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is // large. - for (const auto& it : unprep_seqs) { + for (const auto& it : unprep_seqs_) { if (it.first <= seq && seq < it.first + it.second) { return true; } @@ -30,15 +28,6 @@ bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) { return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_); } -SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber( - WriteUnpreparedTxn* txn) { - const auto& unprep_seqs = txn->GetUnpreparedSequenceNumbers(); - if (unprep_seqs.size()) { - return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1; - } - return 0; -} - WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) @@ -537,7 +526,7 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options, const bool backed_by_snapshot = wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, - this); + unprep_seqs_); auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value, &callback); if (LIKELY(wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index b64fd81e6..d81c30217 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -53,17 +53,17 @@ class WriteUnpreparedTxn; // class WriteUnpreparedTxnReadCallback : public ReadCallback { public: - WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db, - SequenceNumber snapshot, - SequenceNumber min_uncommitted, - WriteUnpreparedTxn* txn) + WriteUnpreparedTxnReadCallback( + WritePreparedTxnDB* db, SequenceNumber snapshot, + SequenceNumber min_uncommitted, + const std::map& unprep_seqs) // Pass our last uncommitted seq as the snapshot to the parent class to // ensure that the parent will not prematurely filter out own writes. We // will do the exact comparison against snapshots in IsVisibleFullCheck // override. - : ReadCallback(CalcMaxVisibleSeq(txn, snapshot), min_uncommitted), + : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted), db_(db), - txn_(txn), + unprep_seqs_(unprep_seqs), wup_snapshot_(snapshot) {} virtual bool IsVisibleFullCheck(SequenceNumber seq) override; @@ -74,15 +74,18 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback { } private: - static SequenceNumber CalcMaxVisibleSeq(WriteUnpreparedTxn* txn, - SequenceNumber snapshot_seq) { - SequenceNumber max_unprepared = CalcMaxUnpreparedSequenceNumber(txn); + static SequenceNumber CalcMaxVisibleSeq( + const std::map& unprep_seqs, + SequenceNumber snapshot_seq) { + SequenceNumber max_unprepared = 0; + if (unprep_seqs.size()) { + max_unprepared = + unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1; + } return std::max(max_unprepared, snapshot_seq); } - static SequenceNumber CalcMaxUnpreparedSequenceNumber( - WriteUnpreparedTxn* txn); WritePreparedTxnDB* db_; - WriteUnpreparedTxn* txn_; + const std::map& unprep_seqs_; SequenceNumber wup_snapshot_; }; @@ -124,8 +127,6 @@ class WriteUnpreparedTxn : public WritePreparedTxn { virtual Status RebuildFromWriteBatch(WriteBatch*) override; - const std::map& GetUnpreparedSequenceNumbers(); - protected: void Initialize(const TransactionOptions& txn_options) override; @@ -156,6 +157,8 @@ class WriteUnpreparedTxn : public WritePreparedTxn { friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test; friend class WriteUnpreparedTxnDB; + const std::map& GetUnpreparedSequenceNumbers(); + Status MaybeFlushWriteBatchToDB(); Status FlushWriteBatchToDB(bool prepared); Status HandleWrite(std::function do_write); diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index c3fcd1f45..875d54167 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -348,7 +348,8 @@ struct WriteUnpreparedTxnDB::IteratorState { IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, std::shared_ptr s, SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) - : callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {} + : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_), + snapshot(s) {} SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); } WriteUnpreparedTxnReadCallback callback; @@ -384,27 +385,22 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, // foo: v5 5 // // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3, - // which is the last visible key. + // which is the last visible value. // // For unprepared transactions, if we have snap_seq = 3, but the current - // transaction has unprep_seq 5, then returning the first non-visible key + // transaction has unprep_seq 5, then returning the first non-visible value // would be incorrect, as we should return v5, and not v3. The problem is that - // there are committed keys at snapshot_seq < commit_seq < unprep_seq. + // there are committed values at snapshot_seq < commit_seq < unprep_seq. // // Snapshot validation can prevent this problem by ensuring that no committed - // keys exist at snapshot_seq < commit_seq, and thus any value with a sequence - // number greater than snapshot_seq must be unprepared keys. For example, if - // the transaction had a snapshot at 3, then snapshot validation would be - // performed during the Put(v5) call. It would find v4, and the Put would fail - // with snapshot validation failure. - // - // Because of this, if any writes have occurred, then the transaction snapshot - // must be used for the iterator. If no writes have occurred though, we can - // simply create a snapshot. Later writes would not be visible though, but we - // don't support iterating while writing anyway. + // values exist at snapshot_seq < commit_seq, and thus any value with a + // sequence number greater than snapshot_seq must be unprepared values. For + // example, if the transaction had a snapshot at 3, then snapshot validation + // would be performed during the Put(v5) call. It would find v4, and the Put + // would fail with snapshot validation failure. // // TODO(lth): Improve Prev() logic to continue iterating until - // max_visible_seq, and then return the last visible key, so that this + // max_visible_seq, and then return the last visible value, so that this // restriction can be lifted. const Snapshot* snapshot = nullptr; if (options.snapshot == nullptr) { @@ -418,9 +414,9 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, assert(snapshot_seq != kMaxSequenceNumber); // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are // guaranteed that for keys that were modified by this transaction (and thus - // might have unprepared versions), no committed versions exist at + // might have unprepared values), no committed values exist at // largest_validated_seq < commit_seq (or the contrapositive: any committed - // version must exist at commit_seq <= largest_validated_seq). This implies + // value must exist at commit_seq <= largest_validated_seq). This implies // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <= // snapshot_seq. As explained above, the problem with Prev() only happens when // snapshot_seq < commit_seq.