From 4c70cb730614388041b97a31ae2e5addb1279284 Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Wed, 14 Aug 2019 14:25:00 -0700 Subject: [PATCH] WriteUnPrepared: support iterating while writing to transaction (#5699) Summary: In MyRocks, there are cases where we write while iterating through keys. This currently breaks WBWIIterator, because if a write batch flushes during iteration, the delta iterator would point to invalid memory. For now, fix by disallowing flush if there are active iterators. In the future, we will loop through all the iterators on a transaction, and refresh the iterators when a write batch is flushed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5699 Differential Revision: D16794157 Pulled By: lth fbshipit-source-id: 5d5bf70688bd68fe58e8a766475ae88fd1be3190 --- .../write_unprepared_transaction_test.cc | 57 +++++++++++++++++++ .../transactions/write_unprepared_txn.cc | 26 +++++++-- utilities/transactions/write_unprepared_txn.h | 13 +++++ 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 48a07fa12..51e860e63 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -564,6 +564,63 @@ TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) { delete txn; } +// Test whether write to a transaction while iterating is supported. +TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) { + WriteOptions woptions; + TransactionOptions txn_options; + txn_options.write_batch_flush_threshold = 1; + + enum Action { DO_DELETE, DO_UPDATE }; + + for (Action a : {DO_DELETE, DO_UPDATE}) { + for (int i = 0; i < 100; i++) { + ASSERT_OK(db->Put(woptions, ToString(i), ToString(i))); + } + + Transaction* txn = db->BeginTransaction(woptions, txn_options); + // write_batch_ now contains 1 key. + ASSERT_OK(txn->Put("9", "a")); + + ReadOptions roptions; + auto iter = txn->GetIterator(roptions); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + if (iter->key() == "9") { + ASSERT_EQ(iter->value().ToString(), "a"); + } else { + ASSERT_EQ(iter->key().ToString(), iter->value().ToString()); + } + + if (a == DO_DELETE) { + ASSERT_OK(txn->Delete(iter->key())); + } else { + ASSERT_OK(txn->Put(iter->key(), "b")); + } + } + + delete iter; + ASSERT_OK(txn->Commit()); + + iter = db->NewIterator(roptions); + if (a == DO_DELETE) { + // Check that db is empty. + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + } else { + int keys = 0; + // Check that all values are updated to b. + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) { + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->value().ToString(), "b"); + } + ASSERT_EQ(keys, 100); + } + + delete iter; + delete txn; + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index af39680ac..18ebc3700 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -93,12 +93,17 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { unflushed_save_points_.reset(nullptr); recovered_txn_ = false; largest_validated_seq_ = 0; + assert(active_iterators_.empty()); + active_iterators_.clear(); } Status WriteUnpreparedTxn::HandleWrite(std::function do_write) { - Status s = MaybeFlushWriteBatchToDB(); - if (!s.ok()) { - return s; + Status s; + if (active_iterators_.empty()) { + s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } } s = do_write(); if (s.ok()) { @@ -688,6 +693,8 @@ void WriteUnpreparedTxn::Clear() { unflushed_save_points_.reset(nullptr); recovered_txn_ = false; largest_validated_seq_ = 0; + assert(active_iterators_.empty()); + active_iterators_.clear(); TransactionBaseImpl::Clear(); } @@ -862,6 +869,14 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options, } } +namespace { +static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) { + auto txn = reinterpret_cast(arg1); + auto iter = reinterpret_cast(arg2); + txn->RemoveActiveIterator(iter); +} +} // anonymous namespace + Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) { return GetIterator(options, wupt_db_->DefaultColumnFamily()); } @@ -872,7 +887,10 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options, Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this); assert(db_iter); - return write_batch_.NewIteratorWithBase(column_family, db_iter); + auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter); + active_iterators_.push_back(iter); + iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter); + return iter; } Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 692578f61..e2a5399c3 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -160,6 +160,12 @@ class WriteUnpreparedTxn : public WritePreparedTxn { return last_log_number_; } + void RemoveActiveIterator(Iterator* iter) { + active_iterators_.erase( + std::remove(active_iterators_.begin(), active_iterators_.end(), iter), + active_iterators_.end()); + } + protected: void Initialize(const TransactionOptions& txn_options) override; @@ -302,6 +308,13 @@ class WriteUnpreparedTxn : public WritePreparedTxn { std::unique_ptr> flushed_save_points_; std::unique_ptr> unflushed_save_points_; + + // It is currently unsafe to flush a write batch if there are active iterators + // created from this transaction. This is because we use WriteBatchWithIndex + // to do merging reads from the DB and the write batch. If we flush the write + // batch, it is possible that the delta iterator on the iterator will point to + // invalid memory. + std::vector active_iterators_; }; } // namespace rocksdb