03b08ba9a9
Summary: WriteBatchWithIndex::GetFromBatchAndDB only works correctly for overwrite_key=false. Transactions use overwrite_key=true (since WriteBatchWithIndex::GetIteratorWithBase only works when overwrite_key=true). So currently, Transactions could return incorrectly merged results when calling Get/GetForUpdate(). Until a permanent fix can be put in place, Transaction::Get[ForUpdate] and WriteBatchWithIndex::GetFromBatch[AndDB] will now return MergeInProgress if the most recent write to a key in the batch is a Merge. Test Plan: more tests Reviewers: sdong, yhchiang, rven, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D47817
1903 lines
44 KiB
C++
1903 lines
44 KiB
C++
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include <string>
|
|
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/utilities/transaction.h"
|
|
#include "rocksdb/utilities/transaction_db.h"
|
|
#include "util/logging.h"
|
|
#include "util/testharness.h"
|
|
#include "utilities/merge_operators.h"
|
|
#include "utilities/merge_operators/string_append/stringappend.h"
|
|
|
|
using std::string;
|
|
|
|
namespace rocksdb {
|
|
|
|
class TransactionTest : public testing::Test {
|
|
public:
|
|
TransactionDB* db;
|
|
string dbname;
|
|
Options options;
|
|
|
|
TransactionDBOptions txn_db_options;
|
|
|
|
TransactionTest() {
|
|
options.create_if_missing = true;
|
|
options.max_write_buffer_number = 2;
|
|
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
|
|
dbname = test::TmpDir() + "/transaction_testdb";
|
|
|
|
DestroyDB(dbname, options);
|
|
txn_db_options.transaction_lock_timeout = 0;
|
|
txn_db_options.default_lock_timeout = 0;
|
|
Status s = TransactionDB::Open(options, txn_db_options, dbname, &db);
|
|
assert(s.ok());
|
|
}
|
|
|
|
~TransactionTest() {
|
|
delete db;
|
|
DestroyDB(dbname, options);
|
|
}
|
|
};
|
|
|
|
// Basic happy path: read a key for update, overwrite it inside a transaction,
// commit, and verify the new value is visible through the database.
TEST_F(TransactionTest, SuccessTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  // Seed the database; a failed setup write would invalidate the whole test.
  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  s = db->Put(write_options, Slice("foo2"), Slice("bar"));
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
  ASSERT_TRUE(txn);

  ASSERT_EQ(0, txn->GetNumPuts());

  s = txn->GetForUpdate(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  ASSERT_EQ(1, txn->GetNumPuts());

  // The transaction must observe its own uncommitted write.
  s = txn->GetForUpdate(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  delete txn;
}
|
|
|
|
// A write made outside the transaction to a key the transaction has already
// written must be rejected, and the transaction must still commit.
TEST_F(TransactionTest, WriteConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  s = db->Put(write_options, "foo", "A");
  ASSERT_OK(s);
  s = db->Put(write_options, "foo2", "B");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("foo", "A2");
  ASSERT_OK(s);

  s = txn->Put("foo2", "B2");
  ASSERT_OK(s);

  // This Put outside of a transaction will conflict with the previous write
  s = db->Put(write_options, "foo", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // The rejected Put must not have changed the committed value.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "A");

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "A2");
  s = db->Get(read_options, "foo2", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "B2");

  delete txn;
}
|
|
|
|
// A transaction with a snapshot must refuse to write a key that was modified
// after the snapshot was taken, while its other writes still commit.
TEST_F(TransactionTest, WriteConflictTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  s = db->Put(write_options, "foo", "bar");
  ASSERT_OK(s);

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  // This Put outside of a transaction will conflict with a later write
  s = db->Put(write_options, "foo", "barz");
  ASSERT_OK(s);

  s = txn->Put("foo2", "X");
  ASSERT_OK(s);

  s = txn->Put("foo",
               "bar2");  // Conflicts with write done after snapshot taken
  ASSERT_TRUE(s.IsBusy());

  s = txn->Put("foo3", "Y");
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barz");

  // Only the two successful Puts are counted.
  ASSERT_EQ(2, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);  // Txn should commit, but only write foo2 and foo3

  // Verify that transaction wrote foo2 and foo3 but not foo
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barz");

  s = db->Get(read_options, "foo2", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "X");

  s = db->Get(read_options, "foo3", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "Y");

  delete txn;
}
|
|
|
|
// A key read with GetForUpdate inside a transaction must be protected from
// writes made outside the transaction until the transaction finishes.
TEST_F(TransactionTest, ReadConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  s = db->Put(write_options, "foo", "bar");
  ASSERT_OK(s);
  s = db->Put(write_options, "foo2", "bar");
  ASSERT_OK(s);

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  s = txn->GetForUpdate(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  // This Put outside of a transaction will conflict with the previous read
  s = db->Put(write_options, "foo", "barz");
  ASSERT_TRUE(s.IsTimedOut());

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
|
|
|
|
// Test to make sure transactions work when there are no other writes in an
// empty db.
TEST_F(TransactionTest, TxnOnlyTest) {
  WriteOptions write_options;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("x", "y");
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;
}
|
|
|
|
// A transaction must still be able to commit after a single memtable flush
// happens between its writes and its commit.
TEST_F(TransactionTest, FlushTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  string value;
  Status s;

  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  s = db->Put(write_options, Slice("foo2"), Slice("bar"));
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  snapshot_read_options.snapshot = txn->GetSnapshot();

  s = txn->GetForUpdate(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  s = txn->GetForUpdate(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // Put a random key so we have a memtable to flush
  s = db->Put(write_options, "dummy", "dummy");
  ASSERT_OK(s);

  // force a memtable flush
  FlushOptions flush_ops;
  s = db->Flush(flush_ops);
  ASSERT_OK(s);

  s = txn->Commit();
  // txn should commit since the flushed table is still in MemtableList History
  ASSERT_OK(s);

  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  delete txn;
}
|
|
|
|
// After enough flushes to push the first memtable out of the MemtableList
// history, a new write in a snapshot transaction must fail with TryAgain,
// while writes that succeeded earlier are still committed.
TEST_F(TransactionTest, FlushTest2) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  s = db->Put(write_options, Slice("foo"), Slice("bar"));
  ASSERT_OK(s);
  s = db->Put(write_options, Slice("foo2"), Slice("bar"));
  ASSERT_OK(s);

  txn_options.set_snapshot = true;
  Transaction* txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);

  snapshot_read_options.snapshot = txn->GetSnapshot();

  s = txn->GetForUpdate(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar");

  s = txn->Put(Slice("foo"), Slice("bar2"));
  ASSERT_OK(s);

  s = txn->GetForUpdate(snapshot_read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // Put a random key so we have a MemTable to flush
  s = db->Put(write_options, "dummy", "dummy");
  ASSERT_OK(s);

  // force a memtable flush
  FlushOptions flush_ops;
  s = db->Flush(flush_ops);
  ASSERT_OK(s);

  // Put a random key so we have a MemTable to flush
  s = db->Put(write_options, "dummy", "dummy2");
  ASSERT_OK(s);

  // force a memtable flush
  s = db->Flush(flush_ops);
  ASSERT_OK(s);

  s = db->Put(write_options, "dummy", "dummy3");
  ASSERT_OK(s);

  // force a memtable flush
  // Since our test db has max_write_buffer_number=2, this flush will cause
  // the first memtable to get purged from the MemtableList history.
  s = db->Flush(flush_ops);
  ASSERT_OK(s);

  s = txn->Put("X", "Y");
  // Put should fail since MemTableList History is not older than the snapshot.
  ASSERT_TRUE(s.IsTryAgain());

  s = txn->Commit();
  ASSERT_OK(s);

  // Transaction should only write the keys that succeeded.
  s = db->Get(read_options, "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  s = db->Get(read_options, "X", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}
|
|
|
|
// Without a snapshot, a transaction may read and write a key that was
// modified after the transaction began and still commit.
TEST_F(TransactionTest, NoSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  s = db->Put(write_options, "AAA", "bar");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Modify key after transaction start
  s = db->Put(write_options, "AAA", "bar1");
  ASSERT_OK(s);

  // Read and write without a snapshot
  s = txn->GetForUpdate(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = txn->Put("AAA", "bar2");
  ASSERT_OK(s);

  // Should commit since read/write was done after data changed
  s = txn->Commit();
  ASSERT_OK(s);

  // Reading through the already-committed transaction returns the new value.
  s = txn->GetForUpdate(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  delete txn;
}
|
|
|
|
// Exercises a transaction that takes several snapshots over its lifetime and
// checks that conflict detection uses the snapshot in effect at each write.
TEST_F(TransactionTest, MultipleSnapshotTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  string value;
  Status s;

  s = db->Put(write_options, "AAA", "bar");
  ASSERT_OK(s);
  s = db->Put(write_options, "BBB", "bar");
  ASSERT_OK(s);
  s = db->Put(write_options, "CCC", "bar");
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = db->Put(write_options, "AAA", "bar1");
  ASSERT_OK(s);

  // Read and write without a snapshot
  s = txn->GetForUpdate(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = txn->Put("AAA", "bar2");
  ASSERT_OK(s);

  // Modify BBB before snapshot is taken
  s = db->Put(write_options, "BBB", "bar1");
  ASSERT_OK(s);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Read and write with snapshot
  s = txn->GetForUpdate(snapshot_read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = txn->Put("BBB", "bar2");
  ASSERT_OK(s);

  s = db->Put(write_options, "CCC", "bar1");
  ASSERT_OK(s);

  // Set a new snapshot
  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Read and write with snapshot
  s = txn->GetForUpdate(snapshot_read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = txn->Put("CCC", "bar2");
  ASSERT_OK(s);

  // The transaction sees its own pending writes...
  s = txn->GetForUpdate(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = txn->GetForUpdate(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = txn->GetForUpdate(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // ...while the database still holds the pre-commit values.
  s = db->Get(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = db->Get(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");
  s = db->Get(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar1");

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "AAA", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = db->Get(read_options, "BBB", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");
  s = db->Get(read_options, "CCC", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "bar2");

  // verify that we track multiple writes to the same key at different snapshots
  delete txn;
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Potentially conflicting writes
  s = db->Put(write_options, "ZZZ", "zzz");
  ASSERT_OK(s);
  s = db->Put(write_options, "XXX", "xxx");
  ASSERT_OK(s);

  txn->SetSnapshot();

  TransactionOptions txn_options;
  txn_options.set_snapshot = true;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);
  txn2->SetSnapshot();

  // This should not conflict in txn since the snapshot is later than the
  // previous write (spoiler alert: it will later conflict with txn2).
  s = txn->Put("ZZZ", "zzzz");
  ASSERT_OK(s);

  s = txn->Commit();
  ASSERT_OK(s);

  delete txn;

  // This will conflict since the snapshot is earlier than another write to ZZZ
  s = txn2->Put("ZZZ", "xxxxx");
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "ZZZ", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "zzzz");

  delete txn2;
}
|
|
|
|
// Exercises transactions that span several column families: locking and
// conflict detection are per column family, and MultiGetForUpdate reports a
// conflict on every requested key when any of them conflicts.
TEST_F(TransactionTest, ColumnFamiliesTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  ColumnFamilyHandle *cfa, *cfb;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
  ASSERT_OK(s);

  delete cfa;
  delete cfb;
  delete db;
  // Clear the fixture pointer so the fixture destructor does not delete the
  // same object again if the reopen below fails.
  db = nullptr;

  // open DB with three column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column families
  column_families.push_back(
      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
  column_families.push_back(
      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));

  std::vector<ColumnFamilyHandle*> handles;

  s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
                          &handles, &db);
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  txn_options.set_snapshot = true;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // Write some data to the db
  WriteBatch batch;
  batch.Put("foo", "foo");
  batch.Put(handles[1], "AAA", "bar");
  batch.Put(handles[1], "AAAZZZ", "bar");
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);
  s = db->Delete(write_options, handles[1], "AAAZZZ");
  ASSERT_OK(s);

  // These keys do not conflict with existing writes since they're in
  // different column families
  s = txn->Delete("AAA");
  ASSERT_OK(s);
  s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
  ASSERT_TRUE(s.IsNotFound());
  Slice key_slice("AAAZZZ");
  Slice value_slices[2] = {Slice("bar"), Slice("bar")};
  s = txn->Put(handles[2], SliceParts(&key_slice, 1),
               SliceParts(value_slices, 2));
  ASSERT_OK(s);
  ASSERT_EQ(3, txn->GetNumKeys());

  s = txn->Commit();
  ASSERT_OK(s);
  s = db->Get(read_options, "AAA", &value);
  ASSERT_TRUE(s.IsNotFound());
  s = db->Get(read_options, handles[2], "AAAZZZ", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "barbar");

  Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
  Slice value_slice("barbarbar");

  s = txn2->Delete(handles[2], "XXX");
  ASSERT_OK(s);
  s = txn2->Delete(handles[1], "XXX");
  ASSERT_OK(s);

  // This write will cause a conflict with the earlier batch write
  s = txn2->Put(handles[1], SliceParts(key_slices, 3),
                SliceParts(&value_slice, 1));
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);
  // NOTE(review): the status of this Get is intentionally left unchecked.
  // `value` already holds "barbar" from the read of handles[2] above, so the
  // assertion below would also pass if this key were not found in handles[1].
  // Confirm which column family was meant to be verified here.
  s = db->Get(read_options, handles[1], "AAAZZZ", &value);
  ASSERT_EQ(value, "barbar");

  delete txn;
  delete txn2;

  txn = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn);
  snapshot_read_options.snapshot = txn->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
                                                   handles[0], handles[2]};
  std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
  std::vector<std::string> values(4);

  std::vector<Status> results = txn->MultiGetForUpdate(
      snapshot_read_options, multiget_cfh, multiget_keys, &values);
  ASSERT_OK(results[0]);
  ASSERT_OK(results[1]);
  ASSERT_OK(results[2]);
  ASSERT_TRUE(results[3].IsNotFound());
  ASSERT_EQ(values[0], "bar");
  ASSERT_EQ(values[1], "barbar");
  ASSERT_EQ(values[2], "foo");

  s = txn->SingleDelete(handles[2], "ZZZ");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "ZZZ", "YYY");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "ZZZ", "YYYY");
  ASSERT_OK(s);
  s = txn->Delete(handles[2], "ZZZ");
  ASSERT_OK(s);
  s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
  ASSERT_OK(s);

  ASSERT_EQ(5, txn->GetNumKeys());

  // Txn should commit
  s = txn->Commit();
  ASSERT_OK(s);
  s = db->Get(read_options, handles[2], "ZZZ", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Put a key which will conflict with the next txn using the previous snapshot
  s = db->Put(write_options, handles[2], "foo", "000");
  ASSERT_OK(s);

  results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
                                    multiget_keys, &values);
  // All results should fail since there was a conflict
  ASSERT_TRUE(results[0].IsBusy());
  ASSERT_TRUE(results[1].IsBusy());
  ASSERT_TRUE(results[2].IsBusy());
  ASSERT_TRUE(results[3].IsBusy());

  s = db->Get(read_options, handles[2], "foo", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "000");

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->DropColumnFamily(handles[1]);
  ASSERT_OK(s);
  s = db->DropColumnFamily(handles[2]);
  ASSERT_OK(s);

  delete txn;
  delete txn2;

  for (auto handle : handles) {
    delete handle;
  }
}
|
|
|
|
// Checks that the same key can be written independently in different column
// families, and that writing to a dropped column family is rejected.
TEST_F(TransactionTest, ColumnFamiliesTest2) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  ColumnFamilyHandle *one, *two;
  ColumnFamilyOptions cf_options;

  // Create 2 new column families
  s = db->CreateColumnFamily(cf_options, "ONE", &one);
  ASSERT_OK(s);
  s = db->CreateColumnFamily(cf_options, "TWO", &two);
  ASSERT_OK(s);

  Transaction* txn1 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn1);
  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  s = txn1->Put(one, "X", "1");
  ASSERT_OK(s);
  s = txn1->Put(two, "X", "2");
  ASSERT_OK(s);
  s = txn1->Put("X", "0");
  ASSERT_OK(s);

  // txn1 already holds "X" in column family ONE.
  s = txn2->Put(one, "X", "11");
  ASSERT_TRUE(s.IsTimedOut());

  s = txn1->Commit();
  ASSERT_OK(s);

  // Drop first column family
  s = db->DropColumnFamily(one);
  ASSERT_OK(s);

  // txn2's only Put timed out above, so it has nothing to write; the test
  // expects its commit to succeed even though the column family was dropped.
  s = txn2->Commit();
  ASSERT_OK(s);

  delete txn1;
  txn1 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn1);

  // Should fail since column family was dropped
  s = txn1->Put(one, "X", "111");
  ASSERT_TRUE(s.IsInvalidArgument());

  s = txn1->Put(two, "X", "222");
  ASSERT_OK(s);

  s = txn1->Put("X", "000");
  ASSERT_OK(s);

  s = txn1->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, two, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("222", value);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("000", value);

  s = db->DropColumnFamily(two);
  ASSERT_OK(s);

  delete txn1;
  delete txn2;

  delete one;
  delete two;
}
|
|
|
|
// Transactions that perform no writes (empty, rolled back, or read-only) must
// still commit or roll back cleanly.
TEST_F(TransactionTest, EmptyTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  s = db->Put(write_options, "aaa", "aaa");
  ASSERT_OK(s);

  // Commit a transaction that did nothing.
  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  // Roll back a transaction that did nothing.
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  txn->Rollback();
  delete txn;

  // Commit a transaction that only read.
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  s = txn->GetForUpdate(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;

  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);
  txn->SetSnapshot();

  s = txn->GetForUpdate(read_options, "aaa", &value);
  ASSERT_OK(s);
  ASSERT_EQ(value, "aaa");

  // Conflicts with previous GetForUpdate
  s = db->Put(write_options, "aaa", "xxx");
  ASSERT_TRUE(s.IsTimedOut());

  // The rejected external Put must not stop the read-only transaction from
  // committing.
  s = txn->Commit();
  ASSERT_OK(s);
  delete txn;
}
|
|
|
|
// Keys locked by one transaction (through MultiGetForUpdate or Put) must be
// protected from writes and locking reads issued by a second transaction.
TEST_F(TransactionTest, PredicateManyPreceders) {
  WriteOptions write_options;
  ReadOptions read_options1, read_options2;
  TransactionOptions txn_options;
  string value;

  txn_options.set_snapshot = true;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  Transaction* txn2 = db->BeginTransaction(write_options);
  txn2->SetSnapshot();
  read_options2.snapshot = txn2->GetSnapshot();

  std::vector<Slice> keys = {"1", "2", "3"};
  std::vector<std::string> fetched;

  // Part 1: txn1 locks the keys by reading them for update.
  std::vector<Status> results =
      txn1->MultiGetForUpdate(read_options1, keys, &fetched);
  ASSERT_TRUE(results[1].IsNotFound());

  // Conflicts with txn1's MultiGetForUpdate
  ASSERT_TRUE(txn2->Put("2", "x").IsTimedOut());

  txn2->Rollback();

  fetched.clear();
  results = txn1->MultiGetForUpdate(read_options1, keys, &fetched);
  ASSERT_TRUE(results[1].IsNotFound());

  ASSERT_OK(txn1->Commit());

  delete txn1;
  delete txn2;

  // Part 2: both transactions take a snapshot; txn1 writes first.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("4", "x"));

  // conflict: txn1 holds the key
  ASSERT_TRUE(txn2->Delete("4").IsTimedOut());

  ASSERT_OK(txn1->Commit());

  // After txn1 committed, a locking read of the same key through txn2's
  // snapshot is expected to report Busy.
  ASSERT_TRUE(txn2->GetForUpdate(read_options2, "4", &value).IsBusy());

  txn2->Rollback();

  delete txn1;
  delete txn2;
}
|
|
|
|
// Test 2 transactions writing to the same key in multiple orders and
// with/without snapshots. Each scenario checks which value ends up in the
// database.
TEST_F(TransactionTest, LostUpdate) {
  WriteOptions write_options;
  ReadOptions read_options, read_options1, read_options2;
  TransactionOptions txn_options;
  string value;

  // Scenario 1: no snapshots; txn1 writes first and txn2 hits the lock.
  Transaction* txn1 = db->BeginTransaction(write_options);
  Transaction* txn2 = db->BeginTransaction(write_options);

  ASSERT_OK(txn1->Put("1", "1"));
  ASSERT_TRUE(txn2->Put("1", "2").IsTimedOut());  // conflict

  ASSERT_OK(txn2->Commit());
  ASSERT_OK(txn1->Commit());

  ASSERT_OK(db->Get(read_options, "1", &value));
  ASSERT_EQ("1", value);

  delete txn1;
  delete txn2;

  // Scenario 2: both take a snapshot; txn1 writes first and txn2 hits the
  // lock.
  txn_options.set_snapshot = true;
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("1", "3"));
  ASSERT_TRUE(txn2->Put("1", "4").IsTimedOut());  // conflict

  ASSERT_OK(txn1->Commit());
  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "1", &value));
  ASSERT_EQ("3", value);

  delete txn1;
  delete txn2;

  // Scenario 3: both take a snapshot; txn1 commits before txn2 writes, so
  // txn2's write is reported as Busy.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("1", "5"));
  ASSERT_OK(txn1->Commit());

  ASSERT_TRUE(txn2->Put("1", "6").IsBusy());
  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "1", &value));
  ASSERT_EQ("5", value);

  delete txn1;
  delete txn2;

  // Scenario 4: like scenario 3, but txn2 takes a new snapshot after txn1
  // commits, so its write succeeds.
  txn1 = db->BeginTransaction(write_options, txn_options);
  read_options1.snapshot = txn1->GetSnapshot();

  txn2 = db->BeginTransaction(write_options, txn_options);
  read_options2.snapshot = txn2->GetSnapshot();

  ASSERT_OK(txn1->Put("1", "7"));
  ASSERT_OK(txn1->Commit());

  txn2->SetSnapshot();
  ASSERT_OK(txn2->Put("1", "8"));
  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "1", &value));
  ASSERT_EQ("8", value);

  delete txn1;
  delete txn2;

  // Scenario 5: no snapshots; the transactions write one after the other.
  txn1 = db->BeginTransaction(write_options);
  txn2 = db->BeginTransaction(write_options);

  ASSERT_OK(txn1->Put("1", "9"));
  ASSERT_OK(txn1->Commit());

  ASSERT_OK(txn2->Put("1", "10"));
  ASSERT_OK(txn2->Commit());

  delete txn1;
  delete txn2;

  ASSERT_OK(db->Get(read_options, "1", &value));
  ASSERT_EQ(value, "10");
}
|
|
|
|
// Untracked writes (PutUntracked / MergeUntracked / DeleteUntracked) are
// rolled back like normal writes and succeed where a tracked write to the
// same key conflicts with the snapshot.
TEST_F(TransactionTest, UntrackedWrites) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;

  // Verify transaction rollback works for untracked keys.
  Transaction* txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  ASSERT_OK(txn->PutUntracked("untracked", "0"));
  txn->Rollback();
  ASSERT_TRUE(db->Get(read_options, "untracked", &value).IsNotFound());

  delete txn;
  txn = db->BeginTransaction(write_options);
  txn->SetSnapshot();

  // Write the key outside the transaction, after its snapshot.
  ASSERT_OK(db->Put(write_options, "untracked", "x"));

  // Untracked writes should succeed even though key was written after snapshot
  ASSERT_OK(txn->PutUntracked("untracked", "1"));
  ASSERT_OK(txn->MergeUntracked("untracked", "2"));
  ASSERT_OK(txn->DeleteUntracked("untracked"));

  // Conflict: a tracked write to the same key is rejected.
  ASSERT_TRUE(txn->Put("untracked", "3").IsBusy());

  ASSERT_OK(txn->Commit());

  // The last successful operation was the delete.
  ASSERT_TRUE(db->Get(read_options, "untracked", &value).IsNotFound());

  delete txn;
}
|
|
|
|
// An expired transaction must not block other transactions from writing its
// keys, and must fail to commit without applying any of its writes.
TEST_F(TransactionTest, ExpiredTransaction) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;

