3f5282268f
Summary: TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. This could be used as an optimization if the application knows that the transaction would not have any conflict with concurrent transactions. It is currently used during recovery, assuming (i) the application guarantees no conflict between prepared transactions in the WAL and (ii) the application guarantees that recovered transactions will be rolled back/committed before new transactions start. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4346 Differential Revision: D9759149 Pulled By: maysamyabandeh fbshipit-source-id: f896e84fa58b0b584be904c7fd3883a41ea3215b
2069 lines
79 KiB
C++
2069 lines
79 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
|
|
#include "utilities/transactions/transaction_test.h"
|
|
|
|
#include <inttypes.h>
|
|
#include <algorithm>
|
|
#include <functional>
|
|
#include <string>
|
|
#include <thread>
|
|
|
|
#include "db/db_impl.h"
|
|
#include "db/dbformat.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/types.h"
|
|
#include "rocksdb/utilities/debug.h"
|
|
#include "rocksdb/utilities/transaction.h"
|
|
#include "rocksdb/utilities/transaction_db.h"
|
|
#include "table/mock_table.h"
|
|
#include "util/fault_injection_test_env.h"
|
|
#include "util/random.h"
|
|
#include "util/string_util.h"
|
|
#include "util/sync_point.h"
|
|
#include "util/testharness.h"
|
|
#include "util/testutil.h"
|
|
#include "util/transaction_test_util.h"
|
|
#include "utilities/merge_operators.h"
|
|
#include "utilities/merge_operators/string_append/stringappend.h"
|
|
#include "utilities/transactions/pessimistic_transaction_db.h"
|
|
#include "utilities/transactions/write_prepared_txn_db.h"
|
|
|
|
#include "port/port.h"
|
|
|
|
using std::string;
|
|
|
|
namespace rocksdb {
|
|
|
|
using CommitEntry = WritePreparedTxnDB::CommitEntry;
|
|
using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
|
|
using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
|
|
|
|
// Drives PreparedHeap through a fixed sequence of push/erase/pop calls and
// checks after each step which sequence number top() reports and, at the end,
// that the heap reports empty.
TEST(PreparedHeap, BasicsTest) {
  WritePreparedTxnDB::PreparedHeap heap;

  // A lone element is the top.
  heap.push(14l);
  ASSERT_EQ(14l, heap.top());

  // Pushing larger values leaves the previous minimum on top.
  heap.push(24l);
  heap.push(34l);
  ASSERT_EQ(14l, heap.top());

  // A smaller value becomes the new top ...
  heap.push(13l);
  ASSERT_EQ(13l, heap.top());
  // ... and reading top() again yields the same answer.
  ASSERT_EQ(13l, heap.top());

  // More large values still do not displace the minimum.
  const uint64_t larger_seqs[] = {44, 54, 64, 74, 84};
  for (uint64_t seq : larger_seqs) {
    heap.push(seq);
  }
  ASSERT_EQ(13l, heap.top());

  // Erasing values other than the top keeps the top unchanged.
  heap.erase(24l);
  ASSERT_EQ(13l, heap.top());
  heap.erase(14l);
  ASSERT_EQ(13l, heap.top());

  // Erasing the top after several other erases exposes the next live value.
  heap.erase(13l);
  ASSERT_EQ(34l, heap.top());

  // Erasing the top on its own exposes the next live value too.
  heap.erase(34l);
  ASSERT_EQ(44l, heap.top());
  heap.erase(54l);
  ASSERT_EQ(44l, heap.top());

  // pop() removes 44; the previously erased 54 must be skipped.
  heap.pop();
  ASSERT_EQ(64l, heap.top());

  // Erasing a value that was already popped is harmless.
  heap.erase(44l);
  ASSERT_EQ(64l, heap.top());
  heap.erase(84l);
  ASSERT_EQ(64l, heap.top());

  // Push a run of values and erase all of them in a shuffled order; the top
  // must be unaffected.
  const uint64_t pushed_run[] = {85, 86, 87, 88, 89};
  for (uint64_t seq : pushed_run) {
    heap.push(seq);
  }
  const uint64_t erased_run[] = {87, 85, 89, 86, 88};
  for (uint64_t seq : erased_run) {
    heap.erase(seq);
  }
  ASSERT_EQ(64l, heap.top());

  // pop() works with that series of erases still pending.
  heap.pop();
  ASSERT_EQ(74l, heap.top());
  ASSERT_FALSE(heap.empty());

  // Removing the last live value leaves the heap empty.
  heap.pop();
  ASSERT_TRUE(heap.empty());
}
|
|
|
|
// This is a scenario reconstructed from a buggy trace. Test that the bug does
|
|
// not resurface again.
|
|
TEST(PreparedHeap, EmptyAtTheEnd) {
|
|
WritePreparedTxnDB::PreparedHeap heap;
|
|
heap.push(40l);
|
|
ASSERT_EQ(40l, heap.top());
|
|
// Although not a recommended scenario, we must be resilient against erase
|
|
// without a prior push.
|
|
heap.erase(50l);
|
|
ASSERT_EQ(40l, heap.top());
|
|
heap.push(60l);
|
|
ASSERT_EQ(40l, heap.top());
|
|
|
|
heap.erase(60l);
|
|
ASSERT_EQ(40l, heap.top());
|
|
heap.erase(40l);
|
|
ASSERT_TRUE(heap.empty());
|
|
|
|
heap.push(40l);
|
|
ASSERT_EQ(40l, heap.top());
|
|
heap.erase(50l);
|
|
ASSERT_EQ(40l, heap.top());
|
|
heap.push(60l);
|
|
ASSERT_EQ(40l, heap.top());
|
|
|
|
heap.erase(40l);
|
|
// Test that the erase has not emptied the heap (we had a bug doing that)
|
|
ASSERT_FALSE(heap.empty());
|
|
ASSERT_EQ(60l, heap.top());
|
|
heap.erase(60l);
|
|
ASSERT_TRUE(heap.empty());
|
|
}
|
|
|
|
// Generate random order of PreparedHeap access and test that the heap will be
|
|
// successfully emptied at the end.
|
|
TEST(PreparedHeap, Concurrent) {
|
|
const size_t t_cnt = 10;
|
|
rocksdb::port::Thread t[t_cnt];
|
|
Random rnd(1103);
|
|
WritePreparedTxnDB::PreparedHeap heap;
|
|
port::RWMutex prepared_mutex;
|
|
|
|
for (size_t n = 0; n < 100; n++) {
|
|
for (size_t i = 0; i < t_cnt; i++) {
|
|
// This is not recommended usage but we should be resilient against it.
|
|
bool skip_push = rnd.OneIn(5);
|
|
t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() {
|
|
auto seq = i;
|
|
std::this_thread::yield();
|
|
if (!skip_push) {
|
|
WriteLock wl(&prepared_mutex);
|
|
heap.push(seq);
|
|
}
|
|
std::this_thread::yield();
|
|
{
|
|
WriteLock wl(&prepared_mutex);
|
|
heap.erase(seq);
|
|
}
|
|
});
|
|
}
|
|
for (size_t i = 0; i < t_cnt; i++) {
|
|
t[i].join();
|
|
}
|
|
ASSERT_TRUE(heap.empty());
|
|
}
|
|
}
|
|
|
|
// Test that WriteBatchWithIndex correctly counts the number of sub-batches
|
|
TEST(WriteBatchWithIndex, SubBatchCnt) {
|
|
ColumnFamilyOptions cf_options;
|
|
std::string cf_name = "two";
|
|
DB* db;
|
|
Options options;
|
|
options.create_if_missing = true;
|
|
const std::string dbname = test::PerThreadDBPath("transaction_testdb");
|
|
DestroyDB(dbname, options);
|
|
ASSERT_OK(DB::Open(options, dbname, &db));
|
|
ColumnFamilyHandle* cf_handle = nullptr;
|
|
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
|
|
WriteOptions write_options;
|
|
size_t batch_cnt = 1;
|
|
size_t save_points = 0;
|
|
std::vector<size_t> batch_cnt_at;
|
|
WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
|
|
0);
|
|
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
|
|
batch_cnt_at.push_back(batch_cnt);
|
|
batch.SetSavePoint();
|
|
save_points++;
|
|
batch.Put(Slice("key"), Slice("value"));
|
|
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
|
|
batch_cnt_at.push_back(batch_cnt);
|
|
batch.SetSavePoint();
|
|
save_points++;
|
|
batch.Put(Slice("key2"), Slice("value2"));
|
|
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
|
|
// duplicate the keys
|
|
batch_cnt_at.push_back(batch_cnt);
|
|
batch.SetSavePoint();
|
|
save_points++;
|
|
batch.Put(Slice("key"), Slice("value3"));
|
|
batch_cnt++;
|
|
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
|
|
// duplicate the 2nd key. It should not be counted duplicate since a
|
|
// sub-patch is cut after the last duplicate.
|
|
batch_cnt_at.push_back(batch_cnt);
|
|
batch.SetSavePoint();
|
|
save_points++;
|
|
batch.Put(Slice("key2"), Slice("value4"));
|
|
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
|
|
// duplicate the keys but in a different cf. It should not be counted as
|
|
// duplicate keys
|
|
batch_cnt_at.push_back(batch_cnt);
|
|
batch.SetSavePoint();
|
|
save_points++;
|
|
batch.Put(cf_handle, Slice("key"), Slice("value5"));
|
|
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
|
|
|
|
// Test that the number of sub-batches matches what we count with
|
|
// SubBatchCounter
|
|
std::map<uint32_t, const Comparator*> comparators;
|
|
comparators[0] = db->DefaultColumnFamily()->GetComparator();
|
|
comparators[cf_handle->GetID()] = cf_handle->GetComparator();
|
|
SubBatchCounter counter(comparators);
|
|
ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
|
|
ASSERT_EQ(batch_cnt, counter.BatchCount());
|
|
|
|
// Test that RollbackToSavePoint will properly resets the number of
|
|
// sub-batches
|
|
for (size_t i = save_points; i > 0; i--) {
|
|
batch.RollbackToSavePoint();
|
|
ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
|
|
}
|
|
|
|
// Test the count is right with random batches
|
|
{
|
|
const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms
|
|
Random rnd(1131);
|
|
std::string keys[TOTAL_KEYS];
|
|
for (size_t k = 0; k < TOTAL_KEYS; k++) {
|
|
int len = static_cast<int>(rnd.Uniform(50));
|
|
keys[k] = test::RandomKey(&rnd, len);
|
|
}
|
|
for (size_t i = 0; i < 1000; i++) { // 1000 random batches
|
|
WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
|
|
0, true, 0);
|
|
for (size_t k = 0; k < 10; k++) { // 10 key per batch
|
|
size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
|
|
Slice key = Slice(keys[ki]);
|
|
std::string buffer;
|
|
Slice value = Slice(test::RandomString(&rnd, 16, &buffer));
|
|
rndbatch.Put(key, value);
|
|
}
|
|
SubBatchCounter batch_counter(comparators);
|
|
ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
|
|
ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
|
|
}
|
|
}
|
|
|
|
delete cf_handle;
|
|
delete db;
|
|
}
|
|
|
|
// Round-trips CommitEntry values through the packed CommitEntry64b
// representation and checks that Parse() restores the original entry, and that
// a zero-initialized CommitEntry64b parses as "no entry".
TEST(CommitEntry64b, BasicTest) {
  const size_t INDEX_BITS = static_cast<size_t>(21);
  const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
  const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));

  // zero-initialized CommitEntry64b should indicate an empty entry
  CommitEntry64b empty_entry64b;
  uint64_t empty_index = 11ul;
  CommitEntry empty_entry;
  bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
  ASSERT_FALSE(ok);

  // The zero entry is reserved for un-initialized entries, hence the extra -1.
  // Shift a 64-bit one, as INDEX_SIZE above does: shifting the int literal 1
  // is undefined once COMMIT_BITS reaches the width of int.
  const size_t MAX_COMMIT =
      static_cast<size_t>((1ull << FORMAT.COMMIT_BITS) - 1 - 1);
  // Samples over the numbers that are covered by that many index bits
  std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
  // Samples over the numbers that are covered by that many commit bits
  std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
  // Iterate over prepare numbers that i) cover all bits of a sequence number,
  // and ii) include some bits that fall into the range of index or commit
  // bits.
  for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
    for (uint64_t i : is) {
      for (uint64_t d : ds) {
        uint64_t p = base + i + d;
        // Commit sequence numbers at, half-way to, and at the full distance d
        // from the prepare sequence number.
        for (uint64_t c : {p, p + d / 2, p + d}) {
          uint64_t index = p % INDEX_SIZE;
          CommitEntry before(p, c), after;
          CommitEntry64b entry64b(before, FORMAT);
          ok = entry64b.Parse(index, &after, FORMAT);
          ASSERT_TRUE(ok);
          if (!(before == after)) {
            // Print the inputs that produced the mismatch before failing.
            printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
                   " c %" PRIu64 " index %" PRIu64 "\n",
                   base, i, d, p, c, index);
          }
          ASSERT_EQ(before, after);
        }
      }
    }
  }
}
|
|
|
|
class WritePreparedTxnDBMock : public WritePreparedTxnDB {
|
|
public:
|
|
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
|
|
: WritePreparedTxnDB(db_impl, opt) {}
|
|
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
|
|
size_t snapshot_cache_size)
|
|
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
|
|
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
|
|
size_t snapshot_cache_size, size_t commit_cache_size)
|
|
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
|
|
commit_cache_size) {}
|
|
void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
|
|
snapshots_ = snapshots;
|
|
}
|
|
void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
|
|
|
|
protected:
|
|
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
|
|
SequenceNumber /* unused */) override {
|
|
return snapshots_;
|
|
}
|
|
|
|
private:
|
|
std::vector<SequenceNumber> snapshots_;
|
|
};
|
|
|
|
class WritePreparedTransactionTestBase : public TransactionTestBase {
|
|
public:
|
|
WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
|
|
TxnDBWritePolicy write_policy)
|
|
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};
|
|
|
|
protected:
|
|
// If expect_update is set, check if it actually updated old_commit_map_. If
|
|
// it did not and yet suggested not to check the next snapshot, do the
|
|
// opposite to check if it was not a bad suggestion.
|
|
void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
|
|
uint64_t snapshot,
|
|
uint64_t next_snapshot,
|
|
bool expect_update) {
|
|
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
|
|
// reset old_commit_map_empty_ so that its value indicate whether
|
|
// old_commit_map_ was updated
|
|
wp_db->old_commit_map_empty_ = true;
|
|
bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
|
|
snapshot < next_snapshot);
|
|
if (expect_update == wp_db->old_commit_map_empty_) {
|
|
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
|
|
" next: %" PRIu64 "\n",
|
|
prepare, commit, snapshot, next_snapshot);
|
|
}
|
|
EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
|
|
if (!check_next && wp_db->old_commit_map_empty_) {
|
|
// do the opposite to make sure it was not a bad suggestion
|
|
const bool dont_care_bool = true;
|
|
wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
|
|
dont_care_bool);
|
|
if (!wp_db->old_commit_map_empty_) {
|
|
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
|
|
" next: %" PRIu64 "\n",
|
|
prepare, commit, snapshot, next_snapshot);
|
|
}
|
|
EXPECT_TRUE(wp_db->old_commit_map_empty_);
|
|
}
|
|
}
|
|
|
|
// Test that a CheckAgainstSnapshots thread reading old_snapshots will not
|
|
// miss a snapshot because of a concurrent update by UpdateSnapshots that is
|
|
// writing new_snapshots. Both threads are broken at two points. The sync
|
|
// points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
|
|
// entry is expected to be vital for one of the snapshots that is common
|
|
// between the old and new list of snapshots.
|
|
void SnapshotConcurrentAccessTestInternal(
|
|
WritePreparedTxnDB* wp_db,
|
|
const std::vector<SequenceNumber>& old_snapshots,
|
|
const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
|
|
SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
|
|
// First reset the snapshot list
|
|
const std::vector<SequenceNumber> empty_snapshots;
|
|
wp_db->old_commit_map_empty_ = true;
|
|
wp_db->UpdateSnapshots(empty_snapshots, ++version);
|
|
// Then initialize it with the old_snapshots
|
|
wp_db->UpdateSnapshots(old_snapshots, ++version);
|
|
|
|
// Starting from the first thread, cut each thread at two points
|
|
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
|
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
|
|
"WritePreparedTxnDB::UpdateSnapshots:s:start"},
|
|
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
|
|
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
|
|
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
|
|
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
|
|
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
|
|
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
|
|
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
|
|
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
|
|
});
|
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
|
{
|
|
ASSERT_TRUE(wp_db->old_commit_map_empty_);
|
|
rocksdb::port::Thread t1(
|
|
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
|
|
rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
|
|
t1.join();
|
|
t2.join();
|
|
ASSERT_FALSE(wp_db->old_commit_map_empty_);
|
|
}
|
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
|
|
|
wp_db->old_commit_map_empty_ = true;
|
|
wp_db->UpdateSnapshots(empty_snapshots, ++version);
|
|
wp_db->UpdateSnapshots(old_snapshots, ++version);
|
|
// Starting from the second thread, cut each thread at two points
|
|
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
|
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
|
|
"WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
|
|
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
|
|
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
|
|
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
|
|
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
|
|
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
|
|
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
|
|
{"WritePreparedTxnDB::UpdateSnapshots:p:end",
|
|
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
|
|
});
|
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
|
{
|
|
ASSERT_TRUE(wp_db->old_commit_map_empty_);
|
|
rocksdb::port::Thread t1(
|
|
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
|
|
rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
|
|
t1.join();
|
|
t2.join();
|
|
ASSERT_FALSE(wp_db->old_commit_map_empty_);
|
|
}
|
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
// Verify value of keys.
|
|
void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
|
|
const Snapshot* snapshot = nullptr) {
|
|
std::string value;
|
|
ReadOptions read_options;
|
|
read_options.snapshot = snapshot;
|
|
for (auto& kv : data) {
|
|
auto s = db->Get(read_options, kv.first, &value);
|
|
ASSERT_TRUE(s.ok() || s.IsNotFound());
|
|
if (s.ok()) {
|
|
if (kv.second != value) {
|
|
printf("key = %s\n", kv.first.c_str());
|
|
}
|
|
ASSERT_EQ(kv.second, value);
|
|
} else {
|
|
ASSERT_EQ(kv.second, "NOT_FOUND");
|
|
}
|
|
|
|
// Try with MultiGet API too
|
|
std::vector<std::string> values;
|
|
auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
|
|
{kv.first}, &values);
|
|
ASSERT_EQ(1, values.size());
|
|
ASSERT_EQ(1, s_vec.size());
|
|
s = s_vec[0];
|
|
ASSERT_TRUE(s.ok() || s.IsNotFound());
|
|
if (s.ok()) {
|
|
ASSERT_TRUE(kv.second == values[0]);
|
|
} else {
|
|
ASSERT_EQ(kv.second, "NOT_FOUND");
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verify all versions of keys.
|
|
void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
|
|
std::vector<KeyVersion> versions;
|
|
const size_t kMaxKeys = 100000;
|
|
ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
|
|
expected_versions.back().user_key, kMaxKeys,
|
|
&versions));
|
|
ASSERT_EQ(expected_versions.size(), versions.size());
|
|
for (size_t i = 0; i < versions.size(); i++) {
|
|
ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
|
|
ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
|
|
ASSERT_EQ(expected_versions[i].type, versions[i].type);
|
|
if (versions[i].type != kTypeDeletion &&
|
|
versions[i].type != kTypeSingleDeletion) {
|
|
ASSERT_EQ(expected_versions[i].value, versions[i].value);
|
|
}
|
|
// Range delete not supported.
|
|
assert(expected_versions[i].type != kTypeRangeDeletion);
|
|
}
|
|
}
|
|
};
|
|
|
|
class WritePreparedTransactionTest
|
|
: public WritePreparedTransactionTestBase,
|
|
virtual public ::testing::WithParamInterface<
|
|
std::tuple<bool, bool, TxnDBWritePolicy>> {
|
|
public:
|
|
WritePreparedTransactionTest()
|
|
: WritePreparedTransactionTestBase(std::get<0>(GetParam()),
|
|
std::get<1>(GetParam()),
|
|
std::get<2>(GetParam())){};
|
|
};
|
|
|
|
#ifndef ROCKSDB_VALGRIND_RUN
|
|
class SnapshotConcurrentAccessTest
|
|
: public WritePreparedTransactionTestBase,
|
|
virtual public ::testing::WithParamInterface<
|
|
std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> {
|
|
public:
|
|
SnapshotConcurrentAccessTest()
|
|
: WritePreparedTransactionTestBase(std::get<0>(GetParam()),
|
|
std::get<1>(GetParam()),
|
|
std::get<2>(GetParam())),
|
|
split_id_(std::get<3>(GetParam())),
|
|
split_cnt_(std::get<4>(GetParam())){};
|
|
|
|
protected:
|
|
// A test is split into split_cnt_ tests, each identified with split_id_ where
|
|
// 0 <= split_id_ < split_cnt_
|
|
size_t split_id_;
|
|
size_t split_cnt_;
|
|
};
|
|
#endif // ROCKSDB_VALGRIND_RUN
|
|
|
|
class SeqAdvanceConcurrentTest
|
|
: public WritePreparedTransactionTestBase,
|
|
virtual public ::testing::WithParamInterface<
|
|
std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> {
|
|
public:
|
|
SeqAdvanceConcurrentTest()
|
|
: WritePreparedTransactionTestBase(std::get<0>(GetParam()),
|
|
std::get<1>(GetParam()),
|
|
std::get<2>(GetParam())),
|
|
split_id_(std::get<3>(GetParam())),
|
|
split_cnt_(std::get<4>(GetParam())){};
|
|
|
|
protected:
|
|
// A test is split into split_cnt_ tests, each identified with split_id_ where
|
|
// 0 <= split_id_ < split_cnt_
|
|
size_t split_id_;
|
|
size_t split_cnt_;
|
|
};
|
|
|
|
// Runs WritePreparedTransactionTest with WRITE_PREPARED, once with
// two_write_queue off and once with it on.
// Tuple layout: (use_stackable_db, two_write_queue, write_policy).
INSTANTIATE_TEST_CASE_P(
    WritePreparedTransactionTest, WritePreparedTransactionTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED),
                      std::make_tuple(false, true, WRITE_PREPARED)));
