0c2bd5cb4b
Summary: Doing inline checking of transaction expiration instead of using a callback. Test Plan: To be added Reviewers: anthony Reviewed By: anthony Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D53673
2549 lines
60 KiB
C++
2549 lines
60 KiB
C++
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include <string>
|
|
|
|
#include "db/db_impl.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/utilities/transaction.h"
|
|
#include "rocksdb/utilities/transaction_db.h"
|
|
#include "table/mock_table.h"
|
|
#include "util/logging.h"
|
|
#include "util/sync_point.h"
|
|
#include "util/testharness.h"
|
|
#include "util/testutil.h"
|
|
#include "utilities/merge_operators.h"
|
|
#include "utilities/merge_operators/string_append/stringappend.h"
|
|
|
|
using std::string;
|
|
|
|
namespace rocksdb {
|
|
|
|
class TransactionTest : public testing::Test {
|
|
public:
|
|
TransactionDB* db;
|
|
string dbname;
|
|
Options options;
|
|
|
|
TransactionDBOptions txn_db_options;
|
|
|
|
TransactionTest() {
|
|
options.create_if_missing = true;
|
|
options.max_write_buffer_number = 2;
|
|
options.write_buffer_size = 4 * 1024;
|
|
options.level0_file_num_compaction_trigger = 2;
|
|
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
|
|
dbname = test::TmpDir() + "/transaction_testdb";
|
|
|
|
DestroyDB(dbname, options);
|
|
txn_db_options.transaction_lock_timeout = 0;
|
|
txn_db_options.default_lock_timeout = 0;
|
|
Status s = TransactionDB::Open(options, txn_db_options, dbname, &db);
|
|
assert(s.ok());
|
|
}
|
|
|
|
~TransactionTest() {
|
|
delete db;
|
|
DestroyDB(dbname, options);
|
|
}
|
|
|
|
Status ReOpen() {
|
|
delete db;
|
|
DestroyDB(dbname, options);
|
|
|
|
Status s = TransactionDB::Open(options, txn_db_options, dbname, &db);
|
|
|
|
return s;
|
|
}
|
|
};
|
|
|
|
TEST_F(TransactionTest, SuccessTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
db->Put(write_options, Slice("foo"), Slice("bar"));
|
|
db->Put(write_options, Slice("foo2"), Slice("bar"));
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
|
|
ASSERT_TRUE(txn);
|
|
|
|
ASSERT_EQ(0, txn->GetNumPuts());
|
|
|
|
s = txn->GetForUpdate(read_options, "foo", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar");
|
|
|
|
s = txn->Put(Slice("foo"), Slice("bar2"));
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_EQ(1, txn->GetNumPuts());
|
|
|
|
s = txn->GetForUpdate(read_options, "foo", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "foo", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, FirstWriteTest) {
|
|
WriteOptions write_options;
|
|
|
|
// Test conflict checking against the very first write to a db.
|
|
// The transaction's snapshot will have seq 1 and the following write
|
|
// will have sequence 1.
|
|
Status s = db->Put(write_options, "A", "a");
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
txn->SetSnapshot();
|
|
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("A", "b");
|
|
ASSERT_OK(s);
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, FirstWriteTest2) {
|
|
WriteOptions write_options;
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
txn->SetSnapshot();
|
|
|
|
// Test conflict checking against the very first write to a db.
|
|
// The transaction's snapshot is a seq 0 while the following write
|
|
// will have sequence 1.
|
|
Status s = db->Put(write_options, "A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("A", "b");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, WriteOptionsTest) {
|
|
WriteOptions write_options;
|
|
write_options.sync = true;
|
|
write_options.disableWAL = true;
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
ASSERT_TRUE(txn->GetWriteOptions()->sync);
|
|
|
|
write_options.sync = false;
|
|
txn->SetWriteOptions(write_options);
|
|
ASSERT_FALSE(txn->GetWriteOptions()->sync);
|
|
ASSERT_TRUE(txn->GetWriteOptions()->disableWAL);
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, WriteConflictTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
db->Put(write_options, "foo", "A");
|
|
db->Put(write_options, "foo2", "B");
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = txn->Put("foo", "A2");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("foo2", "B2");
|
|
ASSERT_OK(s);
|
|
|
|
// This Put outside of a transaction will conflict with the previous write
|
|
s = db->Put(write_options, "foo", "xxx");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = db->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "A");
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
db->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "A2");
|
|
db->Get(read_options, "foo2", &value);
|
|
ASSERT_EQ(value, "B2");
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, WriteConflictTest2) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
db->Put(write_options, "foo", "bar");
|
|
|
|
txn_options.set_snapshot = true;
|
|
Transaction* txn = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
// This Put outside of a transaction will conflict with a later write
|
|
s = db->Put(write_options, "foo", "barz");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("foo2", "X");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("foo",
|
|
"bar2"); // Conflicts with write done after snapshot taken
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn->Put("foo3", "Y");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "barz");
|
|
|
|
ASSERT_EQ(2, txn->GetNumKeys());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s); // Txn should commit, but only write foo2 and foo3
|
|
|
|
// Verify that transaction wrote foo2 and foo3 but not foo
|
|
db->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "barz");
|
|
|
|
db->Get(read_options, "foo2", &value);
|
|
ASSERT_EQ(value, "X");
|
|
|
|
db->Get(read_options, "foo3", &value);
|
|
ASSERT_EQ(value, "Y");
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, ReadConflictTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
db->Put(write_options, "foo", "bar");
|
|
db->Put(write_options, "foo2", "bar");
|
|
|
|
txn_options.set_snapshot = true;
|
|
Transaction* txn = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
txn->SetSnapshot();
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
|
|
txn->GetForUpdate(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar");
|
|
|
|
// This Put outside of a transaction will conflict with the previous read
|
|
s = db->Put(write_options, "foo", "barz");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = db->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar");
|
|
|
|
s = txn->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar");
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, TxnOnlyTest) {
|
|
// Test to make sure transactions work when there are no other writes in an
|
|
// empty db.
|
|
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = txn->Put("x", "y");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, FlushTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
db->Put(write_options, Slice("foo"), Slice("bar"));
|
|
db->Put(write_options, Slice("foo2"), Slice("bar"));
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
|
|
txn->GetForUpdate(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar");
|
|
|
|
s = txn->Put(Slice("foo"), Slice("bar2"));
|
|
ASSERT_OK(s);
|
|
|
|
txn->GetForUpdate(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
// Put a random key so we have a memtable to flush
|
|
s = db->Put(write_options, "dummy", "dummy");
|
|
ASSERT_OK(s);
|
|
|
|
// force a memtable flush
|
|
FlushOptions flush_ops;
|
|
db->Flush(flush_ops);
|
|
|
|
s = txn->Commit();
|
|
// txn should commit since the flushed table is still in MemtableList History
|
|
ASSERT_OK(s);
|
|
|
|
db->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, FlushTest2) {
|
|
const size_t num_tests = 3;
|
|
|
|
for (size_t n = 0; n < num_tests; n++) {
|
|
// Test different table factories
|
|
switch (n) {
|
|
case 0:
|
|
break;
|
|
case 1:
|
|
options.table_factory.reset(new mock::MockTableFactory());
|
|
break;
|
|
case 2: {
|
|
PlainTableOptions pt_opts;
|
|
pt_opts.hash_table_ratio = 0;
|
|
options.table_factory.reset(NewPlainTableFactory(pt_opts));
|
|
break;
|
|
}
|
|
}
|
|
|
|
Status s = ReOpen();
|
|
ASSERT_OK(s);
|
|
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
|
|
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetBaseDB());
|
|
|
|
db->Put(write_options, Slice("foo"), Slice("bar"));
|
|
db->Put(write_options, Slice("foo2"), Slice("bar2"));
|
|
db->Put(write_options, Slice("foo3"), Slice("bar3"));
|
|
|
|
txn_options.set_snapshot = true;
|
|
Transaction* txn = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
|
|
txn->GetForUpdate(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar");
|
|
|
|
s = txn->Put(Slice("foo"), Slice("bar2"));
|
|
ASSERT_OK(s);
|
|
|
|
txn->GetForUpdate(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar2");
|
|
// verify foo is locked by txn
|
|
s = db->Delete(write_options, "foo");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = db->Put(write_options, "Z", "z");
|
|
ASSERT_OK(s);
|
|
s = db->Put(write_options, "dummy", "dummy");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "S", "s");
|
|
ASSERT_OK(s);
|
|
s = db->SingleDelete(write_options, "S");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Delete("S");
|
|
// Should fail after encountering a write to S in memtable
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
// force a memtable flush
|
|
s = db_impl->TEST_FlushMemTable(true);
|
|
ASSERT_OK(s);
|
|
|
|
// Put a random key so we have a MemTable to flush
|
|
s = db->Put(write_options, "dummy", "dummy2");
|
|
ASSERT_OK(s);
|
|
|
|
// force a memtable flush
|
|
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
|
|
|
|
s = db->Put(write_options, "dummy", "dummy3");
|
|
ASSERT_OK(s);
|
|
|
|
// force a memtable flush
|
|
// Since our test db has max_write_buffer_number=2, this flush will cause
|
|
// the first memtable to get purged from the MemtableList history.
|
|
ASSERT_OK(db_impl->TEST_FlushMemTable(true));
|
|
|
|
s = txn->Put("X", "Y");
|
|
// Should succeed after verifying there is no write to X in SST file
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("Z", "zz");
|
|
// Should fail after encountering a write to Z in SST file
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn->GetForUpdate(read_options, "foo2", &value);
|
|
// should succeed since key was written before txn started
|
|
ASSERT_OK(s);
|
|
// verify foo2 is locked by txn
|
|
s = db->Delete(write_options, "foo2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = txn->Delete("S");
|
|
// Should fail after encountering a write to S in SST file
|
|
fprintf(stderr, "%" ROCKSDB_PRIszt " %s\n", n, s.ToString().c_str());
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
// Write a bunch of keys to db to force a compaction
|
|
Random rnd(47);
|
|
for (int i = 0; i < 1000; i++) {
|
|
s = db->Put(write_options, std::to_string(i),
|
|
test::CompressibleString(&rnd, 0.8, 100, &value));
|
|
ASSERT_OK(s);
|
|
}
|
|
|
|
s = txn->Put("X", "yy");
|
|
// Should succeed after verifying there is no write to X in SST file
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("Z", "zzz");
|
|
// Should fail after encountering a write to Z in SST file
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn->Delete("S");
|
|
// Should fail after encountering a write to S in SST file
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn->GetForUpdate(read_options, "foo3", &value);
|
|
// should succeed since key was written before txn started
|
|
ASSERT_OK(s);
|
|
// verify foo3 is locked by txn
|
|
s = db->Delete(write_options, "foo3");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
db_impl->TEST_WaitForCompact();
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
// Transaction should only write the keys that succeeded.
|
|
s = db->Get(read_options, "foo", &value);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
s = db->Get(read_options, "X", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("yy", value);
|
|
|
|
s = db->Get(read_options, "Z", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("z", value);
|
|
|
|
delete txn;
|
|
}
|
|
}
|
|
|
|
TEST_F(TransactionTest, NoSnapshotTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
db->Put(write_options, "AAA", "bar");
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
// Modify key after transaction start
|
|
db->Put(write_options, "AAA", "bar1");
|
|
|
|
// Read and write without a snapshot
|
|
txn->GetForUpdate(read_options, "AAA", &value);
|
|
ASSERT_EQ(value, "bar1");
|
|
s = txn->Put("AAA", "bar2");
|
|
ASSERT_OK(s);
|
|
|
|
// Should commit since read/write was done after data changed
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
txn->GetForUpdate(read_options, "AAA", &value);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, MultipleSnapshotTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
db->Put(write_options, "AAA", "bar");
|
|
db->Put(write_options, "BBB", "bar");
|
|
db->Put(write_options, "CCC", "bar");
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
db->Put(write_options, "AAA", "bar1");
|
|
|
|
// Read and write without a snapshot
|
|
txn->GetForUpdate(read_options, "AAA", &value);
|
|
ASSERT_EQ(value, "bar1");
|
|
s = txn->Put("AAA", "bar2");
|
|
ASSERT_OK(s);
|
|
|
|
// Modify BBB before snapshot is taken
|
|
db->Put(write_options, "BBB", "bar1");
|
|
|
|
txn->SetSnapshot();
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
|
|
// Read and write with snapshot
|
|
txn->GetForUpdate(snapshot_read_options, "BBB", &value);
|
|
ASSERT_EQ(value, "bar1");
|
|
s = txn->Put("BBB", "bar2");
|
|
ASSERT_OK(s);
|
|
|
|
db->Put(write_options, "CCC", "bar1");
|
|
|
|
// Set a new snapshot
|
|
txn->SetSnapshot();
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
|
|
// Read and write with snapshot
|
|
txn->GetForUpdate(snapshot_read_options, "CCC", &value);
|
|
ASSERT_EQ(value, "bar1");
|
|
s = txn->Put("CCC", "bar2");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->GetForUpdate(read_options, "AAA", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
s = txn->GetForUpdate(read_options, "BBB", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
s = txn->GetForUpdate(read_options, "CCC", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
s = db->Get(read_options, "AAA", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar1");
|
|
s = db->Get(read_options, "BBB", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar1");
|
|
s = db->Get(read_options, "CCC", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar1");
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "AAA", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
s = db->Get(read_options, "BBB", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
s = db->Get(read_options, "CCC", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "bar2");
|
|
|
|
// verify that we track multiple writes to the same key at different snapshots
|
|
delete txn;
|
|
txn = db->BeginTransaction(write_options);
|
|
|
|
// Potentially conflicting writes
|
|
db->Put(write_options, "ZZZ", "zzz");
|
|
db->Put(write_options, "XXX", "xxx");
|
|
|
|
txn->SetSnapshot();
|
|
|
|
TransactionOptions txn_options;
|
|
txn_options.set_snapshot = true;
|
|
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
|
txn2->SetSnapshot();
|
|
|
|
// This should not conflict in txn since the snapshot is later than the
|
|
// previous write (spoiler alert: it will later conflict with txn2).
|
|
s = txn->Put("ZZZ", "zzzz");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn;
|
|
|
|
// This will conflict since the snapshot is earlier than another write to ZZZ
|
|
s = txn2->Put("ZZZ", "xxxxx");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "ZZZ", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "zzzz");
|
|
|
|
delete txn2;
|
|
}
|
|
|
|
TEST_F(TransactionTest, ColumnFamiliesTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
ColumnFamilyHandle *cfa, *cfb;
|
|
ColumnFamilyOptions cf_options;
|
|
|
|
// Create 2 new column families
|
|
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
|
|
ASSERT_OK(s);
|
|
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
|
|
ASSERT_OK(s);
|
|
|
|
delete cfa;
|
|
delete cfb;
|
|
delete db;
|
|
|
|
// open DB with three column families
|
|
std::vector<ColumnFamilyDescriptor> column_families;
|
|
// have to open default column family
|
|
column_families.push_back(
|
|
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
|
|
// open the new column families
|
|
column_families.push_back(
|
|
ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
|
|
column_families.push_back(
|
|
ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
|
|
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
|
|
s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
|
|
&handles, &db);
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
txn->SetSnapshot();
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
|
|
txn_options.set_snapshot = true;
|
|
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn2);
|
|
|
|
// Write some data to the db
|
|
WriteBatch batch;
|
|
batch.Put("foo", "foo");
|
|
batch.Put(handles[1], "AAA", "bar");
|
|
batch.Put(handles[1], "AAAZZZ", "bar");
|
|
s = db->Write(write_options, &batch);
|
|
ASSERT_OK(s);
|
|
db->Delete(write_options, handles[1], "AAAZZZ");
|
|
|
|
// These keys do not conflict with existing writes since they're in
|
|
// different column families
|
|
s = txn->Delete("AAA");
|
|
ASSERT_OK(s);
|
|
s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
Slice key_slice("AAAZZZ");
|
|
Slice value_slices[2] = {Slice("bar"), Slice("bar")};
|
|
s = txn->Put(handles[2], SliceParts(&key_slice, 1),
|
|
SliceParts(value_slices, 2));
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(3, txn->GetNumKeys());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
s = db->Get(read_options, "AAA", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
s = db->Get(read_options, handles[2], "AAAZZZ", &value);
|
|
ASSERT_EQ(value, "barbar");
|
|
|
|
Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
|
|
Slice value_slice("barbarbar");
|
|
|
|
s = txn2->Delete(handles[2], "XXX");
|
|
ASSERT_OK(s);
|
|
s = txn2->Delete(handles[1], "XXX");
|
|
ASSERT_OK(s);
|
|
|
|
// This write will cause a conflict with the earlier batch write
|
|
s = txn2->Put(handles[1], SliceParts(key_slices, 3),
|
|
SliceParts(&value_slice, 1));
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
s = db->Get(read_options, handles[1], "AAAZZZ", &value);
|
|
ASSERT_EQ(value, "barbar");
|
|
|
|
delete txn;
|
|
delete txn2;
|
|
|
|
txn = db->BeginTransaction(write_options, txn_options);
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
|
|
txn2 = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
|
|
handles[0], handles[2]};
|
|
std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
|
|
std::vector<std::string> values(4);
|
|
|
|
std::vector<Status> results = txn->MultiGetForUpdate(
|
|
snapshot_read_options, multiget_cfh, multiget_keys, &values);
|
|
ASSERT_OK(results[0]);
|
|
ASSERT_OK(results[1]);
|
|
ASSERT_OK(results[2]);
|
|
ASSERT_TRUE(results[3].IsNotFound());
|
|
ASSERT_EQ(values[0], "bar");
|
|
ASSERT_EQ(values[1], "barbar");
|
|
ASSERT_EQ(values[2], "foo");
|
|
|
|
s = txn->SingleDelete(handles[2], "ZZZ");
|
|
ASSERT_OK(s);
|
|
s = txn->Put(handles[2], "ZZZ", "YYY");
|
|
ASSERT_OK(s);
|
|
s = txn->Put(handles[2], "ZZZ", "YYYY");
|
|
ASSERT_OK(s);
|
|
s = txn->Delete(handles[2], "ZZZ");
|
|
ASSERT_OK(s);
|
|
s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_EQ(5, txn->GetNumKeys());
|
|
|
|
// Txn should commit
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
s = db->Get(read_options, handles[2], "ZZZ", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
// Put a key which will conflict with the next txn using the previous snapshot
|
|
db->Put(write_options, handles[2], "foo", "000");
|
|
|
|
results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
|
|
multiget_keys, &values);
|
|
// All results should fail since there was a conflict
|
|
ASSERT_TRUE(results[0].IsBusy());
|
|
ASSERT_TRUE(results[1].IsBusy());
|
|
ASSERT_TRUE(results[2].IsBusy());
|
|
ASSERT_TRUE(results[3].IsBusy());
|
|
|
|
s = db->Get(read_options, handles[2], "foo", &value);
|
|
ASSERT_EQ(value, "000");
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->DropColumnFamily(handles[1]);
|
|
ASSERT_OK(s);
|
|
s = db->DropColumnFamily(handles[2]);
|
|
ASSERT_OK(s);
|
|
|
|
delete txn;
|
|
delete txn2;
|
|
|
|
for (auto handle : handles) {
|
|
delete handle;
|
|
}
|
|
}
|
|
|
|
TEST_F(TransactionTest, ColumnFamiliesTest2) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
ColumnFamilyHandle *one, *two;
|
|
ColumnFamilyOptions cf_options;
|
|
|
|
// Create 2 new column families
|
|
s = db->CreateColumnFamily(cf_options, "ONE", &one);
|
|
ASSERT_OK(s);
|
|
s = db->CreateColumnFamily(cf_options, "TWO", &two);
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn1 = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn1);
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn2);
|
|
|
|
s = txn1->Put(one, "X", "1");
|
|
ASSERT_OK(s);
|
|
s = txn1->Put(two, "X", "2");
|
|
ASSERT_OK(s);
|
|
s = txn1->Put("X", "0");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put(one, "X", "11");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
// Drop first column family
|
|
s = db->DropColumnFamily(one);
|
|
ASSERT_OK(s);
|
|
|
|
// Should fail since column family was dropped.
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn1;
|
|
txn1 = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn1);
|
|
|
|
// Should fail since column family was dropped
|
|
s = txn1->Put(one, "X", "111");
|
|
ASSERT_TRUE(s.IsInvalidArgument());
|
|
|
|
s = txn1->Put(two, "X", "222");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Put("X", "000");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, two, "X", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("222", value);
|
|
|
|
s = db->Get(read_options, "X", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("000", value);
|
|
|
|
s = db->DropColumnFamily(two);
|
|
ASSERT_OK(s);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
|
|
delete one;
|
|
delete two;
|
|
}
|
|
|
|
TEST_F(TransactionTest, EmptyTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
s = db->Put(write_options, "aaa", "aaa");
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
|
|
txn = db->BeginTransaction(write_options);
|
|
txn->Rollback();
|
|
delete txn;
|
|
|
|
txn = db->BeginTransaction(write_options);
|
|
s = txn->GetForUpdate(read_options, "aaa", &value);
|
|
ASSERT_EQ(value, "aaa");
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
|
|
txn = db->BeginTransaction(write_options);
|
|
txn->SetSnapshot();
|
|
|
|
s = txn->GetForUpdate(read_options, "aaa", &value);
|
|
ASSERT_EQ(value, "aaa");
|
|
|
|
// Conflicts with previous GetForUpdate
|
|
s = db->Put(write_options, "aaa", "xxx");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
// transaction expired!
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, PredicateManyPreceders) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options1, read_options2;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
txn_options.set_snapshot = true;
|
|
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
|
read_options1.snapshot = txn1->GetSnapshot();
|
|
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
txn2->SetSnapshot();
|
|
read_options2.snapshot = txn2->GetSnapshot();
|
|
|
|
std::vector<Slice> multiget_keys = {"1", "2", "3"};
|
|
std::vector<std::string> multiget_values;
|
|
|
|
std::vector<Status> results =
|
|
txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
|
|
ASSERT_TRUE(results[1].IsNotFound());
|
|
|
|
s = txn2->Put("2", "x"); // Conflict's with txn1's MultiGetForUpdate
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
txn2->Rollback();
|
|
|
|
multiget_values.clear();
|
|
results =
|
|
txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
|
|
ASSERT_TRUE(results[1].IsNotFound());
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
|
|
txn1 = db->BeginTransaction(write_options, txn_options);
|
|
read_options1.snapshot = txn1->GetSnapshot();
|
|
|
|
txn2 = db->BeginTransaction(write_options, txn_options);
|
|
read_options2.snapshot = txn2->GetSnapshot();
|
|
|
|
s = txn1->Put("4", "x");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Delete("4"); // conflict
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->GetForUpdate(read_options2, "4", &value);
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
txn2->Rollback();
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
}
|
|
|
|
TEST_F(TransactionTest, LostUpdate) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, read_options1, read_options2;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
// Test 2 transactions writing to the same key in multiple orders and
|
|
// with/without snapshots
|
|
|
|
Transaction* txn1 = db->BeginTransaction(write_options);
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
|
|
s = txn1->Put("1", "1");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put("1", "2"); // conflict
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "1", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("1", value);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
|
|
txn_options.set_snapshot = true;
|
|
txn1 = db->BeginTransaction(write_options, txn_options);
|
|
read_options1.snapshot = txn1->GetSnapshot();
|
|
|
|
txn2 = db->BeginTransaction(write_options, txn_options);
|
|
read_options2.snapshot = txn2->GetSnapshot();
|
|
|
|
s = txn1->Put("1", "3");
|
|
ASSERT_OK(s);
|
|
s = txn2->Put("1", "4"); // conflict
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "1", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("3", value);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
|
|
txn1 = db->BeginTransaction(write_options, txn_options);
|
|
read_options1.snapshot = txn1->GetSnapshot();
|
|
|
|
txn2 = db->BeginTransaction(write_options, txn_options);
|
|
read_options2.snapshot = txn2->GetSnapshot();
|
|
|
|
s = txn1->Put("1", "5");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put("1", "6");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "1", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("5", value);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
|
|
txn1 = db->BeginTransaction(write_options, txn_options);
|
|
read_options1.snapshot = txn1->GetSnapshot();
|
|
|
|
txn2 = db->BeginTransaction(write_options, txn_options);
|
|
read_options2.snapshot = txn2->GetSnapshot();
|
|
|
|
s = txn1->Put("1", "7");
|
|
ASSERT_OK(s);
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
txn2->SetSnapshot();
|
|
s = txn2->Put("1", "8");
|
|
ASSERT_OK(s);
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "1", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("8", value);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
|
|
txn1 = db->BeginTransaction(write_options);
|
|
txn2 = db->BeginTransaction(write_options);
|
|
|
|
s = txn1->Put("1", "9");
|
|
ASSERT_OK(s);
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put("1", "10");
|
|
ASSERT_OK(s);
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
|
|
s = db->Get(read_options, "1", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ(value, "10");
|
|
}
|
|
|
|
TEST_F(TransactionTest, UntrackedWrites) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
// Verify transaction rollback works for untracked keys.
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
txn->SetSnapshot();
|
|
|
|
s = txn->PutUntracked("untracked", "0");
|
|
ASSERT_OK(s);
|
|
txn->Rollback();
|
|
s = db->Get(read_options, "untracked", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
delete txn;
|
|
txn = db->BeginTransaction(write_options);
|
|
txn->SetSnapshot();
|
|
|
|
s = db->Put(write_options, "untracked", "x");
|
|
ASSERT_OK(s);
|
|
|
|
// Untracked writes should succeed even though key was written after snapshot
|
|
s = txn->PutUntracked("untracked", "1");
|
|
ASSERT_OK(s);
|
|
s = txn->MergeUntracked("untracked", "2");
|
|
ASSERT_OK(s);
|
|
s = txn->DeleteUntracked("untracked");
|
|
ASSERT_OK(s);
|
|
|
|
// Conflict
|
|
s = txn->Put("untracked", "3");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "untracked", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, ExpiredTransaction) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
// Set txn expiration timeout to 0 microseconds (expires instantly)
|
|
txn_options.expiration = 0;
|
|
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
|
|
|
s = txn1->Put("X", "1");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Put("Y", "1");
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
|
|
// txn2 should be able to write to X since txn1 has expired
|
|
s = txn2->Put("X", "2");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
s = db->Get(read_options, "X", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("2", value);
|
|
|
|
s = txn1->Put("Z", "1");
|
|
ASSERT_OK(s);
|
|
|
|
// txn1 should fail to commit since it is expired
|
|
s = txn1->Commit();
|
|
ASSERT_TRUE(s.IsExpired());
|
|
|
|
s = db->Get(read_options, "Y", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = db->Get(read_options, "Z", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
}
|
|
|
|
TEST_F(TransactionTest, Rollback) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
|
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Put("X", "1");
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
|
|
// txn2 should not be able to write to X since txn1 has it locked
|
|
s = txn2->Put("X", "2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
txn1->Rollback();
|
|
delete txn1;
|
|
|
|
// txn2 should now be able to write to X
|
|
s = txn2->Put("X", "3");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "X", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("3", value);
|
|
|
|
delete txn2;
|
|
}
|
|
|
|
TEST_F(TransactionTest, LockLimitTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
delete db;
|
|
|
|
// Open DB with a lock limit of 3
|
|
txn_db_options.max_num_locks = 3;
|
|
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
|
|
ASSERT_OK(s);
|
|
|
|
// Create a txn and verify we can only lock up to 3 keys
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = txn->Put("X", "x");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("Y", "y");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("Z", "z");
|
|
ASSERT_OK(s);
|
|
|
|
// lock limit reached
|
|
s = txn->Put("W", "w");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
// re-locking same key shouldn't put us over the limit
|
|
s = txn->Put("X", "xx");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->GetForUpdate(read_options, "W", &value);
|
|
ASSERT_TRUE(s.IsBusy());
|
|
s = txn->GetForUpdate(read_options, "V", &value);
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
// re-locking same key shouldn't put us over the limit
|
|
s = txn->GetForUpdate(read_options, "Y", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("y", value);
|
|
|
|
s = txn->Get(read_options, "W", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn2);
|
|
|
|
// "X" currently locked
|
|
s = txn2->Put("X", "x");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
// lock limit reached
|
|
s = txn2->Put("M", "m");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "X", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("xx", value);
|
|
|
|
s = db->Get(read_options, "W", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
// Committing txn should release its locks and allow txn2 to proceed
|
|
s = txn2->Put("X", "x2");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Delete("X");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put("M", "m");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put("Z", "z2");
|
|
ASSERT_OK(s);
|
|
|
|
// lock limit reached
|
|
s = txn2->Delete("Y");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "Z", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("z2", value);
|
|
|
|
s = db->Get(read_options, "Y", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("y", value);
|
|
|
|
s = db->Get(read_options, "X", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
delete txn;
|
|
delete txn2;
|
|
}
|
|
|
|
TEST_F(TransactionTest, IteratorTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
// Write some keys to the db
|
|
s = db->Put(write_options, "A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "G", "g");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "F", "f");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "C", "c");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "D", "d");
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
// Write some keys in a txn
|
|
s = txn->Put("B", "b");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("H", "h");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Delete("D");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("E", "e");
|
|
ASSERT_OK(s);
|
|
|
|
txn->SetSnapshot();
|
|
const Snapshot* snapshot = txn->GetSnapshot();
|
|
|
|
// Write some keys to the db after the snapshot
|
|
s = db->Put(write_options, "BB", "xx");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "C", "xx");
|
|
ASSERT_OK(s);
|
|
|
|
read_options.snapshot = snapshot;
|
|
Iterator* iter = txn->GetIterator(read_options);
|
|
ASSERT_OK(iter->status());
|
|
iter->SeekToFirst();
|
|
|
|
// Read all keys via iter and lock them all
|
|
std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
|
|
for (int i = 0; i < 7; i++) {
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(results[i], iter->value().ToString());
|
|
|
|
s = txn->GetForUpdate(read_options, iter->key(), nullptr);
|
|
if (i == 2) {
|
|
// "C" was modified after txn's snapshot
|
|
ASSERT_TRUE(s.IsBusy());
|
|
} else {
|
|
ASSERT_OK(s);
|
|
}
|
|
|
|
iter->Next();
|
|
}
|
|
ASSERT_FALSE(iter->Valid());
|
|
|
|
iter->Seek("G");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("g", iter->value().ToString());
|
|
|
|
iter->Prev();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("f", iter->value().ToString());
|
|
|
|
iter->Seek("D");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("e", iter->value().ToString());
|
|
|
|
iter->Seek("C");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("c", iter->value().ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("e", iter->value().ToString());
|
|
|
|
iter->Seek("");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("a", iter->value().ToString());
|
|
|
|
iter->Seek("X");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_FALSE(iter->Valid());
|
|
|
|
iter->SeekToLast();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("h", iter->value().ToString());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete iter;
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, DisableIndexingTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = txn->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a", value);
|
|
|
|
txn->DisableIndexing();
|
|
|
|
s = txn->Put("B", "b");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "B", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
Iterator* iter = txn->GetIterator(read_options);
|
|
ASSERT_OK(iter->status());
|
|
|
|
iter->Seek("B");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_FALSE(iter->Valid());
|
|
|
|
s = txn->Delete("A");
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a", value);
|
|
|
|
txn->EnableIndexing();
|
|
|
|
s = txn->Put("B", "bb");
|
|
ASSERT_OK(s);
|
|
|
|
iter->Seek("B");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ("bb", iter->value().ToString());
|
|
|
|
s = txn->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("bb", value);
|
|
|
|
s = txn->Put("A", "aa");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("aa", value);
|
|
|
|
delete iter;
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, SavepointTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
ASSERT_EQ(0, txn->GetNumPuts());
|
|
|
|
s = txn->RollbackToSavePoint();
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
txn->SetSavePoint(); // 1
|
|
|
|
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
|
|
s = txn->RollbackToSavePoint();
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Put("B", "b");
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_EQ(1, txn->GetNumPuts());
|
|
ASSERT_EQ(0, txn->GetNumDeletes());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("b", value);
|
|
|
|
delete txn;
|
|
txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = txn->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("B", "bb");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("C", "c");
|
|
ASSERT_OK(s);
|
|
|
|
txn->SetSavePoint(); // 2
|
|
|
|
s = txn->Delete("B");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("C", "cc");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("D", "d");
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_EQ(5, txn->GetNumPuts());
|
|
ASSERT_EQ(1, txn->GetNumDeletes());
|
|
|
|
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
|
|
|
|
ASSERT_EQ(3, txn->GetNumPuts());
|
|
ASSERT_EQ(0, txn->GetNumDeletes());
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a", value);
|
|
|
|
s = txn->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("bb", value);
|
|
|
|
s = txn->Get(read_options, "C", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("c", value);
|
|
|
|
s = txn->Get(read_options, "D", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("E", "e");
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_EQ(5, txn->GetNumPuts());
|
|
ASSERT_EQ(0, txn->GetNumDeletes());
|
|
|
|
// Rollback to beginning of txn
|
|
s = txn->RollbackToSavePoint();
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
txn->Rollback();
|
|
|
|
ASSERT_EQ(0, txn->GetNumPuts());
|
|
ASSERT_EQ(0, txn->GetNumDeletes());
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("b", value);
|
|
|
|
s = txn->Get(read_options, "D", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Get(read_options, "D", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Get(read_options, "E", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Put("A", "aa");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("F", "f");
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_EQ(2, txn->GetNumPuts());
|
|
ASSERT_EQ(0, txn->GetNumDeletes());
|
|
|
|
txn->SetSavePoint(); // 3
|
|
txn->SetSavePoint(); // 4
|
|
|
|
s = txn->Put("G", "g");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->SingleDelete("F");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Delete("B");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("aa", value);
|
|
|
|
s = txn->Get(read_options, "F", &value);
|
|
// According to db.h, doing a SingleDelete on a key that has been
|
|
// overwritten will have undefinied behavior. So it is unclear what the
|
|
// result of fetching "F" should be. The current implementation will
|
|
// return NotFound in this case.
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Get(read_options, "B", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
ASSERT_EQ(3, txn->GetNumPuts());
|
|
ASSERT_EQ(2, txn->GetNumDeletes());
|
|
|
|
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
|
|
|
|
ASSERT_EQ(2, txn->GetNumPuts());
|
|
ASSERT_EQ(0, txn->GetNumDeletes());
|
|
|
|
s = txn->Get(read_options, "F", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("f", value);
|
|
|
|
s = txn->Get(read_options, "G", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "F", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("f", value);
|
|
|
|
s = db->Get(read_options, "G", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = db->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("aa", value);
|
|
|
|
s = db->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("b", value);
|
|
|
|
s = db->Get(read_options, "C", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = db->Get(read_options, "D", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = db->Get(read_options, "E", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, SavepointTest2) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
TransactionOptions txn_options;
|
|
string value;
|
|
Status s;
|
|
|
|
txn_options.lock_timeout = 1; // 1 ms
|
|
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn1);
|
|
|
|
s = txn1->Put("A", "");
|
|
ASSERT_OK(s);
|
|
|
|
txn1->SetSavePoint(); // 1
|
|
|
|
s = txn1->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Put("C", "c");
|
|
ASSERT_OK(s);
|
|
|
|
txn1->SetSavePoint(); // 2
|
|
|
|
s = txn1->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
s = txn1->Put("B", "b");
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 2
|
|
|
|
// Verify that "A" and "C" is still locked while "B" is not
|
|
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn2);
|
|
|
|
s = txn2->Put("A", "a2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
s = txn2->Put("C", "c2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
s = txn2->Put("B", "b2");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Put("A", "aa");
|
|
ASSERT_OK(s);
|
|
s = txn1->Put("B", "bb");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn2;
|
|
|
|
s = txn1->Put("A", "aaa");
|
|
ASSERT_OK(s);
|
|
s = txn1->Put("B", "bbb");
|
|
ASSERT_OK(s);
|
|
s = txn1->Put("C", "ccc");
|
|
ASSERT_OK(s);
|
|
|
|
txn1->SetSavePoint(); // 3
|
|
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 3
|
|
|
|
// Verify that "A", "B", "C" are still locked
|
|
txn2 = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn2);
|
|
|
|
s = txn2->Put("A", "a2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
s = txn2->Put("B", "b2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
s = txn2->Put("C", "c2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1
|
|
|
|
// Verify that only "A" is locked
|
|
s = txn2->Put("A", "a3");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
s = txn2->Put("B", "b3");
|
|
ASSERT_OK(s);
|
|
s = txn2->Put("C", "c3po");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn1;
|
|
|
|
// Verify "A" "C" "B" are no longer locked
|
|
s = txn2->Put("A", "a4");
|
|
ASSERT_OK(s);
|
|
s = txn2->Put("B", "b4");
|
|
ASSERT_OK(s);
|
|
s = txn2->Put("C", "c4");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn2;
|
|
}
|
|
|
|
TEST_F(TransactionTest, TimeoutTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
delete db;
|
|
|
|
// transaction writes have an infinite timeout,
|
|
// but we will override this when we start a txn
|
|
// db writes have infinite timeout
|
|
txn_db_options.transaction_lock_timeout = -1;
|
|
txn_db_options.default_lock_timeout = -1;
|
|
|
|
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "aaa", "aaa");
|
|
ASSERT_OK(s);
|
|
|
|
TransactionOptions txn_options0;
|
|
txn_options0.expiration = 100; // 100ms
|
|
txn_options0.lock_timeout = 50; // txn timeout no longer infinite
|
|
Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
|
|
|
|
s = txn1->GetForUpdate(read_options, "aaa", nullptr);
|
|
ASSERT_OK(s);
|
|
|
|
// Conflicts with previous GetForUpdate.
|
|
// Since db writes do not have a timeout, this should eventually succeed when
|
|
// the transaction expires.
|
|
s = db->Put(write_options, "aaa", "xxx");
|
|
ASSERT_OK(s);
|
|
|
|
ASSERT_GE(txn1->GetElapsedTime(),
|
|
static_cast<uint64_t>(txn_options0.expiration));
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_TRUE(s.IsExpired()); // expired!
|
|
|
|
s = db->Get(read_options, "aaa", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("xxx", value);
|
|
|
|
delete txn1;
|
|
delete db;
|
|
|
|
// transaction writes have 10ms timeout,
|
|
// db writes have infinite timeout
|
|
txn_db_options.transaction_lock_timeout = 50;
|
|
txn_db_options.default_lock_timeout = -1;
|
|
|
|
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "aaa", "aaa");
|
|
ASSERT_OK(s);
|
|
|
|
TransactionOptions txn_options;
|
|
txn_options.expiration = 100; // 100ms
|
|
txn1 = db->BeginTransaction(write_options, txn_options);
|
|
|
|
s = txn1->GetForUpdate(read_options, "aaa", nullptr);
|
|
ASSERT_OK(s);
|
|
|
|
// Conflicts with previous GetForUpdate.
|
|
// Since db writes do not have a timeout, this should eventually succeed when
|
|
// the transaction expires.
|
|
s = db->Put(write_options, "aaa", "xxx");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_NOK(s); // expired!
|
|
|
|
s = db->Get(read_options, "aaa", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("xxx", value);
|
|
|
|
delete txn1;
|
|
txn_options.expiration = 6000000; // 100 minutes
|
|
txn_options.lock_timeout = 1; // 1ms
|
|
txn1 = db->BeginTransaction(write_options, txn_options);
|
|
txn1->SetLockTimeout(100);
|
|
|
|
TransactionOptions txn_options2;
|
|
txn_options2.expiration = 10; // 10ms
|
|
Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put("a", "2");
|
|
ASSERT_OK(s);
|
|
|
|
// txn1 has a lock timeout longer than txn2's expiration, so it will win
|
|
s = txn1->Delete("a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
// txn2 should be expired out since txn1 waiting until its timeout expired.
|
|
s = txn2->Commit();
|
|
ASSERT_TRUE(s.IsExpired());
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
txn_options.expiration = 6000000; // 100 minutes
|
|
txn1 = db->BeginTransaction(write_options, txn_options);
|
|
txn_options2.expiration = 100000000;
|
|
txn2 = db->BeginTransaction(write_options, txn_options2);
|
|
|
|
s = txn1->Delete("asdf");
|
|
ASSERT_OK(s);
|
|
|
|
// txn2 has a smaller lock timeout than txn1's expiration, so it will time out
|
|
s = txn2->Delete("asdf");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Put("asdf", "asdf");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Get(read_options, "asdf", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("asdf", value);
|
|
|
|
delete txn1;
|
|
delete txn2;
|
|
}
|
|
|
|
TEST_F(TransactionTest, SingleDeleteTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = txn->SingleDelete("A");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
|
|
txn = db->BeginTransaction(write_options);
|
|
|
|
s = txn->SingleDelete("A");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a", value);
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
|
|
s = db->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a", value);
|
|
|
|
txn = db->BeginTransaction(write_options);
|
|
|
|
s = txn->SingleDelete("A");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
|
|
s = db->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
txn = db->BeginTransaction(write_options);
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
txn2->SetSnapshot();
|
|
|
|
s = txn->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Put("A", "a2");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->SingleDelete("A");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->SingleDelete("B");
|
|
ASSERT_OK(s);
|
|
|
|
// According to db.h, doing a SingleDelete on a key that has been
|
|
// overwritten will have undefinied behavior. So it is unclear what the
|
|
// result of fetching "A" should be. The current implementation will
|
|
// return NotFound in this case.
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn2->Put("B", "b");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn2;
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
|
|
// According to db.h, doing a SingleDelete on a key that has been
|
|
// overwritten will have undefinied behavior. So it is unclear what the
|
|
// result of fetching "A" should be. The current implementation will
|
|
// return NotFound in this case.
|
|
s = db->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = db->Get(read_options, "B", &value);
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
}
|
|
|
|
TEST_F(TransactionTest, MergeTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = db->Put(write_options, "A", "a0");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Merge("A", "1");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Merge("A", "2");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsMergeInProgress());
|
|
|
|
s = txn->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a", value);
|
|
|
|
s = txn->Merge("A", "3");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn->Get(read_options, "A", &value);
|
|
ASSERT_TRUE(s.IsMergeInProgress());
|
|
|
|
TransactionOptions txn_options;
|
|
txn_options.lock_timeout = 1; // 1 ms
|
|
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
|
ASSERT_TRUE(txn2);
|
|
|
|
// verify that txn has "A" locked
|
|
s = txn2->Merge("A", "4");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn2;
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn;
|
|
|
|
s = db->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a,3", value);
|
|
}
|
|
|
|
TEST_F(TransactionTest, DeferSnapshotTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
s = db->Put(write_options, "A", "a0");
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn1 = db->BeginTransaction(write_options);
|
|
Transaction* txn2 = db->BeginTransaction(write_options);
|
|
|
|
txn1->SetSnapshotOnNextOperation();
|
|
auto snapshot = txn1->GetSnapshot();
|
|
ASSERT_FALSE(snapshot);
|
|
|
|
s = txn2->Put("A", "a2");
|
|
ASSERT_OK(s);
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn2;
|
|
|
|
s = txn1->GetForUpdate(read_options, "A", &value);
|
|
// Should not conflict with txn2 since snapshot wasn't set until
|
|
// GetForUpdate was called.
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a2", value);
|
|
|
|
s = txn1->Put("A", "a1");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "B", "b0");
|
|
ASSERT_OK(s);
|
|
|
|
// Cannot lock B since it was written after the snapshot was set
|
|
s = txn1->Put("B", "b1");
|
|
ASSERT_TRUE(s.IsBusy());
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn1;
|
|
|
|
s = db->Get(read_options, "A", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("a1", value);
|
|
|
|
s = db->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("b0", value);
|
|
}
|
|
|
|
TEST_F(TransactionTest, DeferSnapshotTest2) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn1 = db->BeginTransaction(write_options);
|
|
|
|
txn1->SetSnapshot();
|
|
|
|
s = txn1->Put("A", "a1");
|
|
ASSERT_OK(s);
|
|
|
|
s = db->Put(write_options, "C", "c0");
|
|
ASSERT_OK(s);
|
|
s = db->Put(write_options, "D", "d0");
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
|
|
txn1->SetSnapshotOnNextOperation();
|
|
|
|
s = txn1->Get(snapshot_read_options, "C", &value);
|
|
// Snapshot was set before C was written
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
s = txn1->Get(snapshot_read_options, "D", &value);
|
|
// Snapshot was set before D was written
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
// Snapshot should not have changed yet.
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
|
|
s = txn1->Get(snapshot_read_options, "C", &value);
|
|
// Snapshot was set before C was written
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
s = txn1->Get(snapshot_read_options, "D", &value);
|
|
// Snapshot was set before D was written
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
|
|
s = txn1->GetForUpdate(read_options, "C", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("c0", value);
|
|
|
|
s = db->Put(write_options, "D", "d00");
|
|
ASSERT_OK(s);
|
|
|
|
// Snapshot is now set
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
s = txn1->Get(snapshot_read_options, "D", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("d0", value);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn1;
|
|
}
|
|
|
|
TEST_F(TransactionTest, DeferSnapshotSavePointTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
Transaction* txn1 = db->BeginTransaction(write_options);
|
|
|
|
txn1->SetSavePoint(); // 1
|
|
|
|
s = db->Put(write_options, "T", "1");
|
|
ASSERT_OK(s);
|
|
|
|
txn1->SetSnapshotOnNextOperation();
|
|
|
|
s = db->Put(write_options, "T", "2");
|
|
ASSERT_OK(s);
|
|
|
|
txn1->SetSavePoint(); // 2
|
|
|
|
s = db->Put(write_options, "T", "3");
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Put("A", "a");
|
|
ASSERT_OK(s);
|
|
|
|
txn1->SetSavePoint(); // 3
|
|
|
|
s = db->Put(write_options, "T", "4");
|
|
ASSERT_OK(s);
|
|
|
|
txn1->SetSnapshot();
|
|
txn1->SetSnapshotOnNextOperation();
|
|
|
|
txn1->SetSavePoint(); // 4
|
|
|
|
s = db->Put(write_options, "T", "5");
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
s = txn1->Get(snapshot_read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("4", value);
|
|
|
|
s = txn1->Put("A", "a1");
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
s = txn1->Get(snapshot_read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("5", value);
|
|
|
|
s = txn1->RollbackToSavePoint(); // Rollback to 4
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
s = txn1->Get(snapshot_read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("4", value);
|
|
|
|
s = txn1->RollbackToSavePoint(); // Rollback to 3
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
s = txn1->Get(snapshot_read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("3", value);
|
|
|
|
s = txn1->Get(read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("5", value);
|
|
|
|
s = txn1->RollbackToSavePoint(); // Rollback to 2
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
ASSERT_FALSE(snapshot_read_options.snapshot);
|
|
s = txn1->Get(snapshot_read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("5", value);
|
|
|
|
s = txn1->Delete("A");
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
ASSERT_TRUE(snapshot_read_options.snapshot);
|
|
s = txn1->Get(snapshot_read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("5", value);
|
|
|
|
s = txn1->RollbackToSavePoint(); // Rollback to 1
|
|
ASSERT_OK(s);
|
|
|
|
s = txn1->Delete("A");
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn1->GetSnapshot();
|
|
ASSERT_FALSE(snapshot_read_options.snapshot);
|
|
s = txn1->Get(snapshot_read_options, "T", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("5", value);
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn1;
|
|
}
|
|
|
|
TEST_F(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options;
|
|
string value;
|
|
|
|
class Notifier : public TransactionNotifier {
|
|
private:
|
|
const Snapshot** snapshot_ptr_;
|
|
|
|
public:
|
|
explicit Notifier(const Snapshot** snapshot_ptr)
|
|
: snapshot_ptr_(snapshot_ptr) {}
|
|
|
|
void SnapshotCreated(const Snapshot* newSnapshot) {
|
|
*snapshot_ptr_ = newSnapshot;
|
|
}
|
|
};
|
|
|
|
std::shared_ptr<Notifier> notifier =
|
|
std::make_shared<Notifier>(&read_options.snapshot);
|
|
Status s;
|
|
|
|
s = db->Put(write_options, "B", "0");
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn1 = db->BeginTransaction(write_options);
|
|
|
|
txn1->SetSnapshotOnNextOperation(notifier);
|
|
ASSERT_FALSE(read_options.snapshot);
|
|
|
|
s = db->Put(write_options, "B", "1");
|
|
ASSERT_OK(s);
|
|
|
|
// A Get does not generate the snapshot
|
|
s = txn1->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_FALSE(read_options.snapshot);
|
|
ASSERT_EQ(value, "1");
|
|
|
|
// Any other operation does
|
|
s = txn1->Put("A", "0");
|
|
ASSERT_OK(s);
|
|
|
|
// Now change "B".
|
|
s = db->Put(write_options, "B", "2");
|
|
ASSERT_OK(s);
|
|
|
|
// The original value should still be read
|
|
s = txn1->Get(read_options, "B", &value);
|
|
ASSERT_OK(s);
|
|
ASSERT_TRUE(read_options.snapshot);
|
|
ASSERT_EQ(value, "1");
|
|
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn1;
|
|
}
|
|
|
|
TEST_F(TransactionTest, ClearSnapshotTest) {
|
|
WriteOptions write_options;
|
|
ReadOptions read_options, snapshot_read_options;
|
|
string value;
|
|
Status s;
|
|
|
|
s = db->Put(write_options, "foo", "0");
|
|
ASSERT_OK(s);
|
|
|
|
Transaction* txn = db->BeginTransaction(write_options);
|
|
ASSERT_TRUE(txn);
|
|
|
|
s = db->Put(write_options, "foo", "1");
|
|
ASSERT_OK(s);
|
|
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
ASSERT_FALSE(snapshot_read_options.snapshot);
|
|
|
|
// No snapshot created yet
|
|
s = txn->Get(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "1");
|
|
|
|
txn->SetSnapshot();
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
ASSERT_TRUE(snapshot_read_options.snapshot);
|
|
|
|
s = db->Put(write_options, "foo", "2");
|
|
ASSERT_OK(s);
|
|
|
|
// Snapshot was created before change to '2'
|
|
s = txn->Get(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "1");
|
|
|
|
txn->ClearSnapshot();
|
|
snapshot_read_options.snapshot = txn->GetSnapshot();
|
|
ASSERT_FALSE(snapshot_read_options.snapshot);
|
|
|
|
// Snapshot has now been cleared
|
|
s = txn->Get(snapshot_read_options, "foo", &value);
|
|
ASSERT_EQ(value, "2");
|
|
|
|
s = txn->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
delete txn;
|
|
}
|
|
|
|
TEST_F(TransactionTest, ToggleAutoCompactionTest) {
|
|
Status s;
|
|
|
|
TransactionOptions txn_options;
|
|
ColumnFamilyHandle *cfa, *cfb;
|
|
ColumnFamilyOptions cf_options;
|
|
|
|
// Create 2 new column families
|
|
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
|
|
ASSERT_OK(s);
|
|
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
|
|
ASSERT_OK(s);
|
|
|
|
delete cfa;
|
|
delete cfb;
|
|
delete db;
|
|
|
|
// open DB with three column families
|
|
std::vector<ColumnFamilyDescriptor> column_families;
|
|
// have to open default column family
|
|
column_families.push_back(
|
|
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
|
|
// open the new column families
|
|
column_families.push_back(
|
|
ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
|
|
column_families.push_back(
|
|
ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
|
|
|
|
ColumnFamilyOptions* cf_opt_default = &column_families[0].options;
|
|
ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options;
|
|
ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options;
|
|
cf_opt_default->disable_auto_compactions = false;
|
|
cf_opt_cfa->disable_auto_compactions = true;
|
|
cf_opt_cfb->disable_auto_compactions = false;
|
|
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
|
|
s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
|
|
&handles, &db);
|
|
ASSERT_OK(s);
|
|
|
|
auto cfh_default = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[0]);
|
|
auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions();
|
|
|
|
auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[1]);
|
|
auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions();
|
|
|
|
auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[2]);
|
|
auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions();
|
|
|
|
ASSERT_EQ(opt_default.disable_auto_compactions, false);
|
|
ASSERT_EQ(opt_a.disable_auto_compactions, true);
|
|
ASSERT_EQ(opt_b.disable_auto_compactions, false);
|
|
|
|
for (auto handle : handles) {
|
|
delete handle;
|
|
}
|
|
}
|
|
|
|
TEST_F(TransactionTest, ExpiredTransactionDataRace1) {
|
|
// In this test, txn1 should succeed committing,
|
|
// as the callback is called after txn1 starts committing.
|
|
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"TransactionTest::ExpirableTransactionDataRace:1"}});
|
|
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
|
"TransactionTest::ExpirableTransactionDataRace:1", [&](void* arg) {
|
|
WriteOptions write_options;
|
|
TransactionOptions txn_options;
|
|
|
|
// Force txn1 to expire
|
|
/* sleep override */
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(150));
|
|
|
|
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
|
|
Status s;
|
|
s = txn2->Put("X", "2");
|
|
ASSERT_TRUE(s.IsTimedOut());
|
|
s = txn2->Commit();
|
|
ASSERT_OK(s);
|
|
delete txn2;
|
|
});
|
|
|
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
WriteOptions write_options;
|
|
TransactionOptions txn_options;
|
|
|
|
txn_options.expiration = 100;
|
|
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
|
|
|
|
Status s;
|
|
s = txn1->Put("X", "1");
|
|
ASSERT_OK(s);
|
|
s = txn1->Commit();
|
|
ASSERT_OK(s);
|
|
|
|
ReadOptions read_options;
|
|
string value;
|
|
s = db->Get(read_options, "X", &value);
|
|
ASSERT_EQ("1", value);
|
|
|
|
delete txn1;
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|
|
|
|
#else
|
|
#include <stdio.h>
|
|
|
|
int main(int argc, char** argv) {
|
|
fprintf(stderr,
|
|
"SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
|
|
return 0;
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|