diff --git a/db/db_impl.cc b/db/db_impl.cc index 84c9dca75..adb0652ad 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1037,12 +1037,19 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SequenceNumber snapshot; if (read_options.snapshot != nullptr) { - // Note: In WritePrepared txns this is not necessary but not harmful either. - // Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is - // specified we should be fine with skipping seq numbers that are greater - // than that. + // Note: In WritePrepared txns this is not necessary but not harmful + // either. Because prep_seq > snapshot => commit_seq > snapshot so if + // a snapshot is specified we should be fine with skipping seq numbers + // that are greater than that. + // + // In WriteUnprepared, we cannot set snapshot in the lookup key because we + // may skip uncommitted data that should be visible to the transaction for + // reading own writes. snapshot = reinterpret_cast(read_options.snapshot)->number_; + if (callback) { + snapshot = std::max(snapshot, callback->MaxUnpreparedSequenceNumber()); + } } else { // Since we get and reference the super version before getting // the snapshot number, without a mutex protection, it is possible @@ -1050,10 +1057,10 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // data for this snapshot is available. But it will contain all // the data available in the super version we have, which is also // a valid snapshot to read from. - // We shouldn't get snapshot before finding and referencing the - // super versipon because a flush happening in between may compact - // away data for the snapshot, but the snapshot is earlier than the - // data overwriting it, so users may see wrong results. 
+ // We shouldn't get snapshot before finding and referencing the super + // version because a flush happening in between may compact away data for + // the snapshot, but the snapshot is earlier than the data overwriting it, + // so users may see wrong results. snapshot = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); diff --git a/db/db_iter.cc b/db/db_iter.cc index e24db0fb4..90ad2cfd6 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -260,6 +260,18 @@ class DBIter final: public Iterator { bool TooManyInternalKeysSkipped(bool increment = true); bool IsVisible(SequenceNumber sequence); + // CanReseekToSkip() returns whether the iterator can use the optimization + // where it reseek by sequence number to get the next key when there are too + // many versions. This is disabled for write unprepared because seeking to + // sequence number does not guarantee that it is visible. + inline bool CanReseekToSkip(); + + // MaxVisibleSequenceNumber() returns the maximum visible sequence number + // for this snapshot. This sequence number may be greater than snapshot + // seqno because uncommitted data written to DB for write unprepared will + // have a higher sequence number. + inline SequenceNumber MaxVisibleSequenceNumber(); + // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // is called void TempPinData() { @@ -578,7 +590,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // If we have sequentially iterated via numerous equal keys, then it's // better to seek so that we can avoid too many key comparisons. - if (num_skipped > max_skip_) { + if (num_skipped > max_skip_ && CanReseekToSkip()) { num_skipped = 0; std::string last_key; if (skipping) { @@ -895,7 +907,7 @@ bool DBIter::FindValueForCurrentKey() { // This user key has lots of entries. // We're going from old to new, and it's taking too long. Let's do a Seek() // and go from new to old. 
This helps when a key was overwritten many times. - if (num_skipped >= max_skip_) { + if (num_skipped >= max_skip_ && CanReseekToSkip()) { return FindValueForCurrentKeyUsingSeek(); } @@ -1194,7 +1206,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() { PERF_COUNTER_ADD(internal_key_skipped_count, 1); } - if (num_skipped >= max_skip_) { + if (num_skipped >= max_skip_ && CanReseekToSkip()) { num_skipped = 0; IterKey last_key; last_key.SetInternalKey(ParsedInternalKey( @@ -1234,8 +1246,21 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) { } bool DBIter::IsVisible(SequenceNumber sequence) { - return sequence <= sequence_ && - (read_callback_ == nullptr || read_callback_->IsCommitted(sequence)); + return sequence <= MaxVisibleSequenceNumber() && + (read_callback_ == nullptr || read_callback_->IsVisible(sequence)); +} + +bool DBIter::CanReseekToSkip() { + return read_callback_ == nullptr || + read_callback_->MaxUnpreparedSequenceNumber() == 0; +} + +SequenceNumber DBIter::MaxVisibleSequenceNumber() { + if (read_callback_ == nullptr) { + return sequence_; + } + + return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber()); } void DBIter::Seek(const Slice& target) { @@ -1243,14 +1268,16 @@ void DBIter::Seek(const Slice& target) { status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); + + SequenceNumber seq = MaxVisibleSequenceNumber(); saved_key_.Clear(); - saved_key_.SetInternalKey(target, sequence_); + saved_key_.SetInternalKey(target, seq); if (iterate_lower_bound_ != nullptr && user_comparator_->Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) < 0) { saved_key_.Clear(); - saved_key_.SetInternalKey(*iterate_lower_bound_, sequence_); + saved_key_.SetInternalKey(*iterate_lower_bound_, seq); } { diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 7eeb12118..b040a02b3 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -20,7 +20,7 @@ namespace rocksdb { // A dumb ReadCallback which saying 
every key is committed. class DummyReadCallback : public ReadCallback { - bool IsCommitted(SequenceNumber /*seq*/) { return true; } + bool IsVisible(SequenceNumber /*seq*/) override { return true; } }; // Test param: @@ -2417,7 +2417,7 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) { explicit TestReadCallback(SequenceNumber last_visible_seq) : last_visible_seq_(last_visible_seq) {} - bool IsCommitted(SequenceNumber seq) override { + bool IsVisible(SequenceNumber seq) override { return seq <= last_visible_seq_; } diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 7b9808d66..2c9634755 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -20,7 +20,7 @@ class TestReadCallback : public ReadCallback { SequenceNumber snapshot_seq) : snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {} - bool IsCommitted(SequenceNumber seq) override { + bool IsVisible(SequenceNumber seq) override { return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_); } diff --git a/db/db_test2.cc b/db/db_test2.cc index 5043b38a9..61f261369 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2422,7 +2422,7 @@ TEST_F(DBTest2, ReadCallbackTest) { class TestReadCallback : public ReadCallback { public: explicit TestReadCallback(SequenceNumber snapshot) : snapshot_(snapshot) {} - virtual bool IsCommitted(SequenceNumber seq) override { + virtual bool IsVisible(SequenceNumber seq) override { return seq <= snapshot_; } diff --git a/db/memtable.cc b/db/memtable.cc index 91559a596..c68827ad9 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -579,7 +579,7 @@ struct Saver { bool CheckCallback(SequenceNumber _seq) { if (callback_) { - return callback_->IsCommitted(_seq); + return callback_->IsVisible(_seq); } return true; } diff --git a/db/read_callback.h b/db/read_callback.h index f3fe35dfc..440f7848d 100644 --- a/db/read_callback.h +++ b/db/read_callback.h @@ -13,9 +13,18 @@ class ReadCallback { public: virtual ~ReadCallback() {} 
- // Will be called to see if the seq number accepted; if not it moves on to the - // next seq number. - virtual bool IsCommitted(SequenceNumber seq) = 0; + // Will be called to see if the seq number is visible; if not it moves on to + // the next seq number. + virtual bool IsVisible(SequenceNumber seq) = 0; + + // This is called to determine the maximum visible sequence number for the + // current transaction for read-your-own-write semantics. This is so that + // for write unprepared, we will not skip keys that are written by the + // current transaction with the seek to snapshot optimization. + // + // For other uses, this returns zero, meaning that the current snapshot + // sequence number is the maximum visible sequence number. + inline virtual SequenceNumber MaxUnpreparedSequenceNumber() { return 0; } }; } // namespace rocksdb diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 5fd7700f1..e79f5985f 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -229,6 +229,7 @@ class WriteBatchWithIndex : public WriteBatchBase { private: friend class PessimisticTransactionDB; friend class WritePreparedTxn; + friend class WriteUnpreparedTxn; friend class WriteBatchWithIndex_SubBatchCnt_Test; // Returns the number of sub-batches inside the write batch. 
A sub-batch // starts right before inserting a key that is a duplicate of a key in the diff --git a/table/get_context.h b/table/get_context.h index 90a5ff35c..2b9135676 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -72,7 +72,7 @@ class GetContext { bool CheckCallback(SequenceNumber seq) { if (callback_) { - return callback_->IsCommitted(seq); + return callback_->IsVisible(seq); } return true; } diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index 545b92f7f..1d511880b 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -108,7 +108,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, } else if (found_record_for_key) { bool write_conflict = snap_checker == nullptr ? snap_seq < seq - : !snap_checker->IsCommitted(seq); + : !snap_checker->IsVisible(seq); if (write_conflict) { result = Status::Busy(); } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 13ab8679a..105c37df7 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -306,9 +306,10 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, SequenceNumber min_uncommitted = 0; if (options.snapshot != nullptr) { snapshot_seq = options.snapshot->GetSequenceNumber(); - min_uncommitted = static_cast_with_check( - options.snapshot) - ->min_uncommitted_; + min_uncommitted = + static_cast_with_check( + options.snapshot) + ->min_uncommitted_; } else { auto* snapshot = GetSnapshot(); // We take a snapshot to make sure that the related data in the commit map diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index b62a6f74e..65797aa77 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -612,9 +612,9 @@ class 
WritePreparedTxnReadCallback : public ReadCallback { SequenceNumber min_uncommitted) : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {} - // Will be called to see if the seq number accepted; if not it moves on to the - // next seq number. - inline virtual bool IsCommitted(SequenceNumber seq) override { + // Will be called to see if the seq number visible; if not it moves on to + // the next seq number. + inline virtual bool IsVisible(SequenceNumber seq) override { return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); } diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 96fe9bac9..115695cd0 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -10,8 +10,185 @@ #endif #include "utilities/transactions/transaction_test.h" +#include "utilities/transactions/write_unprepared_txn.h" +#include "utilities/transactions/write_unprepared_txn_db.h" -namespace rocksdb {} // namespace rocksdb +namespace rocksdb { + +class WriteUnpreparedTransactionTestBase : public TransactionTestBase { + public: + WriteUnpreparedTransactionTestBase(bool use_stackable_db, + bool two_write_queue, + TxnDBWritePolicy write_policy) + : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){} +}; + +class WriteUnpreparedTransactionTest + : public WriteUnpreparedTransactionTestBase, + virtual public ::testing::WithParamInterface< + std::tuple> { + public: + WriteUnpreparedTransactionTest() + : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()), + std::get<1>(GetParam()), + std::get<2>(GetParam())){} +}; + +INSTANTIATE_TEST_CASE_P( + WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED), + std::make_tuple(false, true, WRITE_UNPREPARED))); + +TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { + auto 
verify_state = [](Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); + }; + + options.disable_auto_compactions = true; + ReOpen(); + + // The following tests checks whether reading your own write for + // a transaction works for write unprepared, when there are uncommitted + // values written into DB. + // + // Although the values written by DB::Put are technically committed, we add + // their seq num to unprep_seqs_ to pretend that they were written into DB + // as part of an unprepared batch, and then check if they are visible to the + // transaction. + auto snapshot0 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v1")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v2")); + auto snapshot2 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v3")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v4")); + auto snapshot4 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v5")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v6")); + auto snapshot6 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v7")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v8")); + auto snapshot8 = db->GetSnapshot(); + + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + WriteUnpreparedTxn* wup_txn = dynamic_cast(txn); + + ReadOptions roptions; + roptions.snapshot = snapshot0; + + auto iter = txn->GetIterator(roptions); + + // Test Get(). 
+ std::string value; + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + + ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); + ASSERT_EQ(value, "v3"); + + ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); + ASSERT_EQ(value, "v4"); + + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + + ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); + ASSERT_EQ(value, "v7"); + + ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); + ASSERT_EQ(value, "v8"); + + wup_txn->unprep_seqs_.clear(); + + // Test Next(). + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + + iter->Seek("a"); + verify_state(iter, "a", "v3"); + + iter->Next(); + verify_state(iter, "b", "v4"); + + iter->SeekToFirst(); + verify_state(iter, "a", "v3"); + + iter->Next(); + verify_state(iter, "b", "v4"); + + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + + iter->Seek("a"); + verify_state(iter, "a", "v7"); + + iter->Next(); + verify_state(iter, "b", "v8"); + + iter->SeekToFirst(); + verify_state(iter, "a", "v7"); + + iter->Next(); + verify_state(iter, "b", "v8"); + + wup_txn->unprep_seqs_.clear(); + + // Test Prev(). For Prev(), we need to adjust the snapshot to match what is + // possible in WriteUnpreparedTxn. + // + // Because of row locks and ValidateSnapshot, there cannot be any committed + // entries after snapshot, but before the first prepared key. 
+ delete iter; + roptions.snapshot = snapshot2; + iter = txn->GetIterator(roptions); + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + + iter->SeekForPrev("b"); + verify_state(iter, "b", "v4"); + + iter->Prev(); + verify_state(iter, "a", "v3"); + + iter->SeekToLast(); + verify_state(iter, "b", "v4"); + + iter->Prev(); + verify_state(iter, "a", "v3"); + + delete iter; + roptions.snapshot = snapshot6; + iter = txn->GetIterator(roptions); + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + + iter->SeekForPrev("b"); + verify_state(iter, "b", "v8"); + + iter->Prev(); + verify_state(iter, "a", "v7"); + + iter->SeekToLast(); + verify_state(iter, "b", "v8"); + + iter->Prev(); + verify_state(iter, "a", "v7"); + + db->ReleaseSnapshot(snapshot0); + db->ReleaseSnapshot(snapshot2); + db->ReleaseSnapshot(snapshot4); + db->ReleaseSnapshot(snapshot6); + db->ReleaseSnapshot(snapshot8); + delete iter; + delete txn; +} + +} // namespace rocksdb int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 3ed263039..001c2444d 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -6,6 +6,9 @@ #ifndef ROCKSDB_LITE #include "utilities/transactions/write_unprepared_txn.h" +#include "db/db_impl.h" +#include "util/cast_util.h" +#include "utilities/transactions/write_unprepared_txn_db.h" #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS @@ -13,6 +16,74 @@ namespace rocksdb { +bool WriteUnpreparedTxnReadCallback::IsVisible(SequenceNumber seq) { + const auto& unprep_seqs = txn_->GetUnpreparedSequenceNumbers(); + + // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is + // in unprep_seqs, we have to check if seq is equal to 
prep_seq or any of + // the prepare_batch_cnt seq nums after it. + // + // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is + // large. + for (const auto& it : unprep_seqs) { + if (it.first <= seq && seq < it.first + it.second) { + return true; + } + } + + return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); +} + +SequenceNumber WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() { + const auto& unprep_seqs = txn_->GetUnpreparedSequenceNumbers(); + if (!unprep_seqs.empty()) { + return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1; + } + + return 0; +} + +WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) + : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {} + +Status WriteUnpreparedTxn::Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) { + auto snapshot = options.snapshot; + auto snap_seq = + snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; + SequenceNumber min_uncommitted = 0; // by default disable the optimization + if (snapshot != nullptr) { + min_uncommitted = + static_cast_with_check(snapshot) + ->min_uncommitted_; + } + + WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, + this); + return write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value, + &callback); +} + +Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) { + return GetIterator(options, wupt_db_->DefaultColumnFamily()); +} + +Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) { + // Make sure to get iterator from WriteUnpreparedTxnDB, not the root db. 
+ Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this); + assert(db_iter); + + return write_batch_.NewIteratorWithBase(column_family, db_iter); +} + +const std::map& +WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() { + return unprep_seqs_; +} } // namespace rocksdb diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index bc5141437..65eb7ad98 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -8,12 +8,67 @@ #ifndef ROCKSDB_LITE #include "utilities/transactions/write_prepared_txn.h" +#include "utilities/transactions/write_unprepared_txn_db.h" namespace rocksdb { -class WriteUnpreparedTxn : public WritePreparedTxn { - using WritePreparedTxn::WritePreparedTxn; +class WriteUnpreparedTxnDB; +class WriteUnpreparedTxn; +class WriteUnpreparedTxnReadCallback : public ReadCallback { + public: + WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db, + SequenceNumber snapshot, + SequenceNumber min_uncommitted, + WriteUnpreparedTxn* txn) + : db_(db), + snapshot_(snapshot), + min_uncommitted_(min_uncommitted), + txn_(txn) {} + + virtual bool IsVisible(SequenceNumber seq) override; + virtual SequenceNumber MaxUnpreparedSequenceNumber() override; + + private: + WritePreparedTxnDB* db_; + SequenceNumber snapshot_; + SequenceNumber min_uncommitted_; + WriteUnpreparedTxn* txn_; +}; + +class WriteUnpreparedTxn : public WritePreparedTxn { + public: + WriteUnpreparedTxn(WriteUnpreparedTxnDB* db, + const WriteOptions& write_options, + const TransactionOptions& txn_options); + + virtual ~WriteUnpreparedTxn() {} + + // Get and GetIterator needs to be overridden so that a ReadCallback to + // handle read-your-own-write is used. 
+ using Transaction::Get; + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) override; + + using Transaction::GetIterator; + virtual Iterator* GetIterator(const ReadOptions& options) override; + virtual Iterator* GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override; + + const std::map& GetUnpreparedSequenceNumbers(); + + private: + friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test; + + WriteUnpreparedTxnDB* wupt_db_; + + // Ordered list of unprep_seq sequence numbers that we have already written + // to DB. + // + // This maps unprep_seq => prepare_batch_cnt for each prepared batch written + // by this transaction. + std::map unprep_seqs_; }; } // namespace rocksdb diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 036e96d34..913ee3bd4 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -11,12 +11,13 @@ #include "utilities/transactions/write_unprepared_txn_db.h" #include "rocksdb/utilities/transaction_db.h" +#include "util/cast_util.h" namespace rocksdb { Transaction* WriteUnpreparedTxnDB::BeginTransaction( -const WriteOptions& write_options, const TransactionOptions& txn_options, -Transaction* old_txn) { + const WriteOptions& write_options, const TransactionOptions& txn_options, + Transaction* old_txn) { if (old_txn != nullptr) { ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; @@ -25,5 +26,58 @@ Transaction* old_txn) { } } +// Struct to hold ownership of snapshot and read callback for iterator cleanup. 
+struct WriteUnpreparedTxnDB::IteratorState { + IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, + std::shared_ptr s, + SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) + : callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {} + + WriteUnpreparedTxnReadCallback callback; + std::shared_ptr snapshot; +}; + +namespace { +static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) { + delete reinterpret_cast(arg1); +} +} // anonymous namespace + +Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family, + WriteUnpreparedTxn* txn) { + // TODO(lth): Refactor so that this logic is shared with WritePrepared. + constexpr bool ALLOW_BLOB = true; + constexpr bool ALLOW_REFRESH = true; + std::shared_ptr own_snapshot = nullptr; + SequenceNumber snapshot_seq; + SequenceNumber min_uncommitted = 0; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + min_uncommitted = + static_cast_with_check( + options.snapshot) + ->min_uncommitted_; + } else { + auto* snapshot = GetSnapshot(); + // We take a snapshot to make sure that the related data in the commit map + // are not deleted. 
+ snapshot_seq = snapshot->GetSequenceNumber(); + min_uncommitted = + static_cast_with_check(snapshot) + ->min_uncommitted_; + own_snapshot = std::make_shared(db_impl_, snapshot); + } + assert(snapshot_seq != kMaxSequenceNumber); + auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* state = + new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, + !ALLOW_BLOB, !ALLOW_REFRESH); + db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr); + return db_iter; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index 9bb6ad3d2..10393d59e 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -11,16 +11,26 @@ #endif #include "utilities/transactions/write_prepared_txn_db.h" - #include "utilities/transactions/write_unprepared_txn.h" namespace rocksdb { +class WriteUnpreparedTxn; + class WriteUnpreparedTxnDB : public WritePreparedTxnDB { + public: using WritePreparedTxnDB::WritePreparedTxnDB; Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) override; + + // Struct to hold ownership of snapshot and read callback for cleanup. + struct IteratorState; + + using WritePreparedTxnDB::NewIterator; + Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family, + WriteUnpreparedTxn* txn); }; } // namespace rocksdb