rocksdb/utilities/transactions/write_unprepared_transaction_test.cc
Andrew Kryczka ed8eb436db Move slow valgrind tests behind -DROCKSDB_FULL_VALGRIND_RUN (#8475)
Summary:
Various tests had disabled valgrind due to it slowing down and timing
out (as is the case right now) the CI runs. Where a test was disabled with no comment,
I assumed slowness was the cause. For these tests that were slow under
valgrind, as well as the ones identified in https://github.com/facebook/rocksdb/issues/8352, this PR moves them
behind the compiler flag `-DROCKSDB_FULL_VALGRIND_RUN`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8475

Test Plan: running `make full_valgrind_test`, `make valgrind_test`, `make check`; will verify they appear working correctly

Reviewed By: jay-zhuang

Differential Revision: D29504843

Pulled By: ajkr

fbshipit-source-id: 2aac90749cfbd30d5ce11cb29a07a1b9314eeea7
2021-07-07 11:14:05 -07:00

787 lines
26 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_test.h"
#include "utilities/transactions/write_unprepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
namespace ROCKSDB_NAMESPACE {
class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
public:
WriteUnpreparedTransactionTestBase(bool use_stackable_db,
bool two_write_queue,
TxnDBWritePolicy write_policy)
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
kOrderedWrite) {}
};
class WriteUnpreparedTransactionTest
: public WriteUnpreparedTransactionTestBase,
virtual public ::testing::WithParamInterface<
std::tuple<bool, bool, TxnDBWritePolicy>> {
public:
WriteUnpreparedTransactionTest()
: WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
std::get<1>(GetParam()),
std::get<2>(GetParam())){}
};
INSTANTIATE_TEST_CASE_P(
WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
std::make_tuple(false, true, WRITE_UNPREPARED)));
enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
virtual public ::testing::WithParamInterface<
std::tuple<bool, StressAction>> {
public:
WriteUnpreparedStressTest()
: WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
WRITE_UNPREPARED),
action_(std::get<1>(GetParam())) {}
StressAction action_;
};
INSTANTIATE_TEST_CASE_P(
WriteUnpreparedStressTest, WriteUnpreparedStressTest,
::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
std::make_tuple(false, RO_SNAPSHOT),
std::make_tuple(false, REFRESH_SNAPSHOT),
std::make_tuple(true, NO_SNAPSHOT),
std::make_tuple(true, RO_SNAPSHOT),
std::make_tuple(true, REFRESH_SNAPSHOT)));
TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
// The following tests checks whether reading your own write for
// a transaction works for write unprepared, when there are uncommitted
// values written into DB.
auto verify_state = [](Iterator* iter, const std::string& key,
const std::string& value) {
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(key, iter->key().ToString());
ASSERT_EQ(value, iter->value().ToString());
};
// Test always reseeking vs never reseeking.
for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
options.max_sequential_skip_in_iterations = max_skip;
options.disable_auto_compactions = true;
ASSERT_OK(ReOpen());
TransactionOptions txn_options;
WriteOptions woptions;
ReadOptions roptions;
ASSERT_OK(db->Put(woptions, "a", ""));
ASSERT_OK(db->Put(woptions, "b", ""));
Transaction* txn = db->BeginTransaction(woptions, txn_options);
WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
txn->SetSnapshot();
for (int i = 0; i < 5; i++) {
std::string stored_value = "v" + ToString(i);
ASSERT_OK(txn->Put("a", stored_value));
ASSERT_OK(txn->Put("b", stored_value));
ASSERT_OK(wup_txn->FlushWriteBatchToDB(false));
// Test Get()
std::string value;
ASSERT_OK(txn->Get(roptions, "a", &value));
ASSERT_EQ(value, stored_value);
ASSERT_OK(txn->Get(roptions, "b", &value));
ASSERT_EQ(value, stored_value);
// Test Next()
auto iter = txn->GetIterator(roptions);
iter->Seek("a");
verify_state(iter, "a", stored_value);
iter->Next();
verify_state(iter, "b", stored_value);
iter->SeekToFirst();
verify_state(iter, "a", stored_value);
iter->Next();
verify_state(iter, "b", stored_value);
delete iter;
// Test Prev()
iter = txn->GetIterator(roptions);
iter->SeekForPrev("b");
verify_state(iter, "b", stored_value);
iter->Prev();
verify_state(iter, "a", stored_value);
iter->SeekToLast();
verify_state(iter, "b", stored_value);
iter->Prev();
verify_state(iter, "a", stored_value);
delete iter;
}
delete txn;
}
}
#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
// This is a stress test where different threads are writing random keys, and
// then before committing or aborting the transaction, it validates to see
// that it can read the keys it wrote, and the keys it did not write respect
// the snapshot. To avoid row lock contention (and simply stressing the
// locking system), each thread is mostly only writing to its own set of keys.
const uint32_t kNumIter = 1000;
const uint32_t kNumThreads = 10;
const uint32_t kNumKeys = 5;
// Test with
// 1. no snapshots set
// 2. snapshot set on ReadOptions
// 3. snapshot set, and refreshing after every write.
StressAction a = action_;
WriteOptions write_options;
txn_db_options.transaction_lock_timeout = -1;
options.disable_auto_compactions = true;
ASSERT_OK(ReOpen());
std::vector<std::string> keys;
for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
keys.push_back("k" + ToString(k));
}
RandomShuffle(keys.begin(), keys.end());
// This counter will act as a "sequence number" to help us validate
// visibility logic with snapshots. If we had direct access to the seqno of
// snapshots and key/values, then we should directly compare those instead.
std::atomic<int64_t> counter(0);
std::function<void(uint32_t)> stress_thread = [&](int id) {
size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
Random64 rnd(static_cast<uint32_t>(tid));
Transaction* txn;
TransactionOptions txn_options;
// batch_size of 1 causes writes to DB for every marker.
txn_options.write_batch_flush_threshold = 1;
ReadOptions read_options;
for (uint32_t i = 0; i < kNumIter; i++) {
std::set<std::string> owned_keys(keys.begin() + id * kNumKeys,
keys.begin() + (id + 1) * kNumKeys);
// Add unowned keys to make the workload more interesting, but this
// increases row lock contention, so just do it sometimes.
if (rnd.OneIn(2)) {
owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
}
txn = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn->SetName(ToString(id)));
txn->SetSnapshot();
if (a >= RO_SNAPSHOT) {
read_options.snapshot = txn->GetSnapshot();
ASSERT_TRUE(read_options.snapshot != nullptr);
}
uint64_t buf[2];
buf[0] = id;
// When scanning through the database, make sure that all unprepared
// keys have value >= snapshot and all other keys have value < snapshot.
int64_t snapshot_num = counter.fetch_add(1);
Status s;
for (const auto& key : owned_keys) {
buf[1] = counter.fetch_add(1);
s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
if (!s.ok()) {
break;
}
if (a == REFRESH_SNAPSHOT) {
txn->SetSnapshot();
read_options.snapshot = txn->GetSnapshot();
snapshot_num = counter.fetch_add(1);
}
}
// Failure is possible due to snapshot validation. In this case,
// rollback and move onto next iteration.
if (!s.ok()) {
ASSERT_TRUE(s.IsBusy());
ASSERT_OK(txn->Rollback());
delete txn;
continue;
}
auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
const std::string& key, const std::string& value) {
if (owned_keys.count(key) > 0) {
ASSERT_EQ(value.size(), 16);
// Since this key is part of owned_keys, then this key must be
// unprepared by this transaction identified by 'id'
ASSERT_EQ(((int64_t*)value.c_str())[0], id);
if (a == REFRESH_SNAPSHOT) {
// If refresh snapshot is true, then the snapshot is refreshed
// after every Put(), meaning that the current snapshot in
// snapshot_num must be greater than the "seqno" of any keys
// written by the current transaction.
ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
} else {
// If refresh snapshot is not on, then the snapshot was taken at
// the beginning of the transaction, meaning all writes must come
// after snapshot_num
ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
}
} else if (a >= RO_SNAPSHOT) {
// If this is not an unprepared key, just assert that the key
// "seqno" is smaller than the snapshot seqno.
ASSERT_EQ(value.size(), 16);
ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
}
};
// Validate Get()/Next()/Prev(). Do only one of them to save time, and
// reduce lock contention.
switch (rnd.Uniform(3)) {
case 0: // Validate Get()
{
for (const auto& key : keys) {
std::string value;
s = txn->Get(read_options, Slice(key), &value);
if (!s.ok()) {
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(owned_keys.count(key), 0);
} else {
verify_key(key, value);
}
}
break;
}
case 1: // Validate Next()
{
Iterator* iter = txn->GetIterator(read_options);
ASSERT_OK(iter->status());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
verify_key(iter->key().ToString(), iter->value().ToString());
}
ASSERT_OK(iter->status());
delete iter;
break;
}
case 2: // Validate Prev()
{
Iterator* iter = txn->GetIterator(read_options);
ASSERT_OK(iter->status());
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
verify_key(iter->key().ToString(), iter->value().ToString());
}
ASSERT_OK(iter->status());
delete iter;
break;
}
default:
FAIL();
}
if (rnd.OneIn(2)) {
ASSERT_OK(txn->Commit());
} else {
ASSERT_OK(txn->Rollback());
}
delete txn;
}
};
std::vector<port::Thread> threads;
for (uint32_t i = 0; i < kNumThreads; i++) {
threads.emplace_back(stress_thread, i);
}
for (auto& t : threads) {
t.join();
}
}
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// This tests how write unprepared behaves during recovery when the DB crashes
// after a transaction has either been unprepared or prepared, and tests if
// the changes are correctly applied for prepared transactions if we decide to
// rollback/commit.
TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
WriteOptions write_options;
write_options.disableWAL = false;
TransactionOptions txn_options;
std::vector<Transaction*> prepared_trans;
WriteUnpreparedTxnDB* wup_db;
options.disable_auto_compactions = true;
enum Action { UNPREPARED, ROLLBACK, COMMIT };
// batch_size of 1 causes writes to DB for every marker.
for (size_t batch_size : {1, 1000000}) {
txn_options.write_batch_flush_threshold = batch_size;
for (bool empty : {true, false}) {
for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
for (int num_batches = 1; num_batches < 10; num_batches++) {
// Reset database.
prepared_trans.clear();
ASSERT_OK(ReOpen());
wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
if (!empty) {
for (int i = 0; i < num_batches; i++) {
ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i),
"before value" + ToString(i)));
}
}
// Write num_batches unprepared batches.
Transaction* txn = db->BeginTransaction(write_options, txn_options);
WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
ASSERT_OK(txn->SetName("xid"));
for (int i = 0; i < num_batches; i++) {
ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
if (txn_options.write_batch_flush_threshold == 1) {
// WriteUnprepared will check write_batch_flush_threshold and
// possibly flush before appending to the write batch. No flush
// will happen at the first write because the batch is still
// empty, so after k puts, there should be k-1 flushed batches.
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
} else {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
}
}
if (a == UNPREPARED) {
// This is done to prevent the destructor from rolling back the
// transaction for us, since we want to pretend we crashed and
// test that recovery does the rollback.
wup_txn->unprep_seqs_.clear();
} else {
ASSERT_OK(txn->Prepare());
}
delete txn;
// Crash and run recovery code paths.
ASSERT_OK(wup_db->db_impl_->FlushWAL(true));
wup_db->TEST_Crash();
ASSERT_OK(ReOpenNoDelete());
assert(db != nullptr);
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
if (a == ROLLBACK) {
ASSERT_OK(prepared_trans[0]->Rollback());
delete prepared_trans[0];
} else if (a == COMMIT) {
ASSERT_OK(prepared_trans[0]->Commit());
delete prepared_trans[0];
}
Iterator* iter = db->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
iter->SeekToFirst();
// Check that DB has before values.
if (!empty || a == COMMIT) {
for (int i = 0; i < num_batches; i++) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
if (a == COMMIT) {
ASSERT_EQ(iter->value().ToString(), "value" + ToString(i));
} else {
ASSERT_EQ(iter->value().ToString(),
"before value" + ToString(i));
}
iter->Next();
}
}
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
delete iter;
}
}
}
}
}
// Basic test to see that unprepared batch gets written to DB when batch size
// is exceeded. It also does some basic checks to see if commit/rollback works
// as expected for write unprepared.
TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
WriteOptions write_options;
TransactionOptions txn_options;
const int kNumKeys = 10;
// batch_size of 1 causes writes to DB for every marker.
for (size_t batch_size : {1, 1000000}) {
txn_options.write_batch_flush_threshold = batch_size;
for (bool prepare : {false, true}) {
for (bool commit : {false, true}) {
ASSERT_OK(ReOpen());
Transaction* txn = db->BeginTransaction(write_options, txn_options);
WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
ASSERT_OK(txn->SetName("xid"));
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(txn->Put("k" + ToString(i), "v" + ToString(i)));
if (txn_options.write_batch_flush_threshold == 1) {
// WriteUnprepared will check write_batch_flush_threshold and
// possibly flush before appending to the write batch. No flush will
// happen at the first write because the batch is still empty, so
// after k puts, there should be k-1 flushed batches.
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
} else {
ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
}
}
if (prepare) {
ASSERT_OK(txn->Prepare());
}
Iterator* iter = db->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
iter->SeekToFirst();
assert(!iter->Valid());
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
delete iter;
if (commit) {
ASSERT_OK(txn->Commit());
} else {
ASSERT_OK(txn->Rollback());
}
delete txn;
iter = db->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
iter->SeekToFirst();
for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
ASSERT_EQ(iter->value().ToString(), "v" + ToString(i));
iter->Next();
}
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
delete iter;
}
}
}
}
// Test whether logs containing unprepared/prepared batches are kept even
// after memtable finishes flushing, and whether they are removed when
// transaction commits/aborts.
//
// TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
WriteOptions write_options;
TransactionOptions txn_options;
// batch_size of 1 causes writes to DB for every marker.
txn_options.write_batch_flush_threshold = 1;
const int kNumKeys = 10;
WriteOptions wopts;
wopts.sync = true;
for (bool prepare : {false, true}) {
for (bool commit : {false, true}) {
ASSERT_OK(ReOpen());
auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
auto db_impl = wup_db->db_impl_;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn1->SetName("xid1"));
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn2->SetName("xid2"));
// Spread this transaction across multiple log files.
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i)));
if (i >= kNumKeys / 2) {
ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i)));
}
if (i > 0) {
ASSERT_OK(db_impl->TEST_SwitchWAL());
}
}
ASSERT_GT(txn1->GetLogNumber(), 0);
ASSERT_GT(txn2->GetLogNumber(), 0);
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn1->GetLogNumber());
ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
if (prepare) {
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn2->Prepare());
}
ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn1->GetLogNumber());
if (commit) {
ASSERT_OK(txn1->Commit());
} else {
ASSERT_OK(txn1->Rollback());
}
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn2->GetLogNumber());
if (commit) {
ASSERT_OK(txn2->Commit());
} else {
ASSERT_OK(txn2->Rollback());
}
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
delete txn1;
delete txn2;
}
}
}
TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
WriteOptions woptions;
TransactionOptions txn_options;
txn_options.write_batch_flush_threshold = 1;
Transaction* txn = db->BeginTransaction(woptions, txn_options);
// Do some writes with no snapshot
ASSERT_OK(txn->Put("a", "a"));
ASSERT_OK(txn->Put("b", "b"));
ASSERT_OK(txn->Put("c", "c"));
// Test that it is still possible to create iterators after writes with no
// snapshot, if iterator snapshot is fresh enough.
ReadOptions roptions;
auto iter = txn->GetIterator(roptions);
ASSERT_OK(iter->status());
int keys = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) {
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
}
ASSERT_EQ(keys, 3);
ASSERT_OK(iter->status());
delete iter;
delete txn;
}
// Test whether write to a transaction while iterating is supported.
TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
WriteOptions woptions;
TransactionOptions txn_options;
txn_options.write_batch_flush_threshold = 1;
enum Action { DO_DELETE, DO_UPDATE };
for (Action a : {DO_DELETE, DO_UPDATE}) {
for (int i = 0; i < 100; i++) {
ASSERT_OK(db->Put(woptions, ToString(i), ToString(i)));
}
Transaction* txn = db->BeginTransaction(woptions, txn_options);
// write_batch_ now contains 1 key.
ASSERT_OK(txn->Put("9", "a"));
ReadOptions roptions;
auto iter = txn->GetIterator(roptions);
ASSERT_OK(iter->status());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
if (iter->key() == "9") {
ASSERT_EQ(iter->value().ToString(), "a");
} else {
ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
}
if (a == DO_DELETE) {
ASSERT_OK(txn->Delete(iter->key()));
} else {
ASSERT_OK(txn->Put(iter->key(), "b"));
}
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_OK(txn->Commit());
iter = db->NewIterator(roptions);
ASSERT_OK(iter->status());
if (a == DO_DELETE) {
// Check that db is empty.
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
} else {
int keys = 0;
// Check that all values are updated to b.
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) {
ASSERT_OK(iter->status());
ASSERT_EQ(iter->value().ToString(), "b");
}
ASSERT_EQ(keys, 100);
}
ASSERT_OK(iter->status());
delete iter;
delete txn;
}
}
// Test that using an iterator after transaction clear is not supported
TEST_P(WriteUnpreparedTransactionTest, IterateAfterClear) {
WriteOptions woptions;
TransactionOptions txn_options;
txn_options.write_batch_flush_threshold = 1;
enum Action { kCommit, kRollback };
for (Action a : {kCommit, kRollback}) {
for (int i = 0; i < 100; i++) {
ASSERT_OK(db->Put(woptions, ToString(i), ToString(i)));
}
Transaction* txn = db->BeginTransaction(woptions, txn_options);
ASSERT_OK(txn->Put("9", "a"));
ReadOptions roptions;
auto iter1 = txn->GetIterator(roptions);
auto iter2 = txn->GetIterator(roptions);
iter1->SeekToFirst();
iter2->Seek("9");
// Check that iterators are valid before transaction finishes.
ASSERT_TRUE(iter1->Valid());
ASSERT_TRUE(iter2->Valid());
ASSERT_OK(iter1->status());
ASSERT_OK(iter2->status());
if (a == kCommit) {
ASSERT_OK(txn->Commit());
} else {
ASSERT_OK(txn->Rollback());
}
// Check that iterators are invalidated after transaction finishes.
ASSERT_FALSE(iter1->Valid());
ASSERT_FALSE(iter2->Valid());
ASSERT_TRUE(iter1->status().IsInvalidArgument());
ASSERT_TRUE(iter2->status().IsInvalidArgument());
delete iter1;
delete iter2;
delete txn;
}
}
TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
WriteOptions woptions;
TransactionOptions txn_options;
txn_options.write_batch_flush_threshold = 1;
Transaction* txn = db->BeginTransaction(woptions, txn_options);
txn->SetSavePoint();
ASSERT_OK(txn->Put("a", "a"));
ASSERT_OK(txn->Put("b", "b"));
ASSERT_OK(txn->Commit());
ReadOptions roptions;
std::string value;
ASSERT_OK(txn->Get(roptions, "a", &value));
ASSERT_EQ(value, "a");
ASSERT_OK(txn->Get(roptions, "b", &value));
ASSERT_EQ(value, "b");
delete txn;
}
TEST_P(WriteUnpreparedTransactionTest, UntrackedKeys) {
WriteOptions woptions;
TransactionOptions txn_options;
txn_options.write_batch_flush_threshold = 1;
Transaction* txn = db->BeginTransaction(woptions, txn_options);
auto wb = txn->GetWriteBatch()->GetWriteBatch();
ASSERT_OK(txn->Put("a", "a"));
ASSERT_OK(wb->Put("a_untrack", "a_untrack"));
txn->SetSavePoint();
ASSERT_OK(txn->Put("b", "b"));
ASSERT_OK(txn->Put("b_untrack", "b_untrack"));
ReadOptions roptions;
std::string value;
ASSERT_OK(txn->Get(roptions, "a", &value));
ASSERT_EQ(value, "a");
ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
ASSERT_EQ(value, "a_untrack");
ASSERT_OK(txn->Get(roptions, "b", &value));
ASSERT_EQ(value, "b");
ASSERT_OK(txn->Get(roptions, "b_untrack", &value));
ASSERT_EQ(value, "b_untrack");
// b and b_untrack should be rolled back.
ASSERT_OK(txn->RollbackToSavePoint());
ASSERT_OK(txn->Get(roptions, "a", &value));
ASSERT_EQ(value, "a");
ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
ASSERT_EQ(value, "a_untrack");
auto s = txn->Get(roptions, "b", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn->Get(roptions, "b_untrack", &value);
ASSERT_TRUE(s.IsNotFound());
// Everything should be rolled back.
ASSERT_OK(txn->Rollback());
s = txn->Get(roptions, "a", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn->Get(roptions, "a_untrack", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn->Get(roptions, "b", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn->Get(roptions, "b_untrack", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr,
"SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // ROCKSDB_LITE