diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 16a6d86a6..29b7f6f14 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1857,6 +1857,24 @@ void DBImpl::MultiGetImpl( snapshot = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); + if (callback) { + // The unprep_seqs are not published for write unprepared, so it could be + // that max_visible_seq is larger. Seek to the std::max of the two. + // However, we still want our callback to contain the actual snapshot so + // that it can do the correct visibility filtering. + callback->Refresh(snapshot); + + // Internally, WriteUnpreparedTxnReadCallback::Refresh would set + // max_visible_seq = max(max_visible_seq, snapshot) + // + // Currently, the commented out assert is broken by + // InvalidSnapshotReadCallback, but if write unprepared recovery followed + // the regular transaction flow, then this special read callback would not + // be needed. + // + // assert(callback->max_visible_seq() >= snapshot); + snapshot = callback->max_visible_seq(); + } } // For each of the given keys, apply the entire "get" process as follows: diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index f4c21d476..97bebac5d 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -40,6 +40,25 @@ void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) { prepare_batch_cnt_ = 0; } +void WritePreparedTxn::MultiGet(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + bool sorted_input) { + SequenceNumber min_uncommitted, snap_seq; + const bool backed_by_snapshot = + wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); + WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted); + write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys, + keys, values, statuses, sorted_input, + &callback); + if (UNLIKELY(!wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + for (size_t i = 0; i < num_keys; i++) { + statuses[i] = Status::TryAgain(); + } + } +} + Status WritePreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val) { diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 2cd729cd2..c574f6231 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -53,6 +53,13 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using Transaction::MultiGet; + virtual void MultiGet(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + bool sorted_input = false) override; + // Note: The behavior is undefined in presence of interleaved writes to the // same transaction. // To make WAL commit markers visible, the snapshot will be diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index c677013aa..d8c5eea55 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -524,6 +524,26 @@ void WriteUnpreparedTxn::Clear() { TransactionBaseImpl::Clear(); } +void WriteUnpreparedTxn::MultiGet(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + bool sorted_input) { + SequenceNumber min_uncommitted, snap_seq; + const bool backed_by_snapshot = + wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); + WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, + unprep_seqs_); + write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys, + keys, values, statuses, sorted_input, + &callback); + if (UNLIKELY(!wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + for (size_t i = 0; i < num_keys; i++) { + statuses[i] = Status::TryAgain(); + } + } +} + Status WriteUnpreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index bc952544a..2c2315594 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -146,6 +146,13 @@ class WriteUnpreparedTxn : public WritePreparedTxn { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + using Transaction::MultiGet; + virtual void MultiGet(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const size_t num_keys, const Slice* keys, + PinnableSlice* values, Status* statuses, + bool sorted_input = false) override; + using Transaction::GetIterator; virtual Iterator* GetIterator(const ReadOptions& options) override; virtual Iterator* GetIterator(const ReadOptions& options,