  // An expiration of 0 makes the transaction expire instantly.
  txn_options.expiration = 0;
  Transaction* expired_txn = db->BeginTransaction(write_options, txn_options);

  ASSERT_OK(expired_txn->Put("X", "1"));
  ASSERT_OK(expired_txn->Put("Y", "1"));

  Transaction* live_txn = db->BeginTransaction(write_options);

  // live_txn should be able to write to X since expired_txn has expired
  ASSERT_OK(live_txn->Put("X", "2"));

  ASSERT_OK(live_txn->Commit());
  ASSERT_OK(db->Get(read_options, "X", &value));
  ASSERT_EQ("2", value);

  ASSERT_OK(expired_txn->Put("Z", "1"));

  // expired_txn should fail to commit since it is expired
  ASSERT_TRUE(expired_txn->Commit().IsExpired());

  // None of the expired transaction's other keys reached the database.
  ASSERT_TRUE(db->Get(read_options, "Y", &value).IsNotFound());
  ASSERT_TRUE(db->Get(read_options, "Z", &value).IsNotFound());

  delete expired_txn;
  delete live_txn;
}
|
|
|
|
// Rolling back a transaction must release its locks so that another
// transaction can then write the same key.
TEST_F(TransactionTest, Rollback) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;
  string value;
  Status s;

  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  // Check the returned transaction itself; no status has been produced yet at
  // this point.
  ASSERT_TRUE(txn1);

  s = txn1->Put("X", "1");
  ASSERT_OK(s);

  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  // txn2 should not be able to write to X since txn1 has it locked
  s = txn2->Put("X", "2");
  ASSERT_TRUE(s.IsTimedOut());

  txn1->Rollback();
  delete txn1;

  // txn2 should now be able to write to X
  s = txn2->Put("X", "3");
  ASSERT_OK(s);

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("3", value);

  delete txn2;
}
|
|
|
|
// With max_num_locks set to 3, lock requests beyond the limit must fail with
// Busy, re-locking an already held key must not count against the limit, and
// committing must release the locks for other transactions.
TEST_F(TransactionTest, LockLimitTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  delete db;
  // Clear the fixture pointer so the fixture destructor does not delete the
  // same object again if the reopen below fails.
  db = nullptr;

  // Open DB with a lock limit of 3
  txn_db_options.max_num_locks = 3;
  s = TransactionDB::Open(options, txn_db_options, dbname, &db);
  ASSERT_OK(s);

  // Create a txn and verify we can only lock up to 3 keys
  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  s = txn->Put("X", "x");
  ASSERT_OK(s);

  s = txn->Put("Y", "y");
  ASSERT_OK(s);

  s = txn->Put("Z", "z");
  ASSERT_OK(s);

  // lock limit reached
  s = txn->Put("W", "w");
  ASSERT_TRUE(s.IsBusy());

  // re-locking same key shouldn't put us over the limit
  s = txn->Put("X", "xx");
  ASSERT_OK(s);

  s = txn->GetForUpdate(read_options, "W", &value);
  ASSERT_TRUE(s.IsBusy());
  s = txn->GetForUpdate(read_options, "V", &value);
  ASSERT_TRUE(s.IsBusy());

  // re-locking same key shouldn't put us over the limit
  s = txn->GetForUpdate(read_options, "Y", &value);
  ASSERT_OK(s);
  ASSERT_EQ("y", value);

  // A plain Get is expected to work at the lock limit; "W" was never written.
  s = txn->Get(read_options, "W", &value);
  ASSERT_TRUE(s.IsNotFound());

  Transaction* txn2 = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn2);

  // "X" currently locked
  s = txn2->Put("X", "x");
  ASSERT_TRUE(s.IsTimedOut());

  // lock limit reached
  s = txn2->Put("M", "m");
  ASSERT_TRUE(s.IsBusy());

  s = txn->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "X", &value);
  ASSERT_OK(s);
  ASSERT_EQ("xx", value);

  s = db->Get(read_options, "W", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Committing txn should release its locks and allow txn2 to proceed
  s = txn2->Put("X", "x2");
  ASSERT_OK(s);

  s = txn2->Delete("X");
  ASSERT_OK(s);

  s = txn2->Put("M", "m");
  ASSERT_OK(s);

  s = txn2->Put("Z", "z2");
  ASSERT_OK(s);

  // lock limit reached
  s = txn2->Delete("Y");
  ASSERT_TRUE(s.IsBusy());

  s = txn2->Commit();
  ASSERT_OK(s);

  s = db->Get(read_options, "Z", &value);
  ASSERT_OK(s);
  ASSERT_EQ("z2", value);

  // The rejected Delete must have left "Y" untouched.
  s = db->Get(read_options, "Y", &value);
  ASSERT_OK(s);
  ASSERT_EQ("y", value);

  s = db->Get(read_options, "X", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
  delete txn2;
}
|
|
|
|
// Exercises Transaction::GetIterator(): the iterator is expected to show the
// transaction's own pending writes layered over the DB contents visible at
// the transaction's snapshot. Also calls GetForUpdate() on every iterated key
// and expects Busy for the one key that was overwritten in the DB after the
// snapshot was taken.
TEST_F(TransactionTest, IteratorTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  Status s;

  // Write some keys to the db
  ASSERT_OK(db->Put(write_options, "A", "a"));
  ASSERT_OK(db->Put(write_options, "G", "g"));
  ASSERT_OK(db->Put(write_options, "F", "f"));
  ASSERT_OK(db->Put(write_options, "C", "c"));
  ASSERT_OK(db->Put(write_options, "D", "d"));

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Write some keys in a txn
  ASSERT_OK(txn->Put("B", "b"));
  ASSERT_OK(txn->Put("H", "h"));
  ASSERT_OK(txn->Delete("D"));
  ASSERT_OK(txn->Put("E", "e"));

  txn->SetSnapshot();
  const Snapshot* snapshot = txn->GetSnapshot();

  // Write some keys to the db after the snapshot
  ASSERT_OK(db->Put(write_options, "BB", "xx"));
  ASSERT_OK(db->Put(write_options, "C", "xx"));

  read_options.snapshot = snapshot;
  Iterator* iter = txn->GetIterator(read_options);
  ASSERT_OK(iter->status());
  iter->SeekToFirst();

  // Read all keys via iter and lock them all.
  // Expected values in key order: "D" is deleted in the txn, while "BB" and
  // the new value of "C" were written after the snapshot.
  const std::string expected_values[] = {"a", "b", "c", "e", "f", "g", "h"};
  // Derive the loop bound from the array so the two cannot drift apart.
  const size_t num_expected =
      sizeof(expected_values) / sizeof(expected_values[0]);
  for (size_t i = 0; i < num_expected; i++) {
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(expected_values[i], iter->value().ToString());

    s = txn->GetForUpdate(read_options, iter->key(), nullptr);
    if (expected_values[i] == "c") {
      // "C" was modified after txn's snapshot
      ASSERT_TRUE(s.IsBusy());
    } else {
      ASSERT_OK(s);
    }

    iter->Next();
  }
  ASSERT_FALSE(iter->Valid());

  iter->Seek("G");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("g", iter->value().ToString());

  iter->Prev();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("f", iter->value().ToString());

  // "D" is deleted in the txn, so seeking to it lands on the next key.
  iter->Seek("D");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->value().ToString());

  iter->Seek("C");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("c", iter->value().ToString());

  iter->Next();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->value().ToString());

  iter->Seek("");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("a", iter->value().ToString());

  // Seeking past the last key leaves the iterator invalid but not in error.
  iter->Seek("X");
  ASSERT_OK(iter->status());
  ASSERT_FALSE(iter->Valid());

  iter->SeekToLast();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("h", iter->value().ToString());

  ASSERT_OK(txn->Commit());

  delete iter;
  delete txn;
}
|
|
|
|
// Exercises SetSavePoint() / RollbackToSavePoint() / Rollback() on a single
// transaction: checks the put/delete counters and the values visible through
// the transaction after each rollback, and the final DB contents after commit.
TEST_F(TransactionTest, SavepointTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  ASSERT_EQ(0, txn->GetNumPuts());

  // No savepoint has been set yet.
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  txn->SetSavePoint();  // 1

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to beginning of txn
  // Savepoint 1 was consumed by the rollback above.
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(txn->Put("B", "b"));

  ASSERT_EQ(1, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  ASSERT_OK(txn->Commit());

  ASSERT_OK(db->Get(read_options, "B", &value));
  ASSERT_EQ("b", value);

  delete txn;
  txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  ASSERT_OK(txn->Put("A", "a"));
  ASSERT_OK(txn->Put("B", "bb"));
  ASSERT_OK(txn->Put("C", "c"));

  txn->SetSavePoint();  // 2

  ASSERT_OK(txn->Delete("B"));
  ASSERT_OK(txn->Put("C", "cc"));
  ASSERT_OK(txn->Put("D", "d"));

  ASSERT_EQ(5, txn->GetNumPuts());
  ASSERT_EQ(1, txn->GetNumDeletes());

  ASSERT_OK(txn->RollbackToSavePoint());  // Rollback to 2

  ASSERT_EQ(3, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn->Get(read_options, "B", &value));
  ASSERT_EQ("bb", value);

  ASSERT_OK(txn->Get(read_options, "C", &value));
  ASSERT_EQ("c", value);

  s = txn->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(txn->Put("A", "a"));
  ASSERT_OK(txn->Put("E", "e"));

  ASSERT_EQ(5, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  // Rollback to beginning of txn: no savepoint is left, so a full Rollback()
  // is needed to discard the pending writes.
  s = txn->RollbackToSavePoint();
  ASSERT_TRUE(s.IsNotFound());
  txn->Rollback();

  ASSERT_EQ(0, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  // After the rollback only the previously committed "B" is visible.
  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(txn->Get(read_options, "B", &value));
  ASSERT_EQ("b", value);

  // The original checked "D" twice here and never "C"; cover every key that
  // was written before the rollback.
  s = txn->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "E", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(txn->Put("A", "aa"));
  ASSERT_OK(txn->Put("F", "f"));

  ASSERT_EQ(2, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  txn->SetSavePoint();  // 3
  txn->SetSavePoint();  // 4

  ASSERT_OK(txn->Put("G", "g"));
  ASSERT_OK(txn->SingleDelete("F"));
  ASSERT_OK(txn->Delete("B"));

  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("aa", value);

  s = txn->Get(read_options, "F", &value);
  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten will have undefined behavior.  So it is unclear what the
  // result of fetching "F" should be. The current implementation will
  // return NotFound in this case.
  ASSERT_TRUE(s.IsNotFound());

  s = txn->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_EQ(3, txn->GetNumPuts());
  ASSERT_EQ(2, txn->GetNumDeletes());

  // Rollback to the most recent savepoint (4), which was taken at the same
  // state as savepoint 3.
  ASSERT_OK(txn->RollbackToSavePoint());

  ASSERT_EQ(2, txn->GetNumPuts());
  ASSERT_EQ(0, txn->GetNumDeletes());

  ASSERT_OK(txn->Get(read_options, "F", &value));
  ASSERT_EQ("f", value);

  s = txn->Get(read_options, "G", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(txn->Commit());

  // Only the writes that survived the rollbacks reach the DB.
  ASSERT_OK(db->Get(read_options, "F", &value));
  ASSERT_EQ("f", value);

  s = db->Get(read_options, "G", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(db->Get(read_options, "A", &value));
  ASSERT_EQ("aa", value);

  ASSERT_OK(db->Get(read_options, "B", &value));
  ASSERT_EQ("b", value);

  s = db->Get(read_options, "C", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "D", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "E", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}
|
|
|
|
// Checks how RollbackToSavePoint() affects key locks: a second transaction
// probes keys "A", "B" and "C" after each rollback of the first transaction
// and expects TimedOut for keys that are still locked and OK for keys whose
// lock has been released.
TEST_F(TransactionTest, SavepointTest2) {
  WriteOptions write_options;
  TransactionOptions txn_options;
  Status s;

  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  ASSERT_OK(txn1->Put("A", ""));

  txn1->SetSavePoint();  // 1

  ASSERT_OK(txn1->Put("A", "a"));
  ASSERT_OK(txn1->Put("C", "c"));

  txn1->SetSavePoint();  // 2

  ASSERT_OK(txn1->Put("A", "a"));
  ASSERT_OK(txn1->Put("B", "b"));

  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 2

  // Verify that "A" and "C" are still locked while "B" is not
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c2");
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_OK(txn2->Put("B", "b2"));

  ASSERT_OK(txn1->Put("A", "aa"));
  // "B" is now held by txn2.
  s = txn1->Put("B", "bb");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn2->Commit());
  delete txn2;

  ASSERT_OK(txn1->Put("A", "aaa"));
  ASSERT_OK(txn1->Put("B", "bbb"));
  ASSERT_OK(txn1->Put("C", "ccc"));

  txn1->SetSavePoint();                    // 3
  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 3

  // Verify that "A", "B", "C" are still locked
  txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  s = txn2->Put("A", "a2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("B", "b2");
  ASSERT_TRUE(s.IsTimedOut());
  s = txn2->Put("C", "c2");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn1->RollbackToSavePoint());  // Rollback to 1

  // Verify that only "A" is locked
  s = txn2->Put("A", "a3");
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_OK(txn2->Put("B", "b3"));
  ASSERT_OK(txn2->Put("C", "c3po"));

  ASSERT_OK(txn1->Commit());
  delete txn1;

  // Verify "A" "C" "B" are no longer locked
  ASSERT_OK(txn2->Put("A", "a4"));
  ASSERT_OK(txn2->Put("B", "b4"));
  ASSERT_OK(txn2->Put("C", "c4"));

  ASSERT_OK(txn2->Commit());
  delete txn2;
}
|
|
|
|
// Exercises lock timeouts and transaction expiration. The DB is reopened
// twice with different TransactionDBOptions timeouts, and the test checks
// which of two conflicting writers wins depending on lock timeout versus
// expiration.
TEST_F(TransactionTest, TimeoutTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  delete db;
  // Avoid leaving a dangling pointer behind if the reopen below fails.
  // NOTE(review): assumes fixture teardown also deletes `db`; that code is
  // not visible here.
  db = nullptr;

  // transaction writes have an infinite timeout,
  // but we will override this when we start a txn
  // db writes have infinite timeout
  txn_db_options.transaction_lock_timeout = -1;
  txn_db_options.default_lock_timeout = -1;

  ASSERT_OK(TransactionDB::Open(options, txn_db_options, dbname, &db));

  ASSERT_OK(db->Put(write_options, "aaa", "aaa"));

  TransactionOptions txn_options0;
  txn_options0.expiration = 100;  // 100ms
  txn_options0.lock_timeout = 50;  // txn timeout no longer infinite
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
  ASSERT_TRUE(txn1);

  ASSERT_OK(txn1->GetForUpdate(read_options, "aaa", nullptr));

  // Conflicts with previous GetForUpdate.
  // Since db writes do not have a timeout, this should eventually succeed when
  // the transaction expires.
  ASSERT_OK(db->Put(write_options, "aaa", "xxx"));

  ASSERT_GE(txn1->GetElapsedTime(),
            static_cast<uint64_t>(txn_options0.expiration));

  s = txn1->Commit();
  ASSERT_TRUE(s.IsExpired());  // expired!

  ASSERT_OK(db->Get(read_options, "aaa", &value));
  ASSERT_EQ("xxx", value);

  delete txn1;
  delete db;
  db = nullptr;

  // transaction writes have 50ms timeout,
  // db writes have infinite timeout
  txn_db_options.transaction_lock_timeout = 50;
  txn_db_options.default_lock_timeout = -1;

  ASSERT_OK(TransactionDB::Open(options, txn_db_options, dbname, &db));

  ASSERT_OK(db->Put(write_options, "aaa", "aaa"));

  TransactionOptions txn_options;
  txn_options.expiration = 100;  // 100ms
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);

  ASSERT_OK(txn1->GetForUpdate(read_options, "aaa", nullptr));

  // Conflicts with previous GetForUpdate.
  // Since db writes do not have a timeout, this should eventually succeed when
  // the transaction expires.
  ASSERT_OK(db->Put(write_options, "aaa", "xxx"));

  s = txn1->Commit();
  ASSERT_NOK(s);  // expired!

  ASSERT_OK(db->Get(read_options, "aaa", &value));
  ASSERT_EQ("xxx", value);

  delete txn1;
  txn_options.expiration = 6000000;  // 100 minutes
  txn_options.lock_timeout = 1;      // 1ms
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  txn1->SetLockTimeout(100);

  TransactionOptions txn_options2;
  txn_options2.expiration = 10;  // 10ms
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
  // The original re-asserted a stale Status here, which verified nothing;
  // check the new transaction instead.
  ASSERT_TRUE(txn2);

  ASSERT_OK(txn2->Put("a", "2"));

  // txn1 has a lock timeout longer than txn2's expiration, so it will win
  ASSERT_OK(txn1->Delete("a"));

  ASSERT_OK(txn1->Commit());

  // txn2 should have expired, since txn1 waited until txn2's expiration
  // passed.
  s = txn2->Commit();
  ASSERT_TRUE(s.IsExpired());

  delete txn1;
  delete txn2;
  txn_options.expiration = 6000000;  // 100 minutes
  txn1 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn1);
  txn_options2.expiration = 100000000;
  txn2 = db->BeginTransaction(write_options, txn_options2);
  ASSERT_TRUE(txn2);

  ASSERT_OK(txn1->Delete("asdf"));

  // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
  s = txn2->Delete("asdf");
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

  ASSERT_OK(txn1->Commit());

  // With txn1 committed, txn2 can now write the key.
  ASSERT_OK(txn2->Put("asdf", "asdf"));

  ASSERT_OK(txn2->Commit());

  ASSERT_OK(db->Get(read_options, "asdf", &value));
  ASSERT_EQ("asdf", value);

  delete txn1;
  delete txn2;
}
|
|
|
|
// Exercises Transaction::SingleDelete() alone, combined with Put() in the
// same transaction, and against a concurrent transaction, checking the
// values visible through the transaction and in the DB after each commit.
TEST_F(TransactionTest, SingleDeleteTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  // SingleDelete of a key that was never written.
  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  ASSERT_OK(txn->SingleDelete("A"));

  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(txn->Commit());
  delete txn;

  // SingleDelete followed by a Put of the same key in one transaction.
  txn = db->BeginTransaction(write_options);

  ASSERT_OK(txn->SingleDelete("A"));
  ASSERT_OK(txn->Put("A", "a"));

  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn->Commit());
  delete txn;

  ASSERT_OK(db->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  // SingleDelete of a key that exists in the DB.
  txn = db->BeginTransaction(write_options);

  ASSERT_OK(txn->SingleDelete("A"));

  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_OK(txn->Commit());
  delete txn;

  s = db->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Two transactions; the second one takes a snapshot before the first writes.
  txn = db->BeginTransaction(write_options);
  Transaction* txn2 = db->BeginTransaction(write_options);
  txn2->SetSnapshot();

  ASSERT_OK(txn->Put("A", "a"));
  ASSERT_OK(txn->Put("A", "a2"));
  ASSERT_OK(txn->SingleDelete("A"));
  ASSERT_OK(txn->SingleDelete("B"));

  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten will have undefined behavior.  So it is unclear what the
  // result of fetching "A" should be. The current implementation will
  // return NotFound in this case.
  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  // "B" was touched by txn's SingleDelete, so txn2 cannot write it.
  s = txn2->Put("B", "b");
  ASSERT_TRUE(s.IsTimedOut());
  ASSERT_OK(txn2->Commit());
  delete txn2;

  ASSERT_OK(txn->Commit());
  delete txn;

  // According to db.h, doing a SingleDelete on a key that has been
  // overwritten will have undefined behavior.  So it is unclear what the
  // result of fetching "A" should be. The current implementation will
  // return NotFound in this case.
  s = db->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());
}
|
|
|
|
// Exercises Transaction::Merge(): a Get() through the transaction is expected
// to report MergeInProgress while the latest write to the key in the
// transaction is a Merge, and the merged value is checked in the DB after
// commit.
TEST_F(TransactionTest, MergeTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
  ASSERT_TRUE(txn);

  ASSERT_OK(db->Put(write_options, "A", "a0"));

  ASSERT_OK(txn->Merge("A", "1"));
  ASSERT_OK(txn->Merge("A", "2"));

  // Latest write to "A" in the txn is a Merge.
  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsMergeInProgress());

  ASSERT_OK(txn->Put("A", "a"));

  // A Put on top of the merges makes the key readable again.
  ASSERT_OK(txn->Get(read_options, "A", &value));
  ASSERT_EQ("a", value);

  ASSERT_OK(txn->Merge("A", "3"));

  s = txn->Get(read_options, "A", &value);
  ASSERT_TRUE(s.IsMergeInProgress());

  TransactionOptions txn_options;
  txn_options.lock_timeout = 1;  // 1 ms
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  ASSERT_TRUE(txn2);

  // verify that txn has "A" locked
  s = txn2->Merge("A", "4");
  ASSERT_TRUE(s.IsTimedOut());

  ASSERT_OK(txn2->Commit());
  delete txn2;

  ASSERT_OK(txn->Commit());
  delete txn;

  ASSERT_OK(db->Get(read_options, "A", &value));
  ASSERT_EQ("a,3", value);
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|
|
|
|
#else
|
|
#include <stdio.h>
|
|
|
|
// ROCKSDB_LITE entry point: no tests are built in this configuration, so
// print a notice to stderr and exit successfully.
int main(int argc, char** argv) {
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
|
|
|
|
#endif // ROCKSDB_LITE
|