|
|
|
|
#ifndef ROCKSDB_VALGRIND_RUN
|
|
// SnapshotConcurrentAccessTest with two_write_queue on, one instance per
// split: split ids 0..19 out of 20.
// Tuple layout:
// (use_stackable_db, two_write_queue, write_policy, split_id, split_cnt).
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SnapshotConcurrentAccessTest,
    ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 1, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 2, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 3, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 4, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 5, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 6, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 7, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 8, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 9, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 10, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 11, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 12, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 13, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 14, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 15, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 16, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 17, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 18, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 19, 20)));
|
|
|
|
// SnapshotConcurrentAccessTest with two_write_queue off, one instance per
// split: split ids 0..19 out of 20.
// Tuple layout:
// (use_stackable_db, two_write_queue, write_policy, split_id, split_cnt).
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SnapshotConcurrentAccessTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 1, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 2, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 3, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 4, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 5, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 6, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 7, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 8, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 9, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 10, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 11, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 12, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 13, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 14, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 15, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 16, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 17, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 18, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 19, 20)));
|
|
|
|
// SeqAdvanceConcurrentTest with two_write_queue on, one instance per split:
// split ids 0..9 out of 10.
// Tuple layout:
// (use_stackable_db, two_write_queue, write_policy, split_id, split_cnt).
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SeqAdvanceConcurrentTest,
    ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 1, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 2, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 3, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 4, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 5, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 6, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 7, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 8, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 9, 10)));
