diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 48a07fa12..51e860e63 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -564,6 +564,63 @@ TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) { delete txn; } +// Test whether write to a transaction while iterating is supported. +TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) { + WriteOptions woptions; + TransactionOptions txn_options; + txn_options.write_batch_flush_threshold = 1; + + enum Action { DO_DELETE, DO_UPDATE }; + + for (Action a : {DO_DELETE, DO_UPDATE}) { + for (int i = 0; i < 100; i++) { + ASSERT_OK(db->Put(woptions, ToString(i), ToString(i))); + } + + Transaction* txn = db->BeginTransaction(woptions, txn_options); + // write_batch_ now contains 1 key. + ASSERT_OK(txn->Put("9", "a")); + + ReadOptions roptions; + auto iter = txn->GetIterator(roptions); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + if (iter->key() == "9") { + ASSERT_EQ(iter->value().ToString(), "a"); + } else { + ASSERT_EQ(iter->key().ToString(), iter->value().ToString()); + } + + if (a == DO_DELETE) { + ASSERT_OK(txn->Delete(iter->key())); + } else { + ASSERT_OK(txn->Put(iter->key(), "b")); + } + } + + delete iter; + ASSERT_OK(txn->Commit()); + + iter = db->NewIterator(roptions); + if (a == DO_DELETE) { + // Check that db is empty. + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + } else { + int keys = 0; + // Check that all values are updated to b. + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) { + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->value().ToString(), "b"); + } + ASSERT_EQ(keys, 100); + } + + delete iter; + delete txn; + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index af39680ac..18ebc3700 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -93,12 +93,17 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { unflushed_save_points_.reset(nullptr); recovered_txn_ = false; largest_validated_seq_ = 0; + assert(active_iterators_.empty()); + active_iterators_.clear(); } Status WriteUnpreparedTxn::HandleWrite(std::function do_write) { - Status s = MaybeFlushWriteBatchToDB(); - if (!s.ok()) { - return s; + Status s; + if (active_iterators_.empty()) { + s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } } s = do_write(); if (s.ok()) { @@ -688,6 +693,8 @@ void WriteUnpreparedTxn::Clear() { unflushed_save_points_.reset(nullptr); recovered_txn_ = false; largest_validated_seq_ = 0; + assert(active_iterators_.empty()); + active_iterators_.clear(); TransactionBaseImpl::Clear(); } @@ -862,6 +869,14 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options, } } +namespace { +static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) { + auto txn = reinterpret_cast(arg1); + auto iter = reinterpret_cast(arg2); + txn->RemoveActiveIterator(iter); +} +} // anonymous namespace + Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) { return GetIterator(options, wupt_db_->DefaultColumnFamily()); } @@ -872,7 +887,10 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options, Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this); assert(db_iter); - return write_batch_.NewIteratorWithBase(column_family, db_iter); + auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter); + active_iterators_.push_back(iter); + iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter); + return iter; } Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 692578f61..e2a5399c3 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -160,6 +160,12 @@ class WriteUnpreparedTxn : public WritePreparedTxn { return last_log_number_; } + void RemoveActiveIterator(Iterator* iter) { + active_iterators_.erase( + std::remove(active_iterators_.begin(), active_iterators_.end(), iter), + active_iterators_.end()); + } + protected: void Initialize(const TransactionOptions& txn_options) override; @@ -302,6 +308,13 @@ class WriteUnpreparedTxn : public WritePreparedTxn { std::unique_ptr> flushed_save_points_; std::unique_ptr> unflushed_save_points_; + + // It is currently unsafe to flush a write batch if there are active iterators + // created from this transaction. This is because we use WriteBatchWithIndex + // to do merging reads from the DB and the write batch. If we flush the write + // batch, it is possible that the delta iterator on the iterator will point to + // invalid memory. + std::vector active_iterators_; }; } // namespace rocksdb