diff --git a/test_util/transaction_test_util.cc b/test_util/transaction_test_util.cc index a9410f5fc..28f16a5e7 100644 --- a/test_util/transaction_test_util.cc +++ b/test_util/transaction_test_util.cc @@ -349,6 +349,7 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, static_cast(key.size()), key.data(), int_value); total += int_value; } + iter->status().PermitUncheckedError(); delete iter; } diff --git a/utilities/transactions/lock/point/point_lock_manager.cc b/utilities/transactions/lock/point/point_lock_manager.cc index 0ca6e38f0..fe1fc573f 100644 --- a/utilities/transactions/lock/point/point_lock_manager.cc +++ b/utilities/transactions/lock/point/point_lock_manager.cc @@ -635,7 +635,7 @@ void PointLockManager::UnLock(PessimisticTransaction* txn, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - stripe->stripe_mutex->Lock(); + stripe->stripe_mutex->Lock().PermitUncheckedError(); UnLockKey(txn, key, stripe, lock_map, env); stripe->stripe_mutex->UnLock(); @@ -677,7 +677,7 @@ void PointLockManager::UnLock(PessimisticTransaction* txn, assert(lock_map->lock_map_stripes_.size() > stripe_num); LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - stripe->stripe_mutex->Lock(); + stripe->stripe_mutex->Lock().PermitUncheckedError(); for (const std::string* key : stripe_keys) { UnLockKey(txn, *key, stripe, lock_map, env); @@ -708,7 +708,7 @@ PointLockManager::PointLockStatus PointLockManager::GetPointLockStatus() { const auto& stripes = lock_maps_[i]->lock_map_stripes_; // Iterate and lock all stripes in ascending order. for (const auto& j : stripes) { - j->stripe_mutex->Lock(); + j->stripe_mutex->Lock().PermitUncheckedError(); for (const auto& it : j->keys) { struct KeyLockInfo info; info.exclusive = it.second.exclusive; diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 6531773ec..8c71c1de7 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -173,7 +173,6 @@ Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { } Status PessimisticTransaction::Prepare() { - Status s; if (name_.empty()) { return Status::InvalidArgument( @@ -184,6 +183,7 @@ Status PessimisticTransaction::Prepare() { return Status::Expired(); } + Status s; bool can_prepare = false; if (expiration_time_ > 0) { @@ -226,7 +226,9 @@ Status PessimisticTransaction::Prepare() { Status WriteCommittedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; - WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); + auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), + name_); + assert(s.ok()); class MarkLogCallback : public PreReleaseCallback { public: MarkLogCallback(DBImpl* db, bool two_write_queues) @@ -256,15 +258,14 @@ Status WriteCommittedTxn::PrepareInternal() { const bool kDisableMemtable = true; SequenceNumber* const KIgnoreSeqUsed = nullptr; const size_t kNoBatchCount = 0; - Status s = db_impl_->WriteImpl( - write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback, - &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount, - &mark_log_callback); + s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + kNoWriteCallback, &log_number_, kRefNoLog, + kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount, + &mark_log_callback); return s; } Status PessimisticTransaction::Commit() { - Status s; bool commit_without_prepare = false; bool commit_prepared = false; @@ -294,6 +295,7 @@ Status PessimisticTransaction::Commit() { } } + Status s; if (commit_without_prepare) { assert(!commit_prepared); if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) { @@ -377,7 +379,8 @@ Status WriteCommittedTxn::CommitInternal() { // We take the commit-time batch and append the Commit marker. // The Memtable will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); - WriteBatchInternal::MarkCommit(working_batch, name_); + auto s = WriteBatchInternal::MarkCommit(working_batch, name_); + assert(s.ok()); // any operations appended to this working_batch will be ignored from WAL working_batch->MarkWalTerminationPoint(); @@ -385,13 +388,14 @@ Status WriteCommittedTxn::CommitInternal() { // insert prepared batch into Memtable only skipping WAL. // Memtable will ignore BeginPrepare/EndPrepare markers // in non recovery mode and simply insert the values - WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch()); + s = WriteBatchInternal::Append(working_batch, + GetWriteBatch()->GetWriteBatch()); + assert(s.ok()); uint64_t seq_used = kMaxSequenceNumber; - auto s = - db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr, + s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr, /*log_used*/ nullptr, /*log_ref*/ log_number_, - /*disable_memtable*/ false, &seq_used); + /*disable_memtable*/ false, &seq_used); assert(!s.ok() || seq_used != kMaxSequenceNumber); if (s.ok()) { SetId(seq_used); @@ -439,8 +443,9 @@ Status PessimisticTransaction::Rollback() { Status WriteCommittedTxn::RollbackInternal() { WriteBatch rollback_marker; - WriteBatchInternal::MarkRollback(&rollback_marker, name_); - auto s = db_impl_->WriteImpl(write_options_, &rollback_marker); + auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_); + assert(s.ok()); + s = db_impl_->WriteImpl(write_options_, &rollback_marker); return s; } @@ -505,9 +510,10 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch, // Iterating on this handler will add all keys in this batch into keys Handler handler; - batch->Iterate(&handler); - - Status s; + Status s = batch->Iterate(&handler); + if (!s.ok()) { + return s; + } // Attempt to lock all keys for (const auto& cf_iter : handler.keys_) { diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 50bfd038c..1482dc961 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -530,7 +530,9 @@ Status TransactionBaseImpl::SingleDeleteUntracked( } void TransactionBaseImpl::PutLogData(const Slice& blob) { - write_batch_.PutLogData(blob); + auto s = write_batch_.PutLogData(blob); + (void)s; + assert(s.ok()); } WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() { diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 449fea71e..14f507b81 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -271,7 +271,8 @@ class TransactionBaseImpl : public Transaction { write_batch_.Clear(); } assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader); - WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch()); + auto s = WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch()); + assert(s.ok()); } DB* db_; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index d8d0b3f81..72068cb85 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -198,7 +198,7 @@ TEST_P(TransactionTest, AssumeExclusiveTracked) { ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED)); - txn->Rollback(); + ASSERT_OK(txn->Rollback()); delete txn; } @@ -223,7 +223,7 @@ TEST_P(TransactionTest, ValidateSnapshotTest) { if (with_flush) { auto db_impl = static_cast_with_check(db->GetRootDB()); - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); // Make sure the flushed memtable is not kept in memory int max_memtable_in_history = std::max( @@ -232,8 +232,8 @@ TEST_P(TransactionTest, ValidateSnapshotTest) { static_cast(options.write_buffer_size)) + 1; for (int i = 0; i < max_memtable_in_history; i++) { - db->Put(write_options, Slice("key"), Slice("value")); - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db->Put(write_options, Slice("key"), Slice("value"))); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); } } @@ -392,9 +392,9 @@ TEST_P(TransactionTest, SharedLocks) { ASSERT_EQ(expected_txns, lock_txns); ASSERT_FALSE(cf_iterator->second.exclusive); - txn1->Rollback(); - txn2->Rollback(); - txn3->Rollback(); + ASSERT_OK(txn1->Rollback()); + ASSERT_OK(txn2->Rollback()); + ASSERT_OK(txn3->Rollback()); // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it. s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); @@ -416,9 +416,9 @@ TEST_P(TransactionTest, SharedLocks) { s = txn3->GetForUpdate(read_options, "foo", nullptr); ASSERT_OK(s); - txn1->Rollback(); - txn2->Rollback(); - txn3->Rollback(); + ASSERT_OK(txn1->Rollback()); + ASSERT_OK(txn2->Rollback()); + ASSERT_OK(txn3->Rollback()); // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock. s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */); @@ -454,8 +454,8 @@ TEST_P(TransactionTest, SharedLocks) { ASSERT_TRUE(s.IsTimedOut()); ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key"); - txn1->Rollback(); - txn2->Rollback(); + ASSERT_OK(txn1->Rollback()); + ASSERT_OK(txn2->Rollback()); // Test txn1 holding an exclusive lock and txn2 trying to obtain shared // access. @@ -519,7 +519,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) { auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr, true /* exclusive */); ASSERT_OK(s); - txns[i]->Rollback(); + ASSERT_OK(txns[i]->Rollback()); delete txns[i]; }; threads.emplace_back(blocking_thread); @@ -581,7 +581,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) { // Rollback the leaf transaction. for (uint32_t i = 15; i < 31; i++) { - txns[i]->Rollback(); + ASSERT_OK(txns[i]->Rollback()); delete txns[i]; } @@ -651,7 +651,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) { auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr); ASSERT_OK(s); - txns_shared[i]->Rollback(); + ASSERT_OK(txns_shared[i]->Rollback()); delete txns_shared[i]; }; threads_shared.emplace_back(blocking_thread); @@ -678,7 +678,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) { // Verify the exclusivity field of the transactions in the deadlock path. ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive); ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive); - txns_shared[1]->Rollback(); + ASSERT_OK(txns_shared[1]->Rollback()); delete txns_shared[1]; for (auto& t : threads_shared) { @@ -725,7 +725,7 @@ TEST_P(TransactionStressTest, DeadlockCycle) { std::function blocking_thread = [&, i] { auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr); ASSERT_OK(s); - txns[i]->Rollback(); + ASSERT_OK(txns[i]->Rollback()); delete txns[i]; }; threads.emplace_back(blocking_thread); @@ -786,7 +786,7 @@ TEST_P(TransactionStressTest, DeadlockCycle) { } // Rollback the last transaction. - txns[len - 1]->Rollback(); + ASSERT_OK(txns[len - 1]->Rollback()); delete txns[len - 1]; for (auto& t : threads) { @@ -809,7 +809,7 @@ TEST_P(TransactionStressTest, DeadlockStress) { std::vector keys; for (uint32_t i = 0; i < NUM_KEYS; i++) { - db->Put(write_options, Slice(ToString(i)), Slice("")); + ASSERT_OK(db->Put(write_options, Slice(ToString(i)), Slice(""))); keys.push_back(ToString(i)); } @@ -831,7 +831,7 @@ TEST_P(TransactionStressTest, DeadlockStress) { txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0); if (!s.ok()) { ASSERT_TRUE(s.IsDeadlock()); - txn->Rollback(); + ASSERT_OK(txn->Rollback()); break; } } @@ -896,7 +896,7 @@ TEST_P(TransactionTest, LogMarkLeakTest) { ASSERT_OK(txn->Commit()); delete txn; } - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); } for (auto txn : txns) { ASSERT_OK(txn->Commit()); @@ -941,12 +941,14 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { ASSERT_EQ(1, txn->GetNumPuts()); // regular db read - db->Get(read_options, "foo2", &value); + ASSERT_OK(db->Get(read_options, "foo2", &value)); ASSERT_EQ(value, "bar2"); // commit time put - txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")); - txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")); + ASSERT_OK( + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"))); + ASSERT_OK( + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"))); // nothing has been prepped yet ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); @@ -1017,7 +1019,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { assert(false); } - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); // After flush the recoverable state must be visible if (cwb4recovery) { s = db->Get(read_options, "gtid", &value); @@ -1104,8 +1106,8 @@ TEST_P(TransactionTest, TwoPhaseNameTest) { s = txn1->SetName("name4"); ASSERT_EQ(s, Status::InvalidArgument()); - txn1->Rollback(); - txn2->Rollback(); + ASSERT_OK(txn1->Rollback()); + ASSERT_OK(txn2->Rollback()); delete txn1; delete txn2; } @@ -1144,7 +1146,8 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { delete txn1; - txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")); + ASSERT_OK( + txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"))); s = txn2->Prepare(); ASSERT_OK(s); @@ -1160,13 +1163,13 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { } else { if (test_with_empty_wal) { DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); // After flush the state must be visible s = db->Get(read_options, "foo", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar"); } - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // kill and reopen to trigger recovery s = ReOpenNoDelete(); ASSERT_OK(s); @@ -1259,7 +1262,7 @@ TEST_P(TransactionTest, TwoPhaseRollbackTest) { // flush to next wal s = db->Put(write_options, Slice("foo"), Slice("bar")); ASSERT_OK(s); - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); // issue rollback (marker written to WAL) s = txn->Rollback(); @@ -1315,7 +1318,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { ASSERT_OK(s); ASSERT_EQ(1, txn->GetNumPuts()); - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); // regular db read db->Get(read_options, "foo2", &value); @@ -1332,7 +1335,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { s = db->Get(read_options, Slice("foo"), &value); ASSERT_TRUE(s.IsNotFound()); - db->FlushWAL(false); + ASSERT_OK(db->FlushWAL(false)); delete txn; // kill and reopen reinterpret_cast(db)->TEST_Crash(); @@ -1406,7 +1409,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { s = db->Put(write_options, Slice("foo3"), Slice("bar3")); ASSERT_OK(s); - db_impl->TEST_FlushMemTable(true); + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); // after memtable flush we can now release the log ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep); @@ -1672,7 +1675,7 @@ TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) { // kill and reopen env->SetFilesystemActive(false); - ReOpenNoDelete(); + ASSERT_OK(ReOpenNoDelete()); assert(db != nullptr); // value is now available @@ -2020,12 +2023,12 @@ TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) { s = db->Put(wal_on, "cats", "dogs4"); ASSERT_OK(s); - db->FlushWAL(false); + ASSERT_OK(db->FlushWAL(false)); // kill and reopen env->SetFilesystemActive(false); reinterpret_cast(db)->TEST_Crash(); - ReOpenNoDelete(); + ASSERT_OK(ReOpenNoDelete()); assert(db != nullptr); s = db->Get(read_options, "first", &value); @@ -2098,8 +2101,8 @@ TEST_P(TransactionTest, WriteConflictTest) { string value; Status s; - db->Put(write_options, "foo", "A"); - db->Put(write_options, "foo2", "B"); + ASSERT_OK(db->Put(write_options, "foo", "A")); + ASSERT_OK(db->Put(write_options, "foo2", "B")); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -2135,7 +2138,7 @@ TEST_P(TransactionTest, WriteConflictTest2) { std::string value; Status s; - db->Put(write_options, "foo", "bar"); + ASSERT_OK(db->Put(write_options, "foo", "bar")); txn_options.set_snapshot = true; Transaction* txn = db->BeginTransaction(write_options, txn_options); @@ -2183,8 +2186,8 @@ TEST_P(TransactionTest, ReadConflictTest) { std::string value; Status s; - db->Put(write_options, "foo", "bar"); - db->Put(write_options, "foo2", "bar"); + ASSERT_OK(db->Put(write_options, "foo", "bar")); + ASSERT_OK(db->Put(write_options, "foo2", "bar")); txn_options.set_snapshot = true; Transaction* txn = db->BeginTransaction(write_options, txn_options); @@ -2193,7 +2196,7 @@ TEST_P(TransactionTest, ReadConflictTest) { txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); - txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_EQ(value, "bar"); // This Put outside of a transaction will conflict with the previous read @@ -2239,21 +2242,21 @@ TEST_P(TransactionTest, FlushTest) { std::string value; Status s; - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar")); + ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); + ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar"))); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); snapshot_read_options.snapshot = txn->GetSnapshot(); - txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_EQ(value, "bar"); s = txn->Put(Slice("foo"), Slice("bar2")); ASSERT_OK(s); - txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_EQ(value, "bar2"); // Put a random key so we have a memtable to flush @@ -2304,9 +2307,9 @@ TEST_P(TransactionTest, FlushTest2) { DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar2")); - db->Put(write_options, Slice("foo3"), Slice("bar3")); + ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); + ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar2"))); + ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar3"))); txn_options.set_snapshot = true; Transaction* txn = db->BeginTransaction(write_options, txn_options); @@ -2314,13 +2317,13 @@ TEST_P(TransactionTest, FlushTest2) { snapshot_read_options.snapshot = txn->GetSnapshot(); - txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_EQ(value, "bar"); s = txn->Put(Slice("foo"), Slice("bar2")); ASSERT_OK(s); - txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); ASSERT_EQ(value, "bar2"); // verify foo is locked by txn s = db->Delete(write_options, "foo"); @@ -2405,7 +2408,7 @@ TEST_P(TransactionTest, FlushTest2) { s = db->Delete(write_options, "foo3"); ASSERT_TRUE(s.IsTimedOut()); - db_impl->TEST_WaitForCompact(); + ASSERT_OK(db_impl->TEST_WaitForCompact()); s = txn->Commit(); ASSERT_OK(s); @@ -2432,16 +2435,16 @@ TEST_P(TransactionTest, NoSnapshotTest) { std::string value; Status s; - db->Put(write_options, "AAA", "bar"); + ASSERT_OK(db->Put(write_options, "AAA", "bar")); Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); // Modify key after transaction start - db->Put(write_options, "AAA", "bar1"); + ASSERT_OK(db->Put(write_options, "AAA", "bar1")); // Read and write without a snap - txn->GetForUpdate(read_options, "AAA", &value); + ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value)); ASSERT_EQ(value, "bar1"); s = txn->Put("AAA", "bar2"); ASSERT_OK(s); @@ -2450,7 +2453,7 @@ TEST_P(TransactionTest, NoSnapshotTest) { s = txn->Commit(); ASSERT_OK(s); - txn->GetForUpdate(read_options, "AAA", &value); + ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value)); ASSERT_EQ(value, "bar2"); delete txn; @@ -2469,7 +2472,7 @@ TEST_P(TransactionTest, MultipleSnapshotTest) { Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); - db->Put(write_options, "AAA", "bar1"); + ASSERT_OK(db->Put(write_options, "AAA", "bar1")); // Read and write without a snapshot ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value)); @@ -2496,7 +2499,7 @@ TEST_P(TransactionTest, MultipleSnapshotTest) { snapshot_read_options.snapshot = txn->GetSnapshot(); // Read and write with snapshot - txn->GetForUpdate(snapshot_read_options, "CCC", &value); + ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "CCC", &value)); ASSERT_EQ(value, "bar1"); s = txn->Put("CCC", "bar2"); ASSERT_OK(s); @@ -2539,8 +2542,8 @@ TEST_P(TransactionTest, MultipleSnapshotTest) { txn = db->BeginTransaction(write_options); // Potentially conflicting writes - db->Put(write_options, "ZZZ", "zzz"); - db->Put(write_options, "XXX", "xxx"); + ASSERT_OK(db->Put(write_options, "ZZZ", "zzz")); + ASSERT_OK(db->Put(write_options, "XXX", "xxx")); txn->SetSnapshot(); @@ -2622,12 +2625,12 @@ TEST_P(TransactionTest, ColumnFamiliesTest) { // Write some data to the db WriteBatch batch; - batch.Put("foo", "foo"); - batch.Put(handles[1], "AAA", "bar"); - batch.Put(handles[1], "AAAZZZ", "bar"); + ASSERT_OK(batch.Put("foo", "foo")); + ASSERT_OK(batch.Put(handles[1], "AAA", "bar")); + ASSERT_OK(batch.Put(handles[1], "AAAZZZ", "bar")); s = db->Write(write_options, &batch); ASSERT_OK(s); - db->Delete(write_options, handles[1], "AAAZZZ"); + ASSERT_OK(db->Delete(write_options, handles[1], "AAAZZZ")); // These keys do not conflict with existing writes since they're in // different column families @@ -2711,7 +2714,7 @@ TEST_P(TransactionTest, ColumnFamiliesTest) { ASSERT_TRUE(s.IsNotFound()); // Put a key which will conflict with the next txn using the previous snapshot - db->Put(write_options, handles[2], "foo", "000"); + ASSERT_OK(db->Put(write_options, handles[2], "foo", "000")); results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh, multiget_keys, &values); @@ -2775,13 +2778,13 @@ TEST_P(TransactionTest, MultiGetBatchedTest) { // Write some data to the db WriteBatch batch; - batch.Put(handles[1], "aaa", "val1"); - batch.Put(handles[1], "bbb", "val2"); - batch.Put(handles[1], "ccc", "val3"); - batch.Put(handles[1], "ddd", "foo"); - batch.Put(handles[1], "eee", "val5"); - batch.Put(handles[1], "fff", "val6"); - batch.Merge(handles[1], "ggg", "foo"); + ASSERT_OK(batch.Put(handles[1], "aaa", "val1")); + ASSERT_OK(batch.Put(handles[1], "bbb", "val2")); + ASSERT_OK(batch.Put(handles[1], "ccc", "val3")); + ASSERT_OK(batch.Put(handles[1], "ddd", "foo")); + ASSERT_OK(batch.Put(handles[1], "eee", "val5")); + ASSERT_OK(batch.Put(handles[1], "fff", "val6")); + ASSERT_OK(batch.Merge(handles[1], "ggg", "foo")); s = db->Write(write_options, &batch); ASSERT_OK(s); @@ -2870,7 +2873,7 @@ TEST_P(TransactionTest, MultiGetLargeBatchedTest) { WriteBatch batch; for (int i = 0; i < 3 * MultiGetContext::MAX_BATCH_SIZE; ++i) { std::string val = "val" + std::to_string(i); - batch.Put(handles[1], key_str[i], val); + ASSERT_OK(batch.Put(handles[1], key_str[i], val)); } s = db->Write(write_options, &batch); ASSERT_OK(s); @@ -3013,7 +3016,7 @@ TEST_P(TransactionTest, EmptyTest) { delete txn; txn = db->BeginTransaction(write_options); - txn->Rollback(); + ASSERT_OK(txn->Rollback()); delete txn; txn = db->BeginTransaction(write_options); @@ -3060,17 +3063,23 @@ TEST_P(TransactionTest, PredicateManyPreceders) { std::vector results = txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); + ASSERT_EQ(results.size(), 3); + ASSERT_TRUE(results[0].IsNotFound()); ASSERT_TRUE(results[1].IsNotFound()); + ASSERT_TRUE(results[2].IsNotFound()); s = txn2->Put("2", "x"); // Conflict's with txn1's MultiGetForUpdate ASSERT_TRUE(s.IsTimedOut()); - txn2->Rollback(); + ASSERT_OK(txn2->Rollback()); multiget_values.clear(); results = txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); + ASSERT_EQ(results.size(), 3); + ASSERT_TRUE(results[0].IsNotFound()); ASSERT_TRUE(results[1].IsNotFound()); + ASSERT_TRUE(results[2].IsNotFound()); s = txn1->Commit(); ASSERT_OK(s); @@ -3096,7 +3105,7 @@ TEST_P(TransactionTest, PredicateManyPreceders) { s = txn2->GetForUpdate(read_options2, "4", &value); ASSERT_TRUE(s.IsBusy()); - txn2->Rollback(); + ASSERT_OK(txn2->Rollback()); delete txn1; delete txn2; @@ -3246,7 +3255,7 @@ TEST_P(TransactionTest, UntrackedWrites) { s = txn->PutUntracked("untracked", "0"); ASSERT_OK(s); - txn->Rollback(); + ASSERT_OK(txn->Rollback()); s = db->Get(read_options, "untracked", &value); ASSERT_TRUE(s.IsNotFound()); @@ -3389,7 +3398,7 @@ TEST_P(TransactionTest, ReinitializeTest) { s = txn1->Put("Z", "a"); ASSERT_OK(s); - txn1->Rollback(); + ASSERT_OK(txn1->Rollback()); s = txn1->Put("Y", "y"); ASSERT_OK(s); @@ -3450,7 +3459,7 @@ TEST_P(TransactionTest, Rollback) { s = txn2->Put("X", "2"); ASSERT_TRUE(s.IsTimedOut()); - txn1->Rollback(); + ASSERT_OK(txn1->Rollback()); delete txn1; // txn2 should now be able to write to X @@ -3861,7 +3870,7 @@ TEST_P(TransactionTest, SavepointTest) { // Rollback to beginning of txn s = txn->RollbackToSavePoint(); ASSERT_TRUE(s.IsNotFound()); - txn->Rollback(); + ASSERT_OK(txn->Rollback()); ASSERT_EQ(0, txn->GetNumPuts()); ASSERT_EQ(0, txn->GetNumDeletes()); @@ -4218,7 +4227,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest) { // Verify that A is now unlocked s = txn2->Put("A", "a2"); ASSERT_OK(s); - txn2->Commit(); + ASSERT_OK(txn2->Commit()); delete txn2; s = db->Get(read_options, "A", &value); ASSERT_OK(s); @@ -4244,7 +4253,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest) { s = txn2->Put("B", "b4"); ASSERT_TRUE(s.IsTimedOut()); - txn1->Rollback(); + ASSERT_OK(txn1->Rollback()); delete txn1; // Verify that A and B are no longer locked @@ -4432,7 +4441,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) { s = txn2->Put("G", "g3"); ASSERT_OK(s); - txn1->RollbackToSavePoint(); // rollback to 2 + ASSERT_OK(txn1->RollbackToSavePoint()); // rollback to 2 // Verify A,B,D,E,F are still locked and C,G,H are not. s = txn2->Put("A", "a3"); @@ -4479,7 +4488,7 @@ TEST_P(TransactionTest, UndoGetForUpdateTest2) { s = txn2->Put("H", "h3"); ASSERT_OK(s); - txn1->RollbackToSavePoint(); // rollback to 1 + ASSERT_OK(txn1->RollbackToSavePoint()); // rollback to 1 // Verify A,B,F are still locked and C,D,E,G,H are not. s = txn2->Put("A", "a3"); @@ -5246,6 +5255,7 @@ TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) { ReadOptions read_options; string value; s = db->Get(read_options, "X", &value); + ASSERT_OK(s); ASSERT_EQ("1", value); delete txn1; @@ -5281,6 +5291,7 @@ Status TransactionStressTestInserter( return inserter.GetLastStatus(); } } + inserter.GetLastStatus().PermitUncheckedError(); // Make sure at least some of the transactions succeeded. It's ok if // some failed due to write-conflicts. @@ -5300,20 +5311,20 @@ Status TransactionStressTestInserter( TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { // Small write buffer to trigger more compactions options.write_buffer_size = 1024; - ReOpenNoDelete(); - const size_t num_workers = 4; // worker threads count - const size_t num_checkers = 2; // checker threads count - const size_t num_slow_checkers = 2; // checker threads emulating backups - const size_t num_slow_workers = 1; // slow worker threads count - const size_t num_transactions_per_thread = 10000; - const uint16_t num_sets = 3; - const size_t num_keys_per_set = 100; + ASSERT_OK(ReOpenNoDelete()); + constexpr size_t num_workers = 4; // worker threads count + constexpr size_t num_checkers = 2; // checker threads count + constexpr size_t num_slow_checkers = 2; // checker threads emulating backups + constexpr size_t num_slow_workers = 1; // slow worker threads count + constexpr size_t num_transactions_per_thread = 10000; + constexpr uint16_t num_sets = 3; + constexpr size_t num_keys_per_set = 100; // Setting the key-space to be 100 keys should cause enough write-conflicts // to make this test interesting. std::vector threads; std::atomic finished = {0}; - bool TAKE_SNAPSHOT = true; + constexpr bool TAKE_SNAPSHOT = true; uint64_t time_seed = env->NowMicros(); printf("time_seed is %" PRIu64 "\n", time_seed); // would help to reproduce @@ -5329,9 +5340,8 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { Random64 rand(time_seed * thd_seed); // Verify that data is consistent while (finished < num_workers) { - Status s = RandomTransactionInserter::Verify( - db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand); - ASSERT_OK(s); + ASSERT_OK(RandomTransactionInserter::Verify( + db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand)); } }; std::function call_slow_checker = [&] { @@ -5412,7 +5422,7 @@ TEST_P(TransactionTest, MemoryLimitTest) { ASSERT_TRUE(s.IsMemoryLimit()); ASSERT_EQ(2, txn->GetNumPuts()); - txn->Rollback(); + ASSERT_OK(txn->Rollback()); delete txn; } @@ -5551,7 +5561,7 @@ TEST_P(TransactionTest, Optimizations) { ASSERT_OK(ReOpen()); WriteOptions write_options; WriteBatch batch; - batch.Put(Slice("k"), Slice("v1")); + ASSERT_OK(batch.Put(Slice("k"), Slice("v1"))); ASSERT_OK(db->Write(write_options, &batch)); ReadOptions ropt; @@ -5593,7 +5603,7 @@ class ThreeBytewiseComparator : public Comparator { TEST_P(TransactionTest, GetWithoutSnapshot) { WriteOptions write_options; std::atomic finish = {false}; - db->Put(write_options, "key", "value"); + ASSERT_OK(db->Put(write_options, "key", "value")); ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() { for (int i = 0; i < 100; i++) { TransactionOptions txn_options; @@ -5629,16 +5639,16 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); WriteOptions write_options; WriteBatch batch; - batch.Put(Slice("key"), Slice("value")); - batch.Put(Slice("key2"), Slice("value2")); + ASSERT_OK(batch.Put(Slice("key"), Slice("value"))); + ASSERT_OK(batch.Put(Slice("key2"), Slice("value2"))); // duplicate the keys - batch.Put(Slice("key"), Slice("value3")); + ASSERT_OK(batch.Put(Slice("key"), Slice("value3"))); // duplicate the 2nd key. It should not be counted duplicate since a // sub-patch is cut after the last duplicate. - batch.Put(Slice("key2"), Slice("value4")); + ASSERT_OK(batch.Put(Slice("key2"), Slice("value4"))); // duplicate the keys but in a different cf. It should not be counted as // duplicate keys - batch.Put(cf_handle, Slice("key"), Slice("value5")); + ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5"))); ASSERT_OK(db->Write(write_options, &batch)); @@ -5665,11 +5675,11 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); WriteOptions write_options; WriteBatch batch; - batch.Put(cf_handle, Slice("key"), Slice("value")); + ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value"))); // The first three bytes are the same, do it must be counted as duplicate - batch.Put(cf_handle, Slice("key2"), Slice("value2")); + ASSERT_OK(batch.Put(cf_handle, Slice("key2"), Slice("value2"))); // check for 2nd duplicate key in cf with non-default comparator - batch.Put(cf_handle, Slice("key2b"), Slice("value2b")); + ASSERT_OK(batch.Put(cf_handle, Slice("key2b"), Slice("value2b"))); ASSERT_OK(db->Write(write_options, &batch)); // The value must be the most recent value for all the keys equal to "key", @@ -5834,10 +5844,10 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value"))); WriteBatch batch; // Merge more than max_successive_merges times - batch.Merge(cf_handle, Slice("key"), Slice("1")); - batch.Merge(cf_handle, Slice("key"), Slice("2")); - batch.Merge(cf_handle, Slice("key"), Slice("3")); - batch.Merge(cf_handle, Slice("key"), Slice("4")); + ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("1"))); + ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("2"))); + ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("3"))); + ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("4"))); ASSERT_OK(db->Write(write_options, &batch)); ReadOptions read_options; string value; @@ -5916,10 +5926,10 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Prepare()); delete txn0; // This will check the asserts inside recovery code - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 - static_cast_with_check(db->GetRootDB()) - ->TEST_FlushMemTable(true, false, handles[1]); + ASSERT_OK(static_cast_with_check(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1])); reinterpret_cast(db)->TEST_Crash(); ASSERT_OK(ReOpenNoDelete(cfds, &handles)); txn0 = db->GetTransactionByName("xid"); @@ -5956,8 +5966,8 @@ TEST_P(TransactionTest, DuplicateKeys) { // This will check the asserts inside recovery code ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 - static_cast_with_check(db->GetRootDB()) - ->TEST_FlushMemTable(true, false, handles[1]); + ASSERT_OK(static_cast_with_check(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1])); reinterpret_cast(db)->TEST_Crash(); ASSERT_OK(ReOpenNoDelete(cfds, &handles)); txn0 = db->GetTransactionByName("xid"); @@ -5989,8 +5999,8 @@ TEST_P(TransactionTest, DuplicateKeys) { // This will check the asserts inside recovery code ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 - static_cast_with_check(db->GetRootDB()) - ->TEST_FlushMemTable(true, false, handles[1]); + ASSERT_OK(static_cast_with_check(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1])); reinterpret_cast(db)->TEST_Crash(); ASSERT_OK(ReOpenNoDelete(cfds, &handles)); txn0 = db->GetTransactionByName("xid"); @@ -6016,8 +6026,8 @@ TEST_P(TransactionTest, DuplicateKeys) { // This will check the asserts inside recovery code ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 - static_cast_with_check(db->GetRootDB()) - ->TEST_FlushMemTable(true, false, handles[1]); + ASSERT_OK(static_cast_with_check(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1])); reinterpret_cast(db)->TEST_Crash(); ASSERT_OK(ReOpenNoDelete(cfds, &handles)); txn0 = db->GetTransactionByName("xid"); @@ -6043,8 +6053,8 @@ TEST_P(TransactionTest, DuplicateKeys) { // This will check the asserts inside recovery code ASSERT_OK(db->FlushWAL(true)); // Flush only cf 1 - static_cast_with_check(db->GetRootDB()) - ->TEST_FlushMemTable(true, false, handles[1]); + ASSERT_OK(static_cast_with_check(db->GetRootDB()) + ->TEST_FlushMemTable(true, false, handles[1])); reinterpret_cast(db)->TEST_Crash(); ASSERT_OK(ReOpenNoDelete(cfds, &handles)); txn0 = db->GetTransactionByName("xid"); @@ -6075,7 +6085,7 @@ TEST_P(TransactionTest, ReseekOptimization) { write_options.sync = true; write_options.disableWAL = false; ColumnFamilyDescriptor cfd; - db->DefaultColumnFamily()->GetDescriptor(&cfd); + ASSERT_OK(db->DefaultColumnFamily()->GetDescriptor(&cfd)); auto max_skip = cfd.options.max_sequential_skip_in_iterations; ASSERT_OK(db->Put(write_options, Slice("foo0"), Slice("initv"))); @@ -6113,7 +6123,7 @@ TEST_P(TransactionTest, ReseekOptimization) { } ASSERT_EQ(cnt, 2); delete iter; - txn0->Rollback(); + ASSERT_OK(txn0->Rollback()); delete txn0; } @@ -6125,7 +6135,7 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) { for (const bool write_after_recovery : {false, true}) { options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; options.manual_wal_flush = manual_wal_flush; - ReOpen(); + ASSERT_OK(ReOpen()); std::string cf_name = "two"; ColumnFamilyOptions cf_options; ColumnFamilyHandle* cf_handle = nullptr; @@ -6140,12 +6150,12 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) { ASSERT_OK(txn->Prepare()); FlushOptions flush_ops; - db->Flush(flush_ops); + ASSERT_OK(db->Flush(flush_ops)); // Now we have a log that cannot be deleted ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1")); // Flush only the 2nd cf - db->Flush(flush_ops, cf_handle); + ASSERT_OK(db->Flush(flush_ops, cf_handle)); // The value is large enough to be touched by the corruption we ingest // below. @@ -6157,7 +6167,7 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) { // key/value not touched by corruption ASSERT_OK(db->Put(write_options, "foo4", "bar4")); - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); uint64_t wal_file_id = db_impl->TEST_LogfileNumber(); std::string fname = LogFileName(dbname, wal_file_id); @@ -6191,7 +6201,7 @@ TEST_P(TransactionTest, DoubleCrashInRecovery) { } // Persist data written to WAL during recovery or by the last Put - db->FlushWAL(true); + ASSERT_OK(db->FlushWAL(true)); // 2nd crash to recover while having a valid log after the corrupted one. ASSERT_OK(ReOpenNoDelete(column_families, &handles)); assert(db != nullptr); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index de9b4f91c..e80f10239 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -104,8 +104,9 @@ Status WritePreparedTxn::PrepareInternal() { write_options.disableWAL = false; const bool WRITE_AFTER_COMMIT = true; const bool kFirstPrepareBatch = true; - WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, - !WRITE_AFTER_COMMIT); + auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), + name_, !WRITE_AFTER_COMMIT); + assert(s.ok()); // For each duplicate key we account for a new sub-batch prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); // Having AddPrepared in the PreReleaseCallback allows in-order addition of @@ -116,10 +117,10 @@ Status WritePreparedTxn::PrepareInternal() { db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch); const bool DISABLE_MEMTABLE = true; uint64_t seq_used = kMaxSequenceNumber; - Status s = db_impl_->WriteImpl( - write_options, GetWriteBatch()->GetWriteBatch(), - /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE, - &seq_used, prepare_batch_cnt_, &add_prepared_callback); + s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, &log_number_, /*log ref*/ 0, + !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_, + &add_prepared_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); auto prepare_seq = seq_used; SetId(prepare_seq); @@ -144,7 +145,8 @@ Status WritePreparedTxn::CommitInternal() { // The Memtable will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); const bool empty = working_batch->Count() == 0; - WriteBatchInternal::MarkCommit(working_batch, name_); + auto s = WriteBatchInternal::MarkCommit(working_batch, name_); + assert(s.ok()); const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_; if (!empty && for_recovery) { @@ -162,7 +164,7 @@ Status WritePreparedTxn::CommitInternal() { ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, "Duplicate key overhead"); SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); - auto s = working_batch->Iterate(&counter); + s = working_batch->Iterate(&counter); assert(s.ok()); commit_batch_cnt = counter.BatchCount(); } @@ -188,9 +190,9 @@ Status WritePreparedTxn::CommitInternal() { // redundantly reference the log that contains the prepared data. const uint64_t zero_log_number = 0ull; size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; - auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, - zero_log_number, disable_memtable, &seq_used, - batch_cnt, pre_release_callback); + s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, + zero_log_number, disable_memtable, &seq_used, + batch_cnt, pre_release_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); const SequenceNumber commit_batch_seq = seq_used; if (LIKELY(do_one_write || !s.ok())) { @@ -217,9 +219,11 @@ Status WritePreparedTxn::CommitInternal() { wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData, commit_batch_seq, commit_batch_cnt); WriteBatch empty_batch; - empty_batch.PutLogData(Slice()); + s = empty_batch.PutLogData(Slice()); + assert(s.ok()); // In the absence of Prepare markers, use Noop as a batch separator - WriteBatchInternal::InsertNoop(&empty_batch); + s = WriteBatchInternal::InsertNoop(&empty_batch); + assert(s.ok()); const bool DISABLE_MEMTABLE = true; const size_t ONE_BATCH = 1; const uint64_t NO_REF_LOG = 0; @@ -347,12 +351,12 @@ Status WritePreparedTxn::RollbackInternal() { wpt_db_->txn_db_options_.rollback_merge_operands, roptions); auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); - assert(s.ok()); if (!s.ok()) { return s; } // The Rollback marker will be used as a batch separator - WriteBatchInternal::MarkRollback(&rollback_batch, name_); + s = WriteBatchInternal::MarkRollback(&rollback_batch, name_); + assert(s.ok()); bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; const bool DISABLE_MEMTABLE = true; const uint64_t NO_REF_LOG = 0; @@ -402,9 +406,11 @@ Status WritePreparedTxn::RollbackInternal() { WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare( wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_); WriteBatch empty_batch; - empty_batch.PutLogData(Slice()); + s = empty_batch.PutLogData(Slice()); + assert(s.ok()); // In the absence of Prepare markers, use Noop as a batch separator - WriteBatchInternal::InsertNoop(&empty_batch); + s = WriteBatchInternal::InsertNoop(&empty_batch); + assert(s.ok()); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, &update_commit_map_with_prepare); diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index e6e42d7e1..a1b67bd1f 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -168,7 +168,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; WriteOptions write_options(write_options_orig); // In the absence of Prepare markers, use Noop as a batch separator - WriteBatchInternal::InsertNoop(batch); + auto s = WriteBatchInternal::InsertNoop(batch); + assert(s.ok()); const bool DISABLE_MEMTABLE = true; const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; @@ -189,9 +190,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, } else { pre_release_callback = &add_prepared_callback; } - auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, - no_log_ref, !DISABLE_MEMTABLE, &seq_used, - batch_cnt, pre_release_callback); + s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref, + !DISABLE_MEMTABLE, &seq_used, batch_cnt, + pre_release_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); uint64_t prepare_seq = seq_used; if (txn != nullptr) { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index ed2600026..29f4bb6a2 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -279,7 +279,9 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) { static std::atomic_ullong autogen_id{0}; // To avoid changing all tests to call SetName, just autogenerate one. if (wupt_db_->txn_db_options_.autogenerate_name) { - SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1))); + auto s = + SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1))); + assert(s.ok()); } else #endif { @@ -354,8 +356,9 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) { const bool WRITE_AFTER_COMMIT = true; const bool first_prepare_batch = log_number_ == 0; // MarkEndPrepare will change Noop marker to the appropriate marker. - WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, - !WRITE_AFTER_COMMIT, !prepared); + s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), + name_, !WRITE_AFTER_COMMIT, !prepared); + assert(s.ok()); // For each duplicate key we account for a new sub-batch prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); // AddPrepared better to be called in the pre-release callback otherwise there @@ -541,7 +544,8 @@ Status WriteUnpreparedTxn::CommitInternal() { // will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); const bool empty = working_batch->Count() == 0; - WriteBatchInternal::MarkCommit(working_batch, name_); + auto s = WriteBatchInternal::MarkCommit(working_batch, name_); + assert(s.ok()); const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_; if (!empty && for_recovery) { @@ -557,7 +561,7 @@ Status WriteUnpreparedTxn::CommitInternal() { ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, "Duplicate key overhead"); SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); - auto s = working_batch->Iterate(&counter); + s = working_batch->Iterate(&counter); assert(s.ok()); commit_batch_cnt = counter.BatchCount(); } @@ -583,9 +587,9 @@ Status WriteUnpreparedTxn::CommitInternal() { // need to redundantly reference the log that contains the prepared data. const uint64_t zero_log_number = 0ull; size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; - auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, - zero_log_number, disable_memtable, &seq_used, - batch_cnt, pre_release_callback); + s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, + zero_log_number, disable_memtable, &seq_used, + batch_cnt, pre_release_callback); assert(!s.ok() || seq_used != kMaxSequenceNumber); const SequenceNumber commit_batch_seq = seq_used; if (LIKELY(do_one_write || !s.ok())) { @@ -619,9 +623,11 @@ Status WriteUnpreparedTxn::CommitInternal() { // Update commit map only from the 2nd queue WriteBatch empty_batch; - empty_batch.PutLogData(Slice()); + s = empty_batch.PutLogData(Slice()); + assert(s.ok()); // In the absence of Prepare markers, use Noop as a batch separator - WriteBatchInternal::InsertNoop(&empty_batch); + s = WriteBatchInternal::InsertNoop(&empty_batch); + assert(s.ok()); const bool DISABLE_MEMTABLE = true; const size_t ONE_BATCH = 1; const uint64_t NO_REF_LOG = 0; @@ -719,10 +725,14 @@ Status WriteUnpreparedTxn::RollbackInternal() { // TODO(lth): We write rollback batch all in a single batch here, but this // should be subdivded into multiple batches as well. In phase 2, when key // sets are read from WAL, this will happen naturally. - WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions); + s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions); + if (!s.ok()) { + return s; + } // The Rollback marker will be used as a batch separator - WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_); + s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_); + assert(s.ok()); bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; const bool DISABLE_MEMTABLE = true; const uint64_t NO_REF_LOG = 0; @@ -778,9 +788,11 @@ Status WriteUnpreparedTxn::RollbackInternal() { prepare_seq); WriteBatch empty_batch; const size_t ONE_BATCH = 1; - empty_batch.PutLogData(Slice()); + s = empty_batch.PutLogData(Slice()); + assert(s.ok()); // In the absence of Prepare markers, use Noop as a batch separator - WriteBatchInternal::InsertNoop(&empty_batch); + s = WriteBatchInternal::InsertNoop(&empty_batch); + assert(s.ok()); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, &update_commit_map_with_rollback_batch); @@ -863,11 +875,13 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() { WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, top.unprep_seqs_, kBackedByDBSnapshot); - WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions); + s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions); + if (!s.ok()) { + return s; + } const bool kPrepared = true; s = FlushWriteBatchToDBInternal(!kPrepared); - assert(s.ok()); if (!s.ok()) { return s; }