|
|
|
|
// SeqAdvanceConcurrentTest with two_write_queue off, one instance per split:
// split ids 0..9 out of 10.
// Tuple layout:
// (use_stackable_db, two_write_queue, write_policy, split_id, split_cnt).
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SeqAdvanceConcurrentTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 1, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 2, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 3, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 4, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 5, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 6, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 7, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 8, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 9, 10)));
|
|
#endif // ROCKSDB_VALGRIND_RUN
|
|
|
|
// Exercises AddCommitEntry / GetCommitEntry / ExchangeCommitEntry on a single
// commit-cache slot: plain insert and read-back, entries whose prepare
// sequence maps to the same slot, rejected and accepted exchanges, and
// eviction on overwrite.
TEST_P(WritePreparedTransactionTest, CommitMapTest) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  assert(wp_db);
  assert(wp_db->db_impl_);
  const size_t cache_size = wp_db->COMMIT_CACHE_SIZE;
  CommitEntry first(5, 12);
  // Output slot reused by every Add/Get call below.
  CommitEntry fetched;
  bool evicted =
      wp_db->AddCommitEntry(first.prep_seq % cache_size, first, &fetched);
  ASSERT_FALSE(evicted);

  // Should be able to read the same value
  CommitEntry64b dont_care;
  bool found =
      wp_db->GetCommitEntry(first.prep_seq % cache_size, &dont_care, &fetched);
  ASSERT_TRUE(found);
  ASSERT_EQ(first, fetched);
  // Should be able to distinguish between overlapping entries: looking up a
  // prepare sequence one cache size further on finds the slot occupied, but
  // by a different prepare sequence.
  found = wp_db->GetCommitEntry((first.prep_seq + cache_size) % cache_size,
                                &dont_care, &fetched);
  ASSERT_TRUE(found);
  ASSERT_NE(first.prep_seq + cache_size, fetched.prep_seq);
  // Should be able to detect non-existent entry
  found = wp_db->GetCommitEntry((first.prep_seq + 1) % cache_size, &dont_care,
                                &fetched);
  ASSERT_FALSE(found);

  // Reject an invalid exchange: the expected value passed in does not match
  // what is stored in the slot.
  CommitEntry mismatched(first.prep_seq + cache_size,
                         first.commit_seq + cache_size);
  CommitEntry64b mismatched_64b(mismatched, wp_db->FORMAT);
  bool exchanged = wp_db->ExchangeCommitEntry(
      mismatched.prep_seq % cache_size, mismatched_64b, fetched);
  ASSERT_FALSE(exchanged);
  // check whether it did actually reject that
  found = wp_db->GetCommitEntry(mismatched.prep_seq % cache_size, &dont_care,
                                &fetched);
  ASSERT_TRUE(found);
  ASSERT_EQ(first, fetched);

  // Accept a valid exchange: the expected value matches the stored entry.
  CommitEntry64b first_64b(first, wp_db->FORMAT);
  CommitEntry replacement(first.prep_seq + cache_size,
                          first.commit_seq + cache_size + 1);
  exchanged = wp_db->ExchangeCommitEntry(first.prep_seq % cache_size,
                                         first_64b, replacement);
  ASSERT_TRUE(exchanged);
  // check whether it actually accepted that
  found =
      wp_db->GetCommitEntry(first.prep_seq % cache_size, &dont_care, &fetched);
  ASSERT_TRUE(found);
  ASSERT_EQ(replacement, fetched);

  // Rewrite an entry: adding over an occupied slot reports the evicted entry.
  CommitEntry overwrite(replacement.prep_seq + cache_size,
                        replacement.commit_seq + cache_size + 1);
  evicted = wp_db->AddCommitEntry(overwrite.prep_seq % cache_size, overwrite,
                                  &fetched);
  ASSERT_TRUE(evicted);
  ASSERT_EQ(replacement, fetched);
  found = wp_db->GetCommitEntry(overwrite.prep_seq % cache_size, &dont_care,
                                &fetched);
  ASSERT_TRUE(found);
  ASSERT_EQ(overwrite, fetched);
}
|
|
|
|
// If prepare <= snapshot < commit we should keep the entry around since its
// nonexistence could be interpreted as committed in the snapshot while it is
// not true. We keep such entries around by adding them to the
// old_commit_map_. Each scenario below is fed to
// MaybeUpdateOldCommitMapTestWithNext.
TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
  struct Scenario {
    uint64_t prepare;
    uint64_t commit;
    uint64_t snapshot;
    uint64_t next_snapshot;
    bool expect_update;
  };
  // Where no update is expected, a scenario is also tried with a different
  // next snapshot. This tests that MaybeUpdateOldCommitMap does not prevent
  // us from checking a next snapshot that must be checked.
  const Scenario scenarios[] = {
      // prepare < commit < snapshot
      {10, 15, 20, 21, false},
      {10, 15, 20, 11, false},
      // prepare < commit == snapshot
      {10, 20, 20, 19, false},
      {10, 20, 20, 21, false},
      // prepare == commit == snapshot
      {20, 20, 20, 21, false},
      {20, 20, 20, 19, false},
      // prepare < snapshot < commit
      {10, 25, 20, 21, true},
      // prepare == snapshot < commit
      {20, 25, 20, 21, true},
      // snapshot < prepare < commit
      {21, 25, 20, 22, false},
      {21, 25, 20, 19, false},
  };
  for (const Scenario& sc : scenarios) {
    MaybeUpdateOldCommitMapTestWithNext(sc.prepare, sc.commit, sc.snapshot,
                                        sc.next_snapshot, sc.expect_update);
  }
}
|
|
|
|
// Test that the entries in old_commit_map_ get garbage collected properly
|
|
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
|
|
const size_t snapshot_cache_bits = 0;
|
|
const size_t commit_cache_bits = 0;
|
|
DBImpl* mock_db = new DBImpl(options, dbname);
|
|
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
|
|
mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
|
|
|
|
SequenceNumber seq = 0;
|
|
// Take the first snapshot that overlaps with two txn
|
|
auto prep_seq = ++seq;
|
|
wp_db->AddPrepared(prep_seq);
|
|
auto prep_seq2 = ++seq;
|
|
wp_db->AddPrepared(prep_seq2);
|
|
auto snap_seq1 = seq;
|
|
wp_db->TakeSnapshot(snap_seq1);
|
|
auto commit_seq = ++seq;
|
|
wp_db->AddCommitted(prep_seq, commit_seq);
|
|
wp_db->RemovePrepared(prep_seq);
|
|
auto commit_seq2 = ++seq;
|
|
wp_db->AddCommitted(prep_seq2, commit_seq2);
|
|
wp_db->RemovePrepared(prep_seq2);
|
|
// Take the 2nd and 3rd snapshot that overlap with the same txn
|
|
prep_seq = ++seq;
|
|
wp_db->AddPrepared(prep_seq);
|
|
auto snap_seq2 = seq;
|
|
wp_db->TakeSnapshot(snap_seq2);
|
|
seq++;
|
|
auto snap_seq3 = seq;
|
|
wp_db->TakeSnapshot(snap_seq3);
|
|
seq++;
|
|
commit_seq = ++seq;
|
|
wp_db->AddCommitted(prep_seq, commit_seq);
|
|
wp_db->RemovePrepared(prep_seq);
|
|
// Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
|
|
// only item in the commit_cache_ via another commit.
|
|
prep_seq = ++seq;
|
|
wp_db->AddPrepared(prep_seq);
|
|
commit_seq = ++seq;
|
|
wp_db->AddCommitted(prep_seq, commit_seq);
|
|
wp_db->RemovePrepared(prep_seq);
|
|
|
|
// Verify that the evicted commit entries for all snapshots are in the
|
|
// old_commit_map_
|
|
{
|
|
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
|
|
ReadLock rl(&wp_db->old_commit_map_mutex_);
|
|
ASSERT_EQ(3, wp_db->old_commit_map_.size());
|
|
ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
|
|
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size());
|
|
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
|
|
}
|
|
|
|
// Verify that the 2nd snapshot is cleaned up after the release
|
|
wp_db->ReleaseSnapshotInternal(snap_seq2);
|
|
{
|
|
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
|
|
ReadLock rl(&wp_db->old_commit_map_mutex_);
|
|
ASSERT_EQ(2, wp_db->old_commit_map_.size());
|
|
ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
|
|
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
|
|
}
|
|
|
|
// Verify that the 1st snapshot is cleaned up after the release
|
|
wp_db->ReleaseSnapshotInternal(snap_seq1);
|
|
{
|
|
ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
|
|
ReadLock rl(&wp_db->old_commit_map_mutex_);
|
|
ASSERT_EQ(1, wp_db->old_commit_map_.size());
|
|
ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
|
|
}
|
|
|
|
// Verify that the 3rd snapshot is cleaned up after the release
|
|
wp_db->ReleaseSnapshotInternal(snap_seq3);
|
|
{
|
|
ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
|
|
ReadLock rl(&wp_db->old_commit_map_mutex_);
|
|
ASSERT_EQ(0, wp_db->old_commit_map_.size());
|
|
}
|
|
}
|
|
|
|
// Checks that CheckAgainstSnapshots marks old_commit_map_ as non-empty exactly
// when a registered snapshot falls between the prepare and the commit sequence
// number of the inspected commit entry.
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
  const std::vector<SequenceNumber> snapshots = {100, 200, 300, 400, 500,
                                                 600, 700, 800, 900};
  const size_t snapshot_cache_bits = 2;
  // Documents the intended relation between the cache size and the number of
  // snapshots: the list is twice the cache size plus one. Adjust if the
  // snapshot list changes.
  assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
  SequenceNumber version = 1000;
  ASSERT_EQ(0, wp_db->snapshots_total_);
  wp_db->UpdateSnapshots(snapshots, version);
  ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);

  // Two seq numbers are placed between each pair of adjacent snapshots. Hence
  // whenever two consecutive seqs differ by more than 5, a snapshot sits
  // between them.
  const std::vector<SequenceNumber> seqs = {
      50,  55,  150, 155, 250, 255, 350, 355, 450, 455,
      550, 555, 650, 655, 750, 755, 850, 855, 950, 955};
  assert(seqs.size() > 1);
  for (size_t next = 1; next < seqs.size(); next++) {
    const SequenceNumber prep = seqs[next - 1];
    const SequenceNumber commit = seqs[next];
    // Reset the flag so that each pair is judged independently.
    wp_db->old_commit_map_empty_ = true;
    CommitEntry entry = {prep, commit};
    wp_db->CheckAgainstSnapshots(entry);
    // An update is expected iff a snapshot lies between prepare and commit.
    const bool wide_gap = commit - prep > 5;
    const bool within_snapshot_range =
        commit >= snapshots.front() && prep <= snapshots.back();
    const bool expect_update = wide_gap && within_snapshot_range;
    ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
  }
}
|
|
|
|
// This test is too slow for travis
|
|
#ifndef TRAVIS
|
|
#ifndef ROCKSDB_VALGRIND_RUN
|
|
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
|
|
// parallel with UpdateSnapshots.
|
|
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccessTest) {
  // The method under test has a sync point after checking each snapshot. If
  // the max number of snapshots used here grows, more sync points must be
  // added to the methods as well.
  const std::vector<SequenceNumber> snapshots = {10, 20, 30, 40, 50,
                                                 60, 70, 80, 90, 100};
  const size_t snapshot_cache_bits = 2;
  // Documents the intended relation between the cache size and the number of
  // snapshots. Adjust if the snapshot list changes.
  assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
  SequenceNumber version = 1000;
  // The cache size is chosen so that a new snapshot list can replace every
  // cached item and still overflow the cache.
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
  const size_t extra = 2;
  const size_t cache_size = wp_db->SNAPSHOT_CACHE_SIZE;
  size_t loop_id = 0;
  // Grow the old list up to `extra` items beyond what fits into the cache.
  for (size_t old_size = 1; old_size <= cache_size + extra; old_size++) {
    const std::vector<SequenceNumber> old_snapshots(
        snapshots.begin(), snapshots.begin() + old_size);
    const size_t max_added = snapshots.size() - old_snapshots.size();
    // The critical part is the iteration over the snapshot cache; past that
    // point the code under test operates under the lock.
    const size_t a_range = std::min(old_snapshots.size(), cache_size) + 1;

    // Every member of the old list may or may not survive into the new list;
    // one common_snapshots list is built per such combination.
    const size_t new_comb_cnt = size_t(1) << old_size;
    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
      // Only run the share of combinations assigned to this test instance.
      if (loop_id % split_cnt_ != split_id_) continue;
      printf(".");  // To signal progress
      fflush(stdout);
      std::vector<SequenceNumber> common_snapshots;
      for (size_t pos = 0; pos < old_snapshots.size(); pos++) {
        if (IsInCombination(pos, new_comb)) {
          common_snapshots.push_back(old_snapshots[pos]);
        }
      }
      // Extend the common list with 0..max_added brand-new snapshots.
      for (size_t added = 0; added <= max_added; added++) {
        std::vector<SequenceNumber> new_snapshots = common_snapshots;
        new_snapshots.insert(new_snapshots.end(),
                             snapshots.begin() + old_snapshots.size(),
                             snapshots.begin() + old_snapshots.size() + added);
        const size_t b_range = std::min(new_snapshots.size(), cache_size) + 1;
        for (const SequenceNumber snapshot : common_snapshots) {
          // A commit entry that straddles the snapshot and therefore should
          // not be discarded.
          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
                               snapshot + 1};
          // Break each thread at two points.
          for (size_t a1 = 1; a1 <= a_range; a1++) {
            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
              for (size_t b1 = 1; b1 <= b_range; b1++) {
                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
                  SnapshotConcurrentAccessTestInternal(
                      wp_db.get(), old_snapshots, new_snapshots, entry, version,
                      a1, a2, b1, b2);
                }
              }
            }
          }
        }
      }
    }
  }
  printf("\n");
}
|
|
#endif // ROCKSDB_VALGRIND_RUN
|
|
#endif // TRAVIS
|
|
|
|
// This test clarifies the contract of AdvanceMaxEvictedSeq method
|
|
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));

  // Step 1: seed max, the prepared txns, and the snapshots.
  SequenceNumber zero_max = 0;
  const std::vector<SequenceNumber> initial_prepared = {10,  30,  50, 100,
                                                        150, 200, 250};
  for (const SequenceNumber prep : initial_prepared) {
    wp_db->AddPrepared(prep);
  }
  // Advancing max here also establishes the set of old prepared txns.
  SequenceNumber init_max = 100;
  wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
  const std::vector<SequenceNumber> initial_snapshots = {20, 40};
  wp_db->SetDBSnapshots(initial_snapshots);
  // Refresh the internal snapshot cache from the DB's list.
  wp_db->UpdateSnapshots(initial_snapshots, init_max);

  // Step 2: the call under test.
  const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
  wp_db->SetDBSnapshots(latest_snapshots);
  SequenceNumber new_max = 200;
  wp_db->AdvanceMaxEvictedSeq(init_max, new_max);

  // Step 3: check the resulting state against the contract.
  // (a) max must now equal new_max.
  ASSERT_EQ(wp_db->max_evicted_seq_, new_max);

  // (b) delayed prepared holds every txn <= max, while prepared only holds
  // txns > max.
  const size_t prepared_cnt = initial_prepared.size();
  size_t pi = 0;
  while (pi < prepared_cnt && initial_prepared[pi] <= new_max) {
    ASSERT_EQ(1, wp_db->delayed_prepared_.erase(initial_prepared[pi]));
    pi++;
  }
  ASSERT_TRUE(wp_db->delayed_prepared_.empty());
  while (pi < prepared_cnt && !wp_db->prepared_txns_.empty()) {
    ASSERT_EQ(initial_prepared[pi], wp_db->prepared_txns_.top());
    pi++;
    wp_db->prepared_txns_.pop();
  }
  ASSERT_TRUE(pi == prepared_cnt);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());

  // (c) the snapshot cache must list everything below new_max.
  for (size_t si = 0; si < latest_snapshots.size() &&
                      latest_snapshots[si] <= new_max &&
                      si < wp_db->snapshots_total_;
       si++) {
    ASSERT_TRUE(si < wp_db->snapshots_total_);
    // This small-scale test assumes the snapshot list fits within the cache;
    // the next line double checks that assumption.
    ASSERT_TRUE(si < wp_db->SNAPSHOT_CACHE_SIZE);
    ASSERT_EQ(latest_snapshots[si], wp_db->snapshot_cache_[si]);
  }
}
|
|
|
|
// This tests that transactions with duplicate keys behave correctly after
// max_evicted_seq_ advances past their prepared sequence numbers. This would
// not be the case if, for example, the txn did not add the prepared seq of the
// second sub-batch to the PrepareHeap structure.
|
|
// Prepares a txn that writes the same key twice, advances max_evicted_seq_
// over its prepare sequence numbers, and verifies that the uncommitted value
// stays invisible both before and after a crash/recovery cycle.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
  WriteOptions write_options;
  TransactionOptions txn_options;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
  ASSERT_OK(txn0->Prepare());

  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Ensure that all the prepared sequence numbers will be removed from the
  // PrepareHeap.
  SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
  wp_db->AdvanceMaxEvictedSeq(0, new_max);

  ReadOptions ropt;
  PinnableSlice pinnable_val;
  auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  // Release anything the slice may hold before it is reused below.
  pinnable_val.Reset();
  delete txn0;

  // Persist the WAL so the prepared txn survives the simulated crash.
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  // Use a test assertion rather than assert() so the check is also active in
  // builds that define NDEBUG.
  ASSERT_NE(db, nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  wp_db->AdvanceMaxEvictedSeq(0, new_max);
  s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  // The recovered txn must be retrievable by name before it is rolled back.
  txn0 = db->GetTransactionByName("xid");
  ASSERT_NE(txn0, nullptr);
  ASSERT_OK(txn0->Rollback());
  delete txn0;
}
|
|
|
|
// Runs every combination of txn types concurrently, steering the write
// threads into batch groups via sync points, and verifies that the last
// visible sequence number matches exp_seq after the writes, after recovery,
// after a flush, and after recovery following the flush.
TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
  // Given the sequential run of txns, with this timeout we should never see a
  // deadlock nor a timeout unless we have a key conflict, which should be
  // almost infeasible.
  txn_db_options.transaction_lock_timeout = 1000;
  txn_db_options.default_lock_timeout = 1000;
  ReOpen();
  FlushOptions fopt;

  // Number of different txn types we use in this test
  const size_t type_cnt = 5;
  // The size of the first write group
  // TODO(myabandeh): This should be increase for pre-release tests
  const size_t first_group_size = 2;
  // Total number of txns we run in each test
  // TODO(myabandeh): This should be increase for pre-release tests
  const size_t txn_cnt = first_group_size + 1;

  // base[i] == type_cnt^i; used to extract the i-th digit of the case number
  // in the number system based on type_cnt.
  size_t base[txn_cnt + 1] = {
      1,
  };
  for (size_t bi = 1; bi <= txn_cnt; bi++) {
    base[bi] = base[bi - 1] * type_cnt;
  }
  // The number of cases is type_cnt^txn_cnt, which is already available as an
  // exact integer in base[txn_cnt]; this avoids a floating-point pow whose
  // truncated result could be off by one.
  const size_t max_n = base[txn_cnt];
  printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
  for (size_t n = 0; n < max_n; n++, ReOpen()) {
    // Only run the share of cases assigned to this test instance.
    if (n % split_cnt_ != split_id_) continue;
    if (n % 1000 == 0) {
      printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
    }
    DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
    auto seq = db_impl->TEST_GetLastVisibleSequence();
    exp_seq = seq;
    // This is increased before writing the batch for commit
    commit_writes = 0;
    // This is increased before txn starts linking if it expects to do a commit
    // eventually
    expected_commits = 0;
    std::vector<port::Thread> threads;

    linked = 0;
    std::atomic<bool> batch_formed(false);
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::EnterAsBatchGroupLeader:End",
        [&](void* /*arg*/) { batch_formed = true; });
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
          linked++;
          if (linked == 1) {
            // Wait until the others are linked too.
            while (linked < first_group_size) {
            }
          } else if (linked == 1 + first_group_size) {
            // Make the 2nd batch of the rest of writes plus any followup
            // commits from the first batch
            while (linked < txn_cnt + commit_writes) {
            }
          }
          // Then we will have one or more batches consisting of follow-up
          // commits from the 2nd batch. There is a bit of non-determinism here
          // but it should be tolerable.
        });

    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    for (size_t bi = 0; bi < txn_cnt; bi++) {
      // get the bi-th digit in number system based on type_cnt
      size_t d = (n % base[bi + 1]) / base[bi];
      switch (d) {
        case 0:
          threads.emplace_back(txn_t0, bi);
          break;
        case 1:
          threads.emplace_back(txn_t1, bi);
          break;
        case 2:
          threads.emplace_back(txn_t2, bi);
          break;
        case 3:
          threads.emplace_back(txn_t3, bi);
          break;
        case 4:
          // NOTE(review): digit 4 launches txn_t3 exactly like digit 3, so
          // only four distinct txn bodies are exercised although type_cnt is
          // 5 -- confirm whether a fifth txn type was intended here.
          threads.emplace_back(txn_t3, bi);
          break;
        default:
          assert(false);
      }
      // wait to be linked
      while (linked.load() <= bi) {
      }
      // after a queue of size first_group_size
      if (bi + 1 == first_group_size) {
        while (!batch_formed) {
        }
        // to make it more deterministic, wait until the commits are linked
        while (linked.load() <= bi + expected_commits) {
        }
      }
    }
    for (auto& t : threads) {
      t.join();
    }
    if (options.two_write_queues) {
      // In this case none of the above scheduling tricks to deterministically
      // form merged batches works because the writes go to separate queues.
      // This would result in different write groups in each run of the test. We
      // still keep the test since although non-deterministic and hard to debug,
      // it is still useful to have.
      // TODO(myabandeh): Add a deterministic unit test for two_write_queues
    }

    // Check if memtable inserts advanced seq number as expected
    seq = db_impl->TEST_GetLastVisibleSequence();
    ASSERT_EQ(exp_seq, seq);

    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

    // Check if recovery preserves the last sequence number
    ASSERT_OK(db_impl->FlushWAL(true));
    ReOpenNoDelete();
    // Test assertion instead of assert() so that the check is also active in
    // builds that define NDEBUG.
    ASSERT_NE(db, nullptr);
    db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
    seq = db_impl->TEST_GetLastVisibleSequence();
    ASSERT_EQ(exp_seq, seq);

    // Check if flush preserves the last sequence number
    ASSERT_OK(db_impl->Flush(fopt));
    seq = db_impl->GetLatestSequenceNumber();
    ASSERT_EQ(exp_seq, seq);

    // Check if recovery after flush preserves the last sequence number
    ASSERT_OK(db_impl->FlushWAL(true));
    ReOpenNoDelete();
    ASSERT_NE(db, nullptr);
    db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
    seq = db_impl->GetLatestSequenceNumber();
    ASSERT_EQ(exp_seq, seq);
  }
}
|
|
|
|
// Run a couple of different txns, some of which are left uncommitted. Restart
// the db at a couple of points to check whether the list of uncommitted txns
// is recovered properly.
|
|
// Prepares several named txns, crashes and reopens the db at multiple points,
// and checks after each recovery that exactly the still-uncommitted txns are
// tracked in delayed_prepared_ and that their values stay invisible until the
// txns are committed.
TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
  options.disable_auto_compactions = true;
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

  txn_t0(0);

  TransactionOptions txn_options;
  WriteOptions write_options;
  size_t index = 1000;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  auto istr0 = std::to_string(index);
  auto s = txn0->SetName("xid" + istr0);
  ASSERT_OK(s);
  s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
  ASSERT_OK(s);
  s = txn0->Prepare();
  // The recovery checks below are meaningless if the prepare did not succeed.
  ASSERT_OK(s);
  auto prep_seq_0 = txn0->GetId();

  txn_t1(0);

  index++;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  auto istr1 = std::to_string(index);
  s = txn1->SetName("xid" + istr1);
  ASSERT_OK(s);
  s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
  ASSERT_OK(s);
  s = txn1->Prepare();
  ASSERT_OK(s);
  auto prep_seq_1 = txn1->GetId();

  txn_t2(0);

  ReadOptions ropt;
  PinnableSlice pinnable_val;
  // Check the value is not committed before restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  delete txn0;
  delete txn1;
  // Persist the WAL so the prepared txns survive the simulated crash.
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  // Test assertion instead of assert() so that the check is also active in
  // builds that define NDEBUG.
  ASSERT_NE(db, nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // After recovery, all the uncommitted txns (0 and 1) should be inserted into
  // delayed_prepared_
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
  {
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
                wp_db->delayed_prepared_.end());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
                wp_db->delayed_prepared_.end());
  }

  // Check the value is still not committed after restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  txn_t3(0);

  // Test that a recovered txns will be properly marked committed for the next
  // recovery
  txn1 = db->GetTransactionByName("xid" + istr1);
  ASSERT_NE(txn1, nullptr);
  ASSERT_OK(txn1->Commit());
  delete txn1;

  index++;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  auto istr2 = std::to_string(index);
  s = txn2->SetName("xid" + istr2);
  ASSERT_OK(s);
  s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
  ASSERT_OK(s);
  s = txn2->Prepare();
  ASSERT_OK(s);
  auto prep_seq_2 = txn2->GetId();

  delete txn2;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  ASSERT_NE(db, nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);

  // 0 and 2 are prepared and 1 is committed
  {
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    const auto& end = wp_db->delayed_prepared_.end();
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
    ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
  }
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);

  // Commit all the remaining txns
  txn0 = db->GetTransactionByName("xid" + istr0);
  ASSERT_NE(txn0, nullptr);
  ASSERT_OK(txn0->Commit());
  txn2 = db->GetTransactionByName("xid" + istr2);
  ASSERT_NE(txn2, nullptr);
  ASSERT_OK(txn2->Commit());

  // Check the value is committed after commit
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();

  delete txn0;
  delete txn2;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  ReOpenNoDelete();
  ASSERT_NE(db, nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_TRUE(wp_db->delayed_prepared_empty_);

  // Check the value is still committed after recovery
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();
}
|
|
|
|
// After recovery the commit map is empty while the max is set. The code would
|
|
// go through a different path which requires a separate test.
|
|
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Set max directly; nothing is added to the commit map by this test.
  wp_db->max_evicted_seq_ = 100;

  // Each row: a prepare seq, the snapshot seq it is queried against, and
  // whether IsInSnapshot is expected to report it as visible.
  struct Probe {
    uint64_t prep_seq;
    uint64_t snapshot_seq;
    bool visible;
  };
  const Probe probes[] = {
      {50, 40, false},  {50, 50, true},   {50, 100, true},  {50, 150, true},
      {100, 80, false}, {100, 100, true}, {100, 150, true},
  };
  for (const Probe& probe : probes) {
    ASSERT_EQ(probe.visible,
              wp_db->IsInSnapshot(probe.prep_seq, probe.snapshot_seq));
  }
}
|
|
|
|
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
|
|
// snapshot, max_committed_seq_, prepared, and commit entries.
|
|
// Drives a mock WritePreparedTxnDB through interleaved prepares, commits, and
// snapshots with tiny commit/snapshot caches, and checks that IsInSnapshot
// reports a prepare seq as visible to the final snapshot iff its txn was
// committed before that snapshot was taken.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
  // Use small commit cache to trigger lots of eviction and fast advance of
  // max_evicted_seq_
  const size_t commit_cache_bits = 3;
  // Same for snapshot cache size
  const size_t snapshot_cache_bits = 2;

  // Take some preliminary snapshots first. This is to stress the data structure
  // that holds the old snapshots as it will be designed to be efficient when
  // only a few snapshots are below the max_evicted_seq_.
  for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
    // Leave some gap between the preliminary snapshots and the final snapshot
    // that we check. This should test for also different overlapping scenarios
    // between the last snapshot and the commits.
    for (int max_gap = 1; max_gap < 10; max_gap++) {
      // Nothing is actually written to the db: seq is a mock of the sequence
      // number the db would assign, and snapshots are registered at mock seq
      // values through the mock db's TakeSnapshot.
      uint64_t seq = 0;
      // At each step we prepare a txn and then we commit it in the next txn.
      // This emulates the consecutive transactions that write to the same key
      uint64_t cur_txn = 0;
      // Number of snapshots taken so far
      int num_snapshots = 0;
      // Number of gaps applied so far
      int gap_cnt = 0;
      // The final snapshot that we will inspect
      uint64_t snapshot = 0;
      bool found_committed = false;
      // To stress the data structure that maintain prepared txns, at each cycle
      // we add a new prepare txn. These do not mean to be committed for
      // snapshot inspection.
      std::set<uint64_t> prepared;
      // We keep the list of txns committed before we take the last snapshot.
      // These should be the only seq numbers that will be found in the snapshot
      std::set<uint64_t> committed_before;
      // The set of commit seq numbers to be excluded from IsInSnapshot queries
      std::set<uint64_t> commit_seqs;
      DBImpl* mock_db = new DBImpl(options, dbname);
      std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
          mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
      // We continue until max advances a bit beyond the snapshot.
      while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
        // do prepare for a transaction
        seq++;
        wp_db->AddPrepared(seq);
        prepared.insert(seq);

        // If cur_txn is not started, do prepare for it.
        if (!cur_txn) {
          seq++;
          cur_txn = seq;
          wp_db->AddPrepared(cur_txn);
        } else {  // else commit it
          seq++;
          wp_db->AddCommitted(cur_txn, seq);
          wp_db->RemovePrepared(cur_txn);
          commit_seqs.insert(seq);
          if (!snapshot) {
            committed_before.insert(cur_txn);
          }
          cur_txn = 0;
        }

        if (num_snapshots < max_snapshots - 1) {
          // Take preliminary snapshots
          wp_db->TakeSnapshot(seq);
          num_snapshots++;
        } else if (gap_cnt < max_gap) {
          // Wait for some gap before taking the final snapshot
          gap_cnt++;
        } else if (!snapshot) {
          // Take the final snapshot if it is not already taken
          snapshot = seq;
          wp_db->TakeSnapshot(snapshot);
          num_snapshots++;
        }

        // If the snapshot is taken, verify seq numbers visible to it. We redo
        // it at each cycle to test that the system is still sound when
        // max_evicted_seq_ advances.
        if (snapshot) {
          for (uint64_t s = 1; s <= seq; s++) {
            // Commit seqs are not prepare seqs, so they are skipped rather
            // than queried. The skip must not live in the loop condition:
            // there it would end the scan at the first commit seq and leave
            // all later seq numbers unverified.
            if (commit_seqs.find(s) != commit_seqs.end()) {
              continue;
            }
            bool was_committed =
                (committed_before.find(s) != committed_before.end());
            bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
            if (was_committed != is_in_snapshot) {
              // Dump the scenario to ease debugging of the failure below.
              printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
                     " snapshot %" PRIu64
                     " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
                     max_snapshots, max_gap, seq,
                     wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
                     num_snapshots, s);
            }
            ASSERT_EQ(was_committed, is_in_snapshot);
            found_committed = found_committed || is_in_snapshot;
          }
        }
      }
      // Safety check to make sure the test actually ran
      ASSERT_TRUE(found_committed);
      // As an extra check, check if prepared set will be properly empty after
      // they are committed.
      if (cur_txn) {
        wp_db->AddCommitted(cur_txn, seq);
        wp_db->RemovePrepared(cur_txn);
      }
      for (auto p : prepared) {
        wp_db->AddCommitted(p, seq);
        wp_db->RemovePrepared(p);
      }
      ASSERT_TRUE(wp_db->delayed_prepared_.empty());
      ASSERT_TRUE(wp_db->prepared_txns_.empty());
    }
  }
}
|
|
|
|
// Reads `key` from the default column family of `db`, first via Get and then
// via MultiGet, and asserts for each API that the returned status equals
// `exp_s`, that the status is either OK or NotFound, and, when it is OK, that
// the value equals `exp_v`.
void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
                 PinnableSlice& exp_v, Slice key) {
  // Single-key lookup
  PinnableSlice got_value;
  Status got_status =
      db->Get(roptions, db->DefaultColumnFamily(), key, &got_value);
  ASSERT_TRUE(exp_s == got_status);
  ASSERT_TRUE(got_status.ok() || got_status.IsNotFound());
  if (got_status.ok()) {
    ASSERT_TRUE(exp_v == got_value);
  }

  // The same lookup through the MultiGet API
  std::vector<std::string> multi_values;
  auto multi_statuses =
      db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &multi_values);
  ASSERT_EQ(1, multi_values.size());
  ASSERT_EQ(1, multi_statuses.size());
  got_status = multi_statuses[0];
  ASSERT_TRUE(exp_s == got_status);
  ASSERT_TRUE(got_status.ok() || got_status.IsNotFound());
  if (got_status.ok()) {
    ASSERT_TRUE(exp_v == multi_values[0]);
  }
}
|
|
|
|
// Convenience overload of ASSERT_SAME that performs the check with
// default-constructed ReadOptions.
void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
                 Slice key) {
  const ReadOptions default_read_options;
  ASSERT_SAME(default_read_options, db, exp_s, exp_v, key);
}
|
|
|
|
// For every combination of (pre-existing key, kind of pre-existing write,
// crash or no crash), prepares a txn that touches key1..key4 and rolls it
// back, verifying that the values readable from the db never change and that
// the prepared-txn bookkeeping is empty after the rollback.
TEST_P(WritePreparedTransactionTest, RollbackTest) {
  ReadOptions roptions;
  WriteOptions woptions;
  TransactionOptions txn_options;
  const size_t num_keys = 4;
  const size_t num_values = 5;
  // The keys that the rolled-back txn writes to.
  const char* const txn_keys[num_keys] = {"key1", "key2", "key3", "key4"};
  for (size_t ikey = 1; ikey <= num_keys; ikey++) {
    for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
      for (bool crash : {false, true}) {
        ReOpen();
        WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

        // Optionally seed one of the keys with a committed write.
        const std::string key_str = "key" + ToString(ikey);
        if (ivalue == 1) {
          ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
        } else if (ivalue == 2) {
          ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
        } else if (ivalue == 3) {
          ASSERT_OK(db->Delete(woptions, key_str));
        } else if (ivalue == 4) {
          ASSERT_OK(db->SingleDelete(woptions, key_str));
        } else {
          // ivalue == 0 leaves the db without any initial write.
          assert(ivalue == 0);
        }

        // Record what each key reads as before the txn is prepared.
        Status init_status[num_keys];
        PinnableSlice init_value[num_keys];
        for (size_t k = 0; k < num_keys; k++) {
          init_status[k] = db->Get(roptions, db->DefaultColumnFamily(),
                                   Slice(txn_keys[k]), &init_value[k]);
        }
        // Asserts that every key still reads exactly as recorded above. `db`
        // is looked up at call time, so this also works after a reopen.
        auto expect_initial_state = [&]() {
          for (size_t k = 0; k < num_keys; k++) {
            ASSERT_SAME(db, init_status[k], init_value[k], txn_keys[k]);
          }
        };

        Transaction* txn = db->BeginTransaction(woptions, txn_options);
        ASSERT_OK(txn->SetName("xid0"));
        ASSERT_OK(txn->Put(Slice(txn_keys[0]), Slice("value1")));
        ASSERT_OK(txn->Merge(Slice(txn_keys[1]), Slice("value2")));
        ASSERT_OK(txn->Delete(Slice(txn_keys[2])));
        ASSERT_OK(txn->SingleDelete(Slice(txn_keys[3])));
        ASSERT_OK(txn->Prepare());

        {
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_FALSE(wp_db->prepared_txns_.empty());
          ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
        }

        // Prepared-but-uncommitted writes must not be visible.
        expect_initial_state();

        if (crash) {
          delete txn;
          auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
          db_impl->FlushWAL(true);
          dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
          ReOpenNoDelete();
          assert(db != nullptr);
          wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
          txn = db->GetTransactionByName("xid0");
          // After recovery the txn is tracked in delayed_prepared_ instead of
          // prepared_txns_.
          ASSERT_FALSE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_FALSE(wp_db->delayed_prepared_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
                      wp_db->delayed_prepared_.end());
        }

        expect_initial_state();

        ASSERT_OK(txn->Rollback());

        {
          ASSERT_TRUE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.empty());
        }

        // The rollback must leave the db reading exactly as before the txn.
        expect_initial_state();
        delete txn;
      }
    }
  }
}
|
|
|
|
// Writes 1024 versions of one key, then reopens the db with a write buffer
// small enough to force memtable flushes during recovery, and verifies that
// every internal key version is still present afterwards.
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
  // A large buffer keeps all the insertions below in a single memtable.
  options.write_buffer_size = 1024 * 1024;
  ReOpen();

  const uint64_t num_puts = 1024;
  // Each put takes one seq for the key/value, plus one more for the commit
  // when two write queues are in use.
  const uint64_t seq_per_put = options.two_write_queues ? 2 : 1;
  std::vector<KeyVersion> versions;
  for (uint64_t i = 1; i <= num_puts; i++) {
    const std::string value = "bar" + ToString(i);
    ASSERT_OK(db->Put(WriteOptions(), "foo", value));
    VerifyKeys({{"foo", value}});
    // The seq assigned to the key/value of the i-th put.
    const uint64_t kv_seq = (i - 1) * seq_per_put + 1;
    KeyVersion kv = {"foo", value, kv_seq, kTypeValue};
    versions.emplace_back(kv);
  }
  // Reverse so that the latest version comes first before comparing against
  // the db's internal keys.
  std::reverse(versions.begin(), versions.end());
  VerifyInternalKeys(versions);

  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  db_impl->FlushWAL(true);
  // A small buffer makes recovery flush the memtable along the way.
  options.write_buffer_size = 1024;
  ReOpenNoDelete();
  VerifyInternalKeys(versions);
}
|
|
|
|
// Verifies that a key rewritten by compaction with sequence number 0 remains
// readable, both with and without a snapshot taken before the compaction.
TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) {
  const WriteOptions wopts;
  ASSERT_OK(db->Put(wopts, "foo", "bar"));
  VerifyKeys({{"foo", "bar"}});
  const Snapshot* pre_compaction_snapshot = db->GetSnapshot();
  ASSERT_OK(db->Flush(FlushOptions()));

  // Surround "foo" with dummy keys so that the compaction cannot trivially
  // move the file and has to run the actual compaction logic.
  for (const char* dummy_key : {"a", "z"}) {
    ASSERT_OK(db->Put(wopts, dummy_key, "dummy"));
  }
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Compaction outputs a key with sequence number 0 if it is visible to the
  // earliest snapshot. IsInSnapshot() must therefore report sequence number 0
  // as visible to any snapshot.
  VerifyKeys({{"foo", "bar"}});
  VerifyKeys({{"foo", "bar"}}, pre_compaction_snapshot);
  VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
  db->ReleaseSnapshot(pre_compaction_snapshot);
}
|
|
|
|
// Compaction should not remove a key if it is not committed, and should
|
|
// proceed with older versions of the key as-if the new version doesn't exist.
|
|
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  // Downcast with static_cast (rather than reinterpret_cast) to reach the
  // TEST_* accessors of the underlying DB implementation.
  DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
  // Snapshots held so that older key versions are not evicted.
  std::vector<const Snapshot*> snapshots;
  // Keep track of the expected sequence number.
  SequenceNumber expected_seq = 0;

  // Runs one non-transactional write, advances the expected sequence number
  // (one for the write, plus one for the commit when two_write_queues is
  // set), checks it against the DB and finally takes a snapshot.
  auto add_key = [&](std::function<Status()> func) {
    ASSERT_OK(func());
    expected_seq++;
    if (options.two_write_queues) {
      expected_seq++;  // 1 for commit
    }
    ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
    snapshots.push_back(db->GetSnapshot());
  };

  // Each key here represents a standalone test case.
  add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
  add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
  ASSERT_OK(db->Flush(FlushOptions()));
  // The deletes of key6/key7 are written after the flush above.
  add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
  add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });

  // Write a newer version of every key in a transaction that is prepared but
  // not committed until after the compaction.
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1_2"));
  ASSERT_OK(transaction->Delete("key2"));
  ASSERT_OK(transaction->SingleDelete("key3"));
  ASSERT_OK(transaction->Merge("key4", "value4_2"));
  ASSERT_OK(transaction->Merge("key5", "value5_3"));
  ASSERT_OK(transaction->Put("key6", "value6_2"));
  ASSERT_OK(transaction->Put("key7", "value7_2"));
  // Prepare but do not commit.
  ASSERT_OK(transaction->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(db->Flush(FlushOptions()));
  for (auto* s : snapshots) {
    db->ReleaseSnapshot(s);
  }

  // Dummy keys to prevent the compaction from trivially moving files and
  // bypassing the actual compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Readers still see only the data written before the transaction.
  VerifyKeys({
      {"key1", "value1_1"},
      {"key2", "value2_1"},
      {"key3", "value3_1"},
      {"key4", "value4_1"},
      {"key5", "value5_1,value5_2"},
      {"key6", "NOT_FOUND"},
      {"key7", "NOT_FOUND"},
  });
  // The prepared entries keep their sequence number (expected_seq), while the
  // older versions below them are expected with sequence number 0.
  VerifyInternalKeys({
      {"key1", "value1_2", expected_seq, kTypeValue},
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "", expected_seq, kTypeDeletion},
      {"key2", "value2_1", 0, kTypeValue},
      {"key3", "", expected_seq, kTypeSingleDeletion},
      {"key3", "value3_1", 0, kTypeValue},
      {"key4", "value4_2", expected_seq, kTypeMerge},
      {"key4", "value4_1", 0, kTypeValue},
      {"key5", "value5_3", expected_seq, kTypeMerge},
      {"key5", "value5_1,value5_2", 0, kTypeValue},
      {"key6", "value6_2", expected_seq, kTypeValue},
      {"key7", "value7_2", expected_seq, kTypeValue},
  });

  // Once committed, the transaction's writes become visible.
  ASSERT_OK(transaction->Commit());
  VerifyKeys({
      {"key1", "value1_2"},
      {"key2", "NOT_FOUND"},
      {"key3", "NOT_FOUND"},
      {"key4", "value4_1,value4_2"},
      {"key5", "value5_1,value5_2,value5_3"},
      {"key6", "value6_2"},
      {"key7", "value7_2"},
  });
  delete transaction;
}
|
|
|
|
// Compaction should keep keys visible to a snapshot based on commit sequence,
|
|
// not just prepare sequence.
|
|
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  // Keep track of the expected sequence number.
  SequenceNumber expected_seq = 0;

  // txn1 is prepared and committed before any snapshot is taken.
  auto* txn1 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn1->SetName("txn1"));
  ASSERT_OK(txn1->Put("key1", "value1_1"));
  ASSERT_OK(txn1->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(txn1->Commit());
  // Downcast with static_cast (rather than reinterpret_cast) to reach the
  // TEST_* accessors of the underlying DB implementation.
  DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn1;
  // Take a snapshot to keep keys from being evicted before compaction.
  const Snapshot* snapshot1 = db->GetSnapshot();

  auto* txn2 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn2->SetName("txn2"));
  ASSERT_OK(txn2->Put("key2", "value2_1"));
  ASSERT_OK(txn2->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  // txn1 commits before snapshot2, so it is visible to snapshot2.
  // txn2 commits after snapshot2, so it is not visible to snapshot2.
  const Snapshot* snapshot2 = db->GetSnapshot();
  ASSERT_OK(txn2->Commit());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn2;
  // Take a snapshot to keep keys from being evicted before compaction.
  const Snapshot* snapshot3 = db->GetSnapshot();

  // Overwrite both keys outside of a transaction and remember the sequence
  // number assigned to each write.
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq1 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq2 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));

  // Only snapshot2 is still held when the compaction runs.
  db->ReleaseSnapshot(snapshot1);
  db->ReleaseSnapshot(snapshot3);

  // Dummy keys to prevent the compaction from trivially moving files and
  // bypassing the actual compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
  VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
  VerifyInternalKeys({
      {"key1", "value1_2", seq1, kTypeValue},
      // "value1_1" is visible to snapshot2. Also keys at the bottom level
      // visible to the earliest snapshot are output with seq = 0.
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "value2_2", seq2, kTypeValue},
  });
  db->ReleaseSnapshot(snapshot2);
}
|
|
|
|
// A more complex test to verify compaction/flush should keep keys visible
|
|
// to snapshots.
|
|
TEST_P(WritePreparedTransactionTest,
       CompactionShouldKeepSnapshotVisibleKeysRandomized) {
  constexpr size_t kNumTransactions = 10;
  constexpr size_t kNumIterations = 1000;

  // transactions[i] is the prepared-but-uncommitted transaction for "key<i>",
  // or nullptr when there is none; versions[i] counts its committed updates.
  std::vector<Transaction*> transactions(kNumTransactions, nullptr);
  std::vector<size_t> versions(kNumTransactions, 0);
  // Committed contents expected when reading without a snapshot.
  std::unordered_map<std::string, std::string> current_data;
  // snapshot_data[i] is the copy of current_data captured with snapshots[i].
  std::vector<const Snapshot*> snapshots;
  std::vector<std::unordered_map<std::string, std::string>> snapshot_data;

  // Checks every recorded snapshot against the data captured with it.
  auto verify_snapshots = [&]() {
    for (size_t i = 0; i < snapshots.size(); i++) {
      VerifyKeys(snapshot_data[i], snapshots[i]);
    }
  };

  Random rnd(1103);
  options.disable_auto_compactions = true;
  ReOpen();

  // Seed every key with an initial committed value.
  for (size_t i = 0; i < kNumTransactions; i++) {
    std::string key = "key" + ToString(i);
    std::string value = "value0";
    ASSERT_OK(db->Put(WriteOptions(), key, value));
    current_data[key] = value;
  }
  VerifyKeys(current_data);

  for (size_t iter = 0; iter < kNumIterations; iter++) {
    // r in [0, kNumTransactions) advances the transaction of key r;
    // r == kNumTransactions takes a snapshot instead.
    auto r = rnd.Next() % (kNumTransactions + 1);
    if (r < kNumTransactions) {
      std::string key = "key" + ToString(r);
      if (transactions[r] == nullptr) {
        // No pending transaction for this key: prepare the next version.
        std::string value = "value" + ToString(versions[r] + 1);
        auto* txn = db->BeginTransaction(WriteOptions());
        ASSERT_OK(txn->SetName("txn" + ToString(r)));
        ASSERT_OK(txn->Put(key, value));
        ASSERT_OK(txn->Prepare());
        transactions[r] = txn;
      } else {
        // Commit the pending transaction and record its value as current.
        std::string value = "value" + ToString(++versions[r]);
        ASSERT_OK(transactions[r]->Commit());
        delete transactions[r];
        transactions[r] = nullptr;
        current_data[key] = value;
      }
    } else {
      auto* snapshot = db->GetSnapshot();
      VerifyKeys(current_data, snapshot);
      snapshots.push_back(snapshot);
      snapshot_data.push_back(current_data);
    }
    VerifyKeys(current_data);
  }
  // Take a last snapshot to test compaction with uncommitted prepared
  // transactions.
  snapshots.push_back(db->GetSnapshot());
  snapshot_data.push_back(current_data);

  // Use a gtest assertion so the check is not compiled out under NDEBUG.
  ASSERT_EQ(snapshots.size(), snapshot_data.size());
  verify_snapshots();
  ASSERT_OK(db->Flush(FlushOptions()));
  verify_snapshots();
  // Dummy keys to prevent the compaction from trivially moving files and
  // bypassing the actual compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  verify_snapshots();

  // Cleanup: commit whatever is still prepared and release all snapshots.
  for (Transaction* txn : transactions) {
    if (txn == nullptr) {
      continue;
    }
    ASSERT_OK(txn->Commit());
    delete txn;
  }
  for (const Snapshot* snapshot : snapshots) {
    db->ReleaseSnapshot(snapshot);
  }
}
|
|
|
|
// Compaction should not apply the optimization to output key with sequence
|
|
// number equal to 0 if the key is not visible to earliest snapshot, based on
|
|
// commit sequence number.
|
|
TEST_P(WritePreparedTransactionTest,
       CompactionShouldKeepSequenceForUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  // Keep track of the expected sequence number.
  SequenceNumber expected_seq = 0;

  // "key1" is written by a transaction that stays prepared (uncommitted)
  // across the compaction below.
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1"));
  ASSERT_OK(transaction->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  // Sequence number of the prepared "key1" entry.
  SequenceNumber seq1 = expected_seq;

  // "key2" is written and committed outside of any transaction.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  // Downcast with static_cast (rather than reinterpret_cast) to reach the
  // TEST_* accessors of the underlying DB implementation.
  DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
  expected_seq++;  // one for data
  if (options.two_write_queues) {
    expected_seq++;  // one for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));

  // Dummy keys to prevent the compaction from trivially moving files and
  // bypassing the actual compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  VerifyKeys({
      {"key1", "NOT_FOUND"},
      {"key2", "value2"},
  });
  VerifyInternalKeys({
      // "key1" has not been committed. It keeps its sequence number.
      {"key1", "value1", seq1, kTypeValue},
      // "key2" is committed and output with seq = 0.
      {"key2", "value2", 0, kTypeValue},
  });

  // After the commit "key1" becomes readable.
  ASSERT_OK(transaction->Commit());
  VerifyKeys({
      {"key1", "value1"},
      {"key2", "value2"},
  });
  delete transaction;
}
|
|
|
|
// Iterators obtained from the DB and from a concurrent transaction must show
// the old value of "foo" while the writing transaction is only prepared, and
// the new value once it has committed.
TEST_P(WritePreparedTransactionTest, Iterate) {
  // Asserts that `it` is positioned on a valid entry with the given key and
  // value.
  auto expect_entry = [](Iterator* it, const std::string& key,
                         const std::string& value) {
    ASSERT_TRUE(it->Valid());
    ASSERT_OK(it->status());
    ASSERT_EQ(key, it->key().ToString());
    ASSERT_EQ(value, it->value().ToString());
  };

  // Walks "a" -> "foo" -> "z" in both directions, first with an iterator from
  // the DB and then with one from a concurrent transaction, expecting "foo"
  // to map to `foo_value` in both views.
  auto check_iterators = [&](const std::string& foo_value) {
    auto* reader_txn = db->BeginTransaction(WriteOptions());

    for (const bool from_db : {true, false}) {
      Iterator* it = from_db ? db->NewIterator(ReadOptions())
                             : reader_txn->GetIterator(ReadOptions());
      // Seek
      it->Seek("foo");
      expect_entry(it, "foo", foo_value);
      // Next
      it->Seek("a");
      expect_entry(it, "a", "va");
      it->Next();
      expect_entry(it, "foo", foo_value);
      // SeekForPrev
      it->SeekForPrev("y");
      expect_entry(it, "foo", foo_value);
      // Prev
      it->SeekForPrev("z");
      expect_entry(it, "z", "vz");
      it->Prev();
      expect_entry(it, "foo", foo_value);
      delete it;
    }
    delete reader_txn;
  };

  const WriteOptions write_opts;
  ASSERT_OK(db->Put(write_opts, "foo", "v1"));
  auto* writer_txn = db->BeginTransaction(WriteOptions());
  ASSERT_OK(writer_txn->SetName("txn"));
  ASSERT_OK(writer_txn->Put("foo", "v2"));
  ASSERT_OK(writer_txn->Prepare());
  VerifyKeys({{"foo", "v1"}});
  // Dummy keys on either side of "foo" for the iterators to step over.
  ASSERT_OK(db->Put(write_opts, "a", "va"));
  ASSERT_OK(db->Put(write_opts, "z", "vz"));
  check_iterators("v1");

  ASSERT_OK(writer_txn->Commit());
  VerifyKeys({{"foo", "v2"}});
  check_iterators("v2");
  delete writer_txn;
}
|
|
|
|
// Refresh() on an iterator created from this DB must report NotSupported.
TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
  Iterator* db_iter = db->NewIterator(ReadOptions());
  const Status refresh_status = db_iter->Refresh();
  ASSERT_TRUE(refresh_status.IsNotSupported());
  delete db_iter;
}
|
|
|
|
// Test that updating the commit map will not affect the existing snapshots
|
|
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
  // Run once committing through a plain Put (no explicit prepare) and once
  // through an explicit Prepare()/Commit() pair.
  for (bool skip_prepare : {true, false}) {
    // Interleave the reader with WritePreparedTxnDB::AddCommitted on the
    // writer side. Each pair presumably makes the second sync point wait
    // until the first has been reached; confirm against
    // SyncPoint::LoadDependency.
    rocksdb::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::AddCommitted:start",
         "AtomicCommit::GetSnapshot:start"},
        {"AtomicCommit::Get:end",
         "WritePreparedTxnDB::AddCommitted:start:pause"},
        {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
        // NOTE(review): unlike "...:start:pause" above, this name ends with a
        // trailing ':'. Confirm that it matches the sync point name used in
        // WritePreparedTxnDB::AddCommitted; otherwise this dependency never
        // takes effect.
        {"AtomicCommit::Get2:end",
         "WritePreparedTxnDB::AddCommitted:end:pause:"},
    });
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    rocksdb::port::Thread write_thread([&]() {
      if (skip_prepare) {
        // Check the returned status instead of silently dropping it.
        ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
      } else {
        Transaction* txn =
            db->BeginTransaction(WriteOptions(), TransactionOptions());
        ASSERT_OK(txn->SetName("xid"));
        ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    });
    rocksdb::port::Thread read_thread([&]() {
      ReadOptions roptions;
      TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
      roptions.snapshot = db->GetSnapshot();
      PinnableSlice val;
      // First read through the snapshot; its result is compared below with a
      // later read through the same snapshot by ASSERT_SAME.
      auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
      TEST_SYNC_POINT("AtomicCommit::Get:end");
      TEST_SYNC_POINT("AtomicCommit::Get2:start");
      ASSERT_SAME(roptions, db, s, val, "key");
      TEST_SYNC_POINT("AtomicCommit::Get2:end");
      db->ReleaseSnapshot(roptions.snapshot);
    });
    read_thread.join();
    write_thread.join();
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  }
}
|
|
|
|
// Test that we can change write policy from WriteCommitted to WritePrepared
|
|
// after a clean shutdown (which would empty the WAL)
|
|
TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
  // The WAL is empty when the write policy is switched.
  const bool wal_is_empty = true;
  CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, wal_is_empty);
}
|
|
|
|
// Test that we fail fast if WAL is not emptied between changing the write
|
|
// policy from WriteCommitted to WritePrepared
|
|
TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
  // The WAL is not empty when the write policy is switched.
  const bool wal_is_empty = false;
  CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, wal_is_empty);
}
|
|
|
|
// Test that we can change write policy from WritePrepared back to
// WriteCommitted after a clean shutdown (which would empty the WAL)
|
|
TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
  // The WAL is empty when the write policy is switched.
  const bool wal_is_empty = true;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, wal_is_empty);
}
|
|
|
|
// Test that we fail fast if WAL is not emptied between changing the write
|
|
// policy from WritePrepared to WriteCommitted
|
|
TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
  // The WAL is not empty when the write policy is switched.
  const bool wal_is_empty = false;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, wal_is_empty);
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|
|
|
|
#else
|
|
#include <stdio.h>
|
|
|
|
// Entry point used when ROCKSDB_LITE is defined: reports on stderr that the
// test is skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  static const char kSkipMessage[] =
      "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n";
  fprintf(stderr, "%s", kSkipMessage);
  return 0;
}
|
|
|
|
#endif // ROCKSDB_LITE
|