diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index a3f9f6303..c0d8537a2 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -522,9 +522,13 @@ class Transaction { id_ = id; } + virtual uint64_t GetLastLogNumber() const { return log_number_; } + private: friend class PessimisticTransactionDB; friend class WriteUnpreparedTxnDB; + friend class TransactionTest_TwoPhaseLogRollingTest_Test; + friend class TransactionTest_TwoPhaseLogRollingTest2_Test; // No copying allowed Transaction(const Transaction&); void operator=(const Transaction&); diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 7798e63da..fa271c35d 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -112,8 +112,14 @@ struct TransactionDBOptions { // 8m entry, 64MB size size_t wp_commit_cache_bits = static_cast(23); + // For testing, whether transaction name should be auto-generated or not. This + // is useful for write unprepared which requires named transactions. + bool autogenerate_name = false; + friend class WritePreparedTxnDB; + friend class WriteUnpreparedTxn; friend class WritePreparedTransactionTestBase; + friend class TransactionTestBase; friend class MySQLStyleTransactionTest; }; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 98548dd95..551632614 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -1727,7 +1727,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { // our log should be in the heap ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), txn1->GetLogNumber()); - ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber()); // flush default cf to crate new log s = db->Put(wopts, "foo", "bar"); @@ -1736,12 +1736,12 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { ASSERT_OK(s); // make sure we are on a new log - ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber()); // put txn2 prep section in this log s = txn2->Prepare(); ASSERT_OK(s); - ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber()); // heap should still see first log ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), @@ -1777,7 +1777,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { ASSERT_OK(s); // make sure we are on a new log - ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber()); // commit txn2 s = txn2->Commit(); @@ -1878,7 +1878,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { s = db->Put(wopts, "cats", "dogs1"); ASSERT_OK(s); - auto prepare_log_no = txn1->GetLogNumber(); + auto prepare_log_no = txn1->GetLastLogNumber(); // roll to LOG B s = db_impl->TEST_FlushMemTable(true); @@ -1905,7 +1905,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { assert(false); } ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), - prepare_log_no); + txn1->GetLogNumber()); ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); // commit in LOG B @@ -2604,10 +2604,8 @@ TEST_P(TransactionTest, ColumnFamiliesTest) { std::vector handles; - s = TransactionDB::Open(options, txn_db_options, dbname, column_families, - &handles, &db); + ASSERT_OK(ReOpenNoDelete(column_families, &handles)); assert(db != nullptr); - ASSERT_OK(s); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -2769,10 +2767,8 @@ TEST_P(TransactionTest, MultiGetBatchedTest) { std::vector handles; options.merge_operator = MergeOperators::CreateStringAppendOperator(); - s = TransactionDB::Open(options, txn_db_options, dbname, column_families, - &handles, &db); + ASSERT_OK(ReOpenNoDelete(column_families, &handles)); assert(db != nullptr); - ASSERT_OK(s); // Write some data to the db WriteBatch batch; @@ -3132,6 +3128,12 @@ TEST_P(TransactionTest, LostUpdate) { } TEST_P(TransactionTest, UntrackedWrites) { + if (txn_db_options.write_policy == WRITE_UNPREPARED) { + // TODO(lth): For WriteUnprepared, validate that untracked writes are + // not supported. + return; + } + WriteOptions write_options; ReadOptions read_options; std::string value; @@ -3376,7 +3378,7 @@ TEST_P(TransactionTest, LockLimitTest) { // Open DB with a lock limit of 3 txn_db_options.max_num_locks = 3; - s = TransactionDB::Open(options, txn_db_options, dbname, &db); + ASSERT_OK(ReOpen()); assert(db != nullptr); ASSERT_OK(s); @@ -5285,6 +5287,9 @@ TEST_P(TransactionTest, MemoryLimitTest) { TransactionOptions txn_options; // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data. txn_options.max_write_batch_size = 29; + // Set threshold to unlimited so that the write batch does not get flushed, + // and can hit the memory limit. + txn_options.write_batch_flush_threshold = 0; std::string value; Status s; diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 9b634c11c..abfa7d8a9 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -27,6 +27,7 @@ #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/write_unprepared_txn_db.h" #include "port/port.h" @@ -67,6 +68,12 @@ class TransactionTestBase : public ::testing::Test { txn_db_options.default_lock_timeout = 0; txn_db_options.write_policy = write_policy; txn_db_options.rollback_merge_operands = true; + // This will stress write unprepared, by forcing write batch flush on every + // write. + txn_db_options.default_write_batch_flush_threshold = 1; + // Write unprepared requires all transactions to be named. This setting + // autogenerates the name so that existing tests can pass. + txn_db_options.autogenerate_name = true; Status s; if (use_stackable_db == false) { s = TransactionDB::Open(options, txn_db_options, dbname, &db); @@ -273,13 +280,20 @@ class TransactionTestBase : public ::testing::Test { if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // Consume one seq per key exp_seq += 4; - } else { + } else if (txn_db_options.write_policy == + TxnDBWritePolicy::WRITE_PREPARED) { // Consume one seq per batch exp_seq++; if (options.two_write_queues) { // Consume one seq for commit exp_seq++; } + } else { + // Flushed after each key, consume one seq per flushed batch + exp_seq += 4; + // WriteUnprepared implements CommitWithoutPrepareInternal by simply + // calling Prepare then Commit. Consume one seq for the prepare. + exp_seq++; } delete txn; with_empty_commits++; @@ -303,11 +317,17 @@ class TransactionTestBase : public ::testing::Test { if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // Consume one seq per key exp_seq += 5; - } else { + } else if (txn_db_options.write_policy == + TxnDBWritePolicy::WRITE_PREPARED) { // Consume one seq per batch exp_seq++; // Consume one seq per commit marker exp_seq++; + } else { + // Flushed after each key, consume one seq per flushed batch + exp_seq += 5; + // Consume one seq per commit marker + exp_seq++; } delete txn; }; @@ -330,7 +350,8 @@ class TransactionTestBase : public ::testing::Test { if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // No seq is consumed for deleting the txn buffer exp_seq += 0; - } else { + } else if (txn_db_options.write_policy == + TxnDBWritePolicy::WRITE_PREPARED) { // Consume one seq per batch exp_seq++; // Consume one seq per rollback batch @@ -339,6 +360,15 @@ class TransactionTestBase : public ::testing::Test { // Consume one seq for rollback commit exp_seq++; } + } else { + // Flushed after each key, consume one seq per flushed batch + exp_seq += 5; + // Consume one seq per rollback batch + exp_seq++; + if (options.two_write_queues) { + // Consume one seq for rollback commit + exp_seq++; + } } delete txn; }; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 1e7384dc7..d4f0d993a 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1612,7 +1612,7 @@ TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) { txn = txns[index]; txns.erase(txns.begin() + index); } - // Since commit cahce is practically disabled, commit results in immediate + // Since commit cache is practically disabled, commit results in immediate // advance in max_evicted_seq_ and subsequently moving some prepared txns // to delayed_prepared_. txn->Commit(); diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index e9d305c69..88b638975 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -335,7 +335,11 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { for (int i = 0; i < num_batches; i++) { ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); if (txn_options.write_batch_flush_threshold == 1) { - ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); + // WriteUnprepared will check write_batch_flush_threshold and + // possibly flush before appending to the write batch. No flush + // will happen at the first write because the batch is still + // empty, so after k puts, there should be k-1 flushed batches. + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i); } else { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); } @@ -411,7 +415,11 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { for (int i = 0; i < kNumKeys; i++) { txn->Put("k" + ToString(i), "v" + ToString(i)); if (txn_options.write_batch_flush_threshold == 1) { - ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); + // WriteUnprepared will check write_batch_flush_threshold and + // possibly flush before appending to the write batch. No flush will + // happen at the first write because the batch is still empty, so + // after k puts, there should be k-1 flushed batches. + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i); } else { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); } diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 321110ea1..85a38981c 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -37,6 +37,7 @@ WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, const TransactionOptions& txn_options) : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db), + last_log_number_(0), recovered_txn_(false), largest_validated_seq_(0) { if (txn_options.write_batch_flush_threshold < 0) { @@ -56,10 +57,15 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() { // We should rollback regardless of GetState, but some unit tests that // test crash recovery run the destructor assuming that rollback does not // happen, so that rollback during recovery can be exercised. - if (GetState() == STARTED) { - auto s __attribute__((__unused__)) = RollbackInternal(); - // TODO(lth): Better error handling. + if (GetState() == STARTED || GetState() == LOCKS_STOLEN) { + auto s = RollbackInternal(); assert(s.ok()); + if (!s.ok()) { + ROCKS_LOG_FATAL( + wupt_db_->info_log_, + "Rollback of WriteUnprepared transaction failed in destructor: %s", + s.ToString().c_str()); + } dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( log_number_); } @@ -233,6 +239,7 @@ Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { const bool kPrepared = true; Status s; if (write_batch_flush_threshold_ > 0 && + write_batch_.GetWriteBatch()->Count() > 0 && write_batch_.GetDataSize() > static_cast(write_batch_flush_threshold_)) { assert(GetState() != PREPARED); @@ -257,7 +264,17 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) { if (name_.empty()) { - return Status::InvalidArgument("Cannot write to DB without SetName."); + assert(!prepared); +#ifndef NDEBUG + static std::atomic_ullong autogen_id{0}; + // To avoid changing all tests to call SetName, just autogenerate one. + if (wupt_db_->txn_db_options_.autogenerate_name) { + SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1))); + } else +#endif + { + return Status::InvalidArgument("Cannot write to DB without SetName."); + } } // TODO(lth): Reduce duplicate code with WritePrepared prepare logic. @@ -285,11 +302,14 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) { // from the current transaction. This means that if log_number_ is set, // WriteImpl should not overwrite that value, so set log_used to nullptr if // log_number_ is already set. - uint64_t* log_used = log_number_ ? nullptr : &log_number_; - auto s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), - /*callback*/ nullptr, log_used, /*log ref*/ - 0, !DISABLE_MEMTABLE, &seq_used, - prepare_batch_cnt_, &add_prepared_callback); + auto s = + db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, &last_log_number_, /*log ref*/ + 0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_, + &add_prepared_callback); + if (log_number_ == 0) { + log_number_ = last_log_number_; + } assert(!s.ok() || seq_used != kMaxSequenceNumber); auto prepare_seq = seq_used; diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 5c654b05b..cfa18d6ee 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -145,8 +145,21 @@ class WriteUnpreparedTxn : public WritePreparedTxn { const SliceParts& key, const bool assume_tracked = false) override; + // In WriteUnprepared, untracked writes will break snapshot validation logic. + // Snapshot validation will only check the largest sequence number of a key to + // see if it was committed or not. However, an untracked unprepared write will + // hide smaller committed sequence numbers. + // + // TODO(lth): Investigate whether it is worth having snapshot validation + // validate all values larger than snap_seq. Otherwise, we should return + // Status::NotSupported for untracked writes. + virtual Status RebuildFromWriteBatch(WriteBatch*) override; + virtual uint64_t GetLastLogNumber() const override { + return last_log_number_; + } + protected: void Initialize(const TransactionOptions& txn_options) override; @@ -219,6 +232,8 @@ class WriteUnpreparedTxn : public WritePreparedTxn { // commit callbacks. std::map unprep_seqs_; + uint64_t last_log_number_; + // Recovered transactions have tracked_keys_ populated, but are not actually // locked for efficiency reasons. For recovered transactions, skip unlocking // keys when transaction ends.