WritePrepared Txn: Move DB class to its own file

Summary:
Move  WritePreparedTxnDB from pessimistic_transaction_db.h to its own header, write_prepared_txn_db.h
Closes https://github.com/facebook/rocksdb/pull/3114

Differential Revision: D6220987

Pulled By: maysamyabandeh

fbshipit-source-id: 18893fb4fdc6b809fe117dabb544080f9b4a301b
This commit is contained in:
Maysam Yabandeh 2017-11-02 11:05:55 -07:00 committed by Facebook Github Bot
parent 6778690b51
commit 60d83df23d
10 changed files with 888 additions and 823 deletions

View File

@ -554,6 +554,7 @@ set(SOURCES
utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_lock_mgr.cc
utilities/transactions/transaction_util.cc utilities/transactions/transaction_util.cc
utilities/transactions/write_prepared_txn.cc utilities/transactions/write_prepared_txn.cc
utilities/transactions/write_prepared_txn_db.cc
utilities/ttl/db_ttl_impl.cc utilities/ttl/db_ttl_impl.cc
utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index.cc
utilities/write_batch_with_index/write_batch_with_index_internal.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc
@ -576,12 +577,12 @@ if(WIN32)
port/win/win_logger.cc port/win/win_logger.cc
port/win/win_thread.cc port/win/win_thread.cc
port/win/xpress_win.cc) port/win/xpress_win.cc)
if(WITH_JEMALLOC) if(WITH_JEMALLOC)
list(APPEND SOURCES list(APPEND SOURCES
port/win/win_jemalloc.cc) port/win/win_jemalloc.cc)
endif() endif()
else() else()
list(APPEND SOURCES list(APPEND SOURCES
port/port_posix.cc port/port_posix.cc

View File

@ -256,6 +256,7 @@ cpp_library(
"utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc", "utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_txn.cc", "utilities/transactions/write_prepared_txn.cc",
"utilities/transactions/write_prepared_txn_db.cc",
"utilities/ttl/db_ttl_impl.cc", "utilities/ttl/db_ttl_impl.cc",
"utilities/write_batch_with_index/write_batch_with_index.cc", "utilities/write_batch_with_index/write_batch_with_index.cc",
"utilities/write_batch_with_index/write_batch_with_index_internal.cc", "utilities/write_batch_with_index/write_batch_with_index_internal.cc",

1
src.mk
View File

@ -202,6 +202,7 @@ LIB_SOURCES = \
utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_lock_mgr.cc \
utilities/transactions/transaction_util.cc \ utilities/transactions/transaction_util.cc \
utilities/transactions/write_prepared_txn.cc \ utilities/transactions/write_prepared_txn.cc \
utilities/transactions/write_prepared_txn_db.cc \
utilities/ttl/db_ttl_impl.cc \ utilities/ttl/db_ttl_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \
utilities/write_batch_with_index/write_batch_with_index_internal.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \

View File

@ -11,7 +11,6 @@
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/pessimistic_transaction_db.h"
#include <inttypes.h>
#include <string> #include <string>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
@ -25,6 +24,7 @@
#include "util/sync_point.h" #include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h" #include "utilities/transactions/transaction_db_mutex_impl.h"
#include "utilities/transactions/write_prepared_txn_db.h"
namespace rocksdb { namespace rocksdb {
@ -135,26 +135,6 @@ Status PessimisticTransactionDB::Initialize(
return s; return s;
} }
Status WritePreparedTxnDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
assert(dbimpl != nullptr);
auto rtxns = dbimpl->recovered_transactions();
for (auto rtxn : rtxns) {
AddPrepared(rtxn.second->seq_);
}
SequenceNumber prev_max = max_evicted_seq_;
SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
AdvanceMaxEvictedSeq(prev_max, last_seq);
db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
handles);
return s;
}
Transaction* WriteCommittedTxnDB::BeginTransaction( Transaction* WriteCommittedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options, const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) { Transaction* old_txn) {
@ -166,17 +146,6 @@ Transaction* WriteCommittedTxnDB::BeginTransaction(
} }
} }
Transaction* WritePreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new WritePreparedTxn(this, write_options, txn_options);
}
}
TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
const TransactionDBOptions& txn_db_options) { const TransactionDBOptions& txn_db_options) {
TransactionDBOptions validated = txn_db_options; TransactionDBOptions validated = txn_db_options;
@ -571,458 +540,5 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
transactions_.erase(it); transactions_.erase(it);
} }
Status WritePreparedTxnDB::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
// We are fine with the latest committed value. This could be done by
// specifying the snapshot as kMaxSequenceNumber.
SequenceNumber seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) {
seq = options.snapshot->GetSequenceNumber();
}
WritePreparedTxnReadCallback callback(this, seq);
bool* dont_care = nullptr;
// Note: no need to specify a snapshot for read options as no specific
// snapshot is requested by the user.
return db_impl_->GetImpl(options, column_family, key, value, dont_care,
&callback);
}
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
std::shared_ptr<ManagedSnapshot> s)
: callback(txn_db, sequence), snapshot(s) {}
WritePreparedTxnReadCallback callback;
std::shared_ptr<ManagedSnapshot> snapshot;
};
namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) {
delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
} // anonymous namespace
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) {
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
} else {
auto* snapshot = db_impl_->GetSnapshot();
snapshot_seq = snapshot->GetSequenceNumber();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}
assert(snapshot_seq != kMaxSequenceNumber);
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
return db_iter;
}
Status WritePreparedTxnDB::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
} else {
auto* snapshot = db_impl_->GetSnapshot();
snapshot_seq = snapshot->GetSequenceNumber();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}
iterators->clear();
iterators->reserve(column_families.size());
for (auto* column_family : column_families) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
iterators->push_back(db_iter);
}
return Status::OK();
}
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around.
INC_STEP_FOR_MAX_EVICTED =
std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast<size_t>(1));
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
// Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) const {
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): read your own writes
// TODO(myabandeh): optimize this. This sequence of checks must be correct but
// not necessary efficient
if (prep_seq == 0) {
// Compaction will output keys to bottom-level with sequence number 0 if
// it is visible to the earliest snapshot.
return true;
}
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
ReadLock rl(&prepared_mutex_);
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
return false;
}
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (exist && prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
return cached.commit_seq <= snapshot_seq;
}
// else it could be committed but not inserted in the map which could happen
// after recovery, or it could be committed and evicted by another commit, or
// never committed.
// At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
if (max_evicted_seq < prep_seq) {
// Not evicted from cache and also not present, so must be still prepared
return false;
}
// When advancing max_evicted_seq_, we move older entires from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
// old_commit_map_, iii) committed with no conflict with any snapshot (i)
// delayed_prepared_ is checked above
if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
return true;
}
// else (ii) might be the case: check the commit data saved for this snapshot.
// If there was no overlapping commit entry, then it is committed with a
// commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
return true;
}
{
// We should not normally reach here
ReadLock rl(&old_commit_map_mutex_);
auto old_commit_entry = old_commit_map_.find(prep_seq);
if (old_commit_entry == old_commit_map_.end() ||
old_commit_entry->second <= snapshot_seq) {
return true;
}
}
// (ii) it the case: it is committed but after the snapshot_seq
return false;
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq);
// TODO(myabandeh): Add a runtime check to ensure the following assert.
assert(seq > max_evicted_seq_);
WriteLock wl(&prepared_mutex_);
prepared_txns_.push(seq);
}
void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
uint64_t rollback_seq) {
ROCKS_LOG_DEBUG(
info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "",
prep_seq, rollback_seq);
std::vector<SequenceNumber> snapshots =
GetSnapshotListFromDB(kMaxSequenceNumber);
// TODO(myabandeh): currently we are assuming that there is no snapshot taken
// when a transaciton is rolled back. This is the case the way MySQL does
// rollback which is after recovery. We should extend it to be able to
// rollback txns that overlap with exsiting snapshots.
assert(snapshots.size() == 0);
if (snapshots.size()) {
throw std::runtime_error(
"Rollback reqeust while there are live snapshots.");
}
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prep_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prep_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry64b evicted_64b;
CommitEntry evicted;
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) {
// Inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
CheckAgainstSnapshots(evicted);
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
if (!succ) {
// A very rare event, in which the commit entry is updated before we do.
// Here we apply a very simple solution of retrying.
// TODO(myabandeh): do precautions to detect bugs that cause infinite loops
AddCommitted(prepare_seq, commit_seq);
return;
}
{
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
}
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
CommitEntry64b* entry_64b,
CommitEntry* entry) const {
*entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire);
bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
return valid;
}
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
const CommitEntry& new_entry,
CommitEntry* evicted_entry) {
CommitEntry64b new_entry_64b(new_entry, FORMAT);
CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange(
new_entry_64b, std::memory_order_acq_rel);
bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
return valid;
}
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
CommitEntry64b& expected_entry_64b,
const CommitEntry& new_entry) {
auto& atomic_entry = commit_cache_[indexed_seq];
CommitEntry64b new_entry_64b(new_entry, FORMAT);
bool succ = atomic_entry.compare_exchange_strong(
expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
std::memory_order_acquire);
return succ;
}
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
SequenceNumber& new_max) {
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized
// lookup from a shared set. delayed_prepared_ is expected to be empty in
// normal cases.
{
WriteLock wl(&prepared_mutex_);
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
}
}
// With each change to max_evicted_seq_ fetch the live snapshots behind it.
// We use max as the version of snapshots to identify how fresh are the
// snapshot list. This works because the snapshots are between 0 and
// max, so the larger the max, the more complete they are.
SequenceNumber new_snapshots_version = new_max;
std::vector<SequenceNumber> snapshots;
bool update_snapshots = false;
if (new_snapshots_version > snapshots_version_) {
// This is to avoid updating the snapshots_ if it already updated
// with a more recent vesion by a concrrent thread
update_snapshots = true;
// We only care about snapshots lower then max
snapshots = GetSnapshotListFromDB(new_max);
}
if (update_snapshots) {
UpdateSnapshots(snapshots, new_snapshots_version);
}
while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(
prev_max, new_max, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
};
}
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
SequenceNumber max) {
InstrumentedMutex(db_impl_->mutex());
return db_impl_->snapshots().GetAll(nullptr, max);
}
void WritePreparedTxnDB::UpdateSnapshots(
const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version) {
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
size_t sync_i = 0;
#endif
WriteLock wl(&snapshots_mutex_);
snapshots_version_ = version;
// We update the list concurrently with the readers.
// Both new and old lists are sorted and the new list is subset of the
// previous list plus some new items. Thus if a snapshot repeats in
// both new and old lists, it will appear upper in the new list. So if
// we simply insert the new snapshots in order, if an overwritten item
// is still valid in the new list is either written to the same place in
// the array or it is written in a higher palce before it gets
// overwritten by another item. This guarantess a reader that reads the
// list bottom-up will eventaully see a snapshot that repeats in the
// update, either before it gets overwritten by the writer or
// afterwards.
size_t i = 0;
auto it = snapshots.begin();
for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
snapshot_cache_[i].store(*it, std::memory_order_release);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
}
#ifndef NDEBUG
// Release the remaining sync points since they are useless given that the
// reader would also use lock to access snapshots
for (++sync_i; sync_i <= 10; ++sync_i) {
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
}
#endif
snapshots_.clear();
for (; it != snapshots.end(); it++) {
// Insert them to a vector that is less efficient to access
// concurrently
snapshots_.push_back(*it);
}
// Update the size at the end. Otherwise a parallel reader might read
// items that are not set yet.
snapshots_total_.store(snapshots.size(), std::memory_order_release);
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
size_t sync_i = 0;
#endif
// First check the snapshot cache that is efficient for concurrent access
auto cnt = snapshots_total_.load(std::memory_order_acquire);
// The list might get updated concurrently as we are reading from it. The
// reader should be able to read all the snapshots that are still valid
// after the update. Since the survived snapshots are written in a higher
// place before gets overwritten the reader that reads bottom-up will
// eventully see it.
const bool next_is_larger = true;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
for (; 0 < ip1; ip1--) {
snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
++sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, !next_is_larger)) {
break;
}
}
#ifndef NDEBUG
// Release the remaining sync points before accquiring the lock
for (++sync_i; sync_i <= 10; ++sync_i) {
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
}
#endif
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
snapshot_seq < evicted.prep_seq)) {
// Then access the less efficient list of snapshots_
ReadLock rl(&snapshots_mutex_);
// Items could have moved from the snapshots_ to snapshot_cache_ before
// accquiring the lock. To make sure that we do not miss a valid snapshot,
// read snapshot_cache_ again while holding the lock.
for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, next_is_larger)) {
break;
}
}
for (auto snapshot_seq_2 : snapshots_) {
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq_2, next_is_larger)) {
break;
}
}
}
}
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
const uint64_t& prep_seq, const uint64_t& commit_seq,
const uint64_t& snapshot_seq, const bool next_is_larger = true) {
// If we do not store an entry in old_commit_map we assume it is committed in
// all snapshots. if commit_seq <= snapshot_seq, it is considered already in
// the snapshot so we need not to keep the entry around for this snapshot.
if (commit_seq <= snapshot_seq) {
// continue the search if the next snapshot could be smaller than commit_seq
return !next_is_larger;
}
// then snapshot_seq < commit_seq
if (prep_seq <= snapshot_seq) { // overlapping range
WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release);
old_commit_map_[prep_seq] = commit_seq;
// Storing once is enough. No need to check it for other snapshots.
return false;
}
// continue the search if the next snapshot could be larger than prep_seq
return next_is_larger;
}
WritePreparedTxnDB::~WritePreparedTxnDB() {
// At this point there could be running compaction/flush holding a
// SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
// Make sure those jobs finished before destructing WritePreparedTxnDB.
db_impl_->CancelAllBackgroundWork(true/*wait*/);
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

View File

@ -162,339 +162,5 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB {
Transaction* old_txn) override; Transaction* old_txn) override;
}; };
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
// mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB {
public:
explicit WritePreparedTxnDB(
DB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options);
}
explicit WritePreparedTxnDB(
StackableDB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options);
}
virtual ~WritePreparedTxnDB();
virtual Status Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) override;
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options,
Transaction* old_txn) override;
using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
using DB::NewIterators;
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
// Check whether the transaction that wrote the value with seqeunce number seq
// is visible to the snapshot with sequence number snapshot_seq
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const;
// Add the trasnaction with prepare sequence seq to the prepared list
void AddPrepared(uint64_t seq);
// Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
// with which the additional data is written to cancel the txn effect. It can
// be used to idenitfy the snapshots that overlap with the rolled back txn.
void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// commit_seq to the commit map
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq);
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;
CommitEntry() : prep_seq(0), commit_seq(0) {}
CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
bool operator==(const CommitEntry& rhs) const {
return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
}
};
struct CommitEntry64bFormat {
explicit CommitEntry64bFormat(size_t index_bits)
: INDEX_BITS(index_bits),
PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)) {}
// Number of higher bits of a sequence number that is not used. They are
// used to encode the value type, ...
const size_t PAD_BITS = static_cast<size_t>(8);
// Number of lower bits from prepare seq that can be skipped as they are
// implied by the index of the entry in the array
const size_t INDEX_BITS;
// Number of bits we use to encode the prepare seq
const size_t PREP_BITS;
// Number of bits we use to encode the commit seq.
const size_t COMMIT_BITS;
// Filter to encode/decode commit seq
const uint64_t COMMIT_FILTER;
};
// Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ...
// INDEX Detal Seq (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ...
// DELTA DELTA Encoded Value = PREP PREP .... PREP PREP DELTA DELTA
// ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and
// hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the
// bits that do not have to be encoded (will be provided externally) DELTA:
// prep seq - commit seq + 1 Number of DELTA bits should be equal to number of
// index bits + PADs
struct CommitEntry64b {
constexpr CommitEntry64b() noexcept : rep_(0) {}
CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
: CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
CommitEntry64b(const uint64_t ps, const uint64_t cs,
const CommitEntry64bFormat& format) {
assert(ps < static_cast<uint64_t>(
(1ull << (format.PREP_BITS + format.INDEX_BITS))));
assert(ps <= cs);
uint64_t delta = cs - ps + 1; // make initialized delta always >= 1
// zero is reserved for uninitialized entries
assert(0 < delta);
assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
rep_ = rep_ | delta;
}
// Return false if the entry is empty
bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
const CommitEntry64bFormat& format) {
uint64_t delta = rep_ & format.COMMIT_FILTER;
// zero is reserved for uninitialized entries
assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
if (delta == 0) {
return false; // initialized entry would have non-zero delta
}
assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
prep_up >>= format.PAD_BITS;
const uint64_t& prep_low = indexed_seq;
entry->prep_seq = prep_up | prep_low;
entry->commit_seq = entry->prep_seq + delta - 1;
return true;
}
private:
uint64_t rep_;
};
// Struct to hold ownership of snapshot and read callback for cleanup.
struct IteratorState;
private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
friend class WritePreparedTransactionTest_CommitMapTest_Test;
friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test;
friend class WritePreparedTransactionTest;
friend class PreparedHeap_BasicsTest_Test;
friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test;
void Init(const TransactionDBOptions& /* unused */);
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap {
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
heap_;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
erased_heap_;
public:
bool empty() { return heap_.empty(); }
uint64_t top() { return heap_.top(); }
void push(uint64_t v) { heap_.push(v); }
void pop() {
heap_.pop();
while (!heap_.empty() && !erased_heap_.empty() &&
heap_.top() == erased_heap_.top()) {
heap_.pop();
erased_heap_.pop();
}
while (heap_.empty() && !erased_heap_.empty()) {
erased_heap_.pop();
}
}
void erase(uint64_t seq) {
if (!heap_.empty()) {
if (seq < heap_.top()) {
// Already popped, ignore it.
} else if (heap_.top() == seq) {
pop();
} else { // (heap_.top() > seq)
// Down the heap, remember to pop it later
erased_heap_.push(seq);
}
}
}
};
// Get the commit entry with index indexed_seq from the commit table. It
// returns true if such entry exists.
bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
CommitEntry* entry) const;
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
// sets the evicted_entry and returns true.
bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
CommitEntry* evicted_entry);
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry new_entry only if the existing entry matches the
// expected_entry. Returns false otherwise.
bool ExchangeCommitEntry(const uint64_t indexed_seq,
CommitEntry64b& expected_entry,
const CommitEntry& new_entry);
// Increase max_evicted_seq_ from the previous value prev_max to the new
// value. This also involves taking care of prepared txns that are not
// committed before new_max, as well as updating the list of live snapshots at
// the time of updating the max. Thread-safety: this function can be called
// concurrently. The concurrent invocations of this function is equivalent to
// a serial invocation in which the last invocation is the one with the
// largetst new_max value.
void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max);
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber max);
// Update the list of snapshots corresponding to the soon-to-be-updated
// max_eviceted_seq_. Thread-safety: this function can be called concurrently.
// The concurrent invocations of this function is equivalent to a serial
// invocation in which the last invocation is the one with the largetst
// version value.
void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version);
// Check an evicted entry against live snapshots to see if it should be kept
// around or it can be safely discarded (and hence assume committed for all
// snapshots). Thread-safety: this function can be called concurrently. If it
// is called concurrently with multiple UpdateSnapshots, the result is the
// same as checking the intersection of the snapshot list before updates with
// the snapshot list of all the concurrent updates.
void CheckAgainstSnapshots(const CommitEntry& evicted);
// Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
// commit_seq. Return false if checking the next snapshot(s) is not needed.
// This is the case if the entry already added to old_commit_map_ or none of
// the next snapshots could satisfy the condition. next_is_larger: the next
// snapshot will be a larger value
bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
const uint64_t& commit_seq,
const uint64_t& snapshot_seq,
const bool next_is_larger);
// The list of live snapshots at the last time that max_evicted_seq_ advanced.
// The list stored into two data structures: in snapshot_cache_ that is
// efficient for concurrent reads, and in snapshots_ if the data does not fit
// into snapshot_cache_. The total number of snapshots in the two lists
std::atomic<size_t> snapshots_total_ = {};
// The list sorted in ascending order. Thread-safety for writes is provided
// with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
// each entry. In x86_64 architecture such reads are compiled to simple read
// instructions. 128 entries
static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
const size_t SNAPSHOT_CACHE_BITS;
const size_t SNAPSHOT_CACHE_SIZE;
unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
// 2nd list for storing snapshots. The list sorted in ascending order.
// Thread-safety is provided with snapshots_mutex_.
std::vector<SequenceNumber> snapshots_;
// The version of the latest list of snapshots. This can be used to avoid
// rewrittiing a list that is concurrently updated with a more recent version.
SequenceNumber snapshots_version_ = 0;
// A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_.
PreparedHeap prepared_txns_;
// 10m entry, 80MB size
static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(21);
const size_t COMMIT_CACHE_BITS;
const size_t COMMIT_CACHE_SIZE;
const CommitEntry64bFormat FORMAT;
// commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_.
unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
// The largest evicted *commit* sequence number from the commit_cache_
std::atomic<uint64_t> max_evicted_seq_ = {};
// Advance max_evicted_seq_ by this value each time it needs an update. The
// larger the value, the less frequent advances we would have. We do not want
// it to be too large either as it would cause stalls by doing too much
// maintenance work under the lock.
size_t INC_STEP_FOR_MAX_EVICTED = 1;
// A map of the evicted entries from commit_cache_ that has to be kept around
// to service the old snapshots. This is expected to be empty normally.
// Thread-safety is provided with old_commit_map_mutex_.
std::map<uint64_t, uint64_t> old_commit_map_;
// A set of long-running prepared transactions that are not finished by the
// time max_evicted_seq_ advances their sequence number. This is expected to
// be empty normally. Thread-safety is provided with prepared_mutex_.
std::set<uint64_t> delayed_prepared_;
// Update when delayed_prepared_.empty() changes. Expected to be true
// normally.
std::atomic<bool> delayed_prepared_empty_ = {true};
// Update when old_commit_map_.empty() changes. Expected to be true normally.
std::atomic<bool> old_commit_map_empty_ = {true};
mutable port::RWMutex prepared_mutex_;
mutable port::RWMutex old_commit_map_mutex_;
mutable port::RWMutex commit_cache_mutex_;
mutable port::RWMutex snapshots_mutex_;
};
class WritePreparedTxnReadCallback : public ReadCallback {
public:
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
: db_(db), snapshot_(snapshot) {}
// Will be called to see if the seq number accepted; if not it moves on to the
// next seq number.
virtual bool IsCommitted(SequenceNumber seq) override {
return db_->IsInSnapshot(seq, snapshot_);
}
private:
WritePreparedTxnDB* db_;
SequenceNumber snapshot_;
};
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

View File

@ -9,7 +9,7 @@
#include <assert.h> #include <assert.h>
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/write_prepared_txn_db.h"
namespace rocksdb { namespace rocksdb {

View File

@ -36,6 +36,7 @@
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/write_prepared_txn_db.h"
#include "port/port.h" #include "port/port.h"

View File

@ -15,7 +15,7 @@
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/write_prepared_txn_db.h"
namespace rocksdb { namespace rocksdb {

View File

@ -0,0 +1,515 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include "utilities/transactions/write_prepared_txn_db.h"
#include <inttypes.h>
#include <string>
#include <unordered_set>
#include <vector>
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace rocksdb {
Status WritePreparedTxnDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
assert(dbimpl != nullptr);
auto rtxns = dbimpl->recovered_transactions();
for (auto rtxn : rtxns) {
AddPrepared(rtxn.second->seq_);
}
SequenceNumber prev_max = max_evicted_seq_;
SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
AdvanceMaxEvictedSeq(prev_max, last_seq);
db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
handles);
return s;
}
Transaction* WritePreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new WritePreparedTxn(this, write_options, txn_options);
}
}
Status WritePreparedTxnDB::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
// We are fine with the latest committed value. This could be done by
// specifying the snapshot as kMaxSequenceNumber.
SequenceNumber seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) {
seq = options.snapshot->GetSequenceNumber();
}
WritePreparedTxnReadCallback callback(this, seq);
bool* dont_care = nullptr;
// Note: no need to specify a snapshot for read options as no specific
// snapshot is requested by the user.
return db_impl_->GetImpl(options, column_family, key, value, dont_care,
&callback);
}
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
std::shared_ptr<ManagedSnapshot> s)
: callback(txn_db, sequence), snapshot(s) {}
WritePreparedTxnReadCallback callback;
std::shared_ptr<ManagedSnapshot> snapshot;
};
namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* arg2) {
delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
} // anonymous namespace
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) {
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
} else {
auto* snapshot = db_impl_->GetSnapshot();
snapshot_seq = snapshot->GetSequenceNumber();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}
assert(snapshot_seq != kMaxSequenceNumber);
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
return db_iter;
}
Status WritePreparedTxnDB::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
} else {
auto* snapshot = db_impl_->GetSnapshot();
snapshot_seq = snapshot->GetSequenceNumber();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}
iterators->clear();
iterators->reserve(column_families.size());
for (auto* column_family : column_families) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
iterators->push_back(db_iter);
}
return Status::OK();
}
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around.
INC_STEP_FOR_MAX_EVICTED =
std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast<size_t>(1));
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
// Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) const {
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): read your own writes
// TODO(myabandeh): optimize this. This sequence of checks must be correct but
// not necessary efficient
if (prep_seq == 0) {
// Compaction will output keys to bottom-level with sequence number 0 if
// it is visible to the earliest snapshot.
return true;
}
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
ReadLock rl(&prepared_mutex_);
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
return false;
}
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (exist && prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
return cached.commit_seq <= snapshot_seq;
}
// else it could be committed but not inserted in the map which could happen
// after recovery, or it could be committed and evicted by another commit, or
// never committed.
// At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
if (max_evicted_seq < prep_seq) {
// Not evicted from cache and also not present, so must be still prepared
return false;
}
// When advancing max_evicted_seq_, we move older entires from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
// old_commit_map_, iii) committed with no conflict with any snapshot (i)
// delayed_prepared_ is checked above
if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
return true;
}
// else (ii) might be the case: check the commit data saved for this snapshot.
// If there was no overlapping commit entry, then it is committed with a
// commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
return true;
}
{
// We should not normally reach here
ReadLock rl(&old_commit_map_mutex_);
auto old_commit_entry = old_commit_map_.find(prep_seq);
if (old_commit_entry == old_commit_map_.end() ||
old_commit_entry->second <= snapshot_seq) {
return true;
}
}
// (ii) it the case: it is committed but after the snapshot_seq
return false;
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq);
// TODO(myabandeh): Add a runtime check to ensure the following assert.
assert(seq > max_evicted_seq_);
WriteLock wl(&prepared_mutex_);
prepared_txns_.push(seq);
}
void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
uint64_t rollback_seq) {
ROCKS_LOG_DEBUG(
info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "",
prep_seq, rollback_seq);
std::vector<SequenceNumber> snapshots =
GetSnapshotListFromDB(kMaxSequenceNumber);
// TODO(myabandeh): currently we are assuming that there is no snapshot taken
// when a transaciton is rolled back. This is the case the way MySQL does
// rollback which is after recovery. We should extend it to be able to
// rollback txns that overlap with exsiting snapshots.
assert(snapshots.size() == 0);
if (snapshots.size()) {
throw std::runtime_error(
"Rollback reqeust while there are live snapshots.");
}
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prep_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prep_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry64b evicted_64b;
CommitEntry evicted;
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) {
// Inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
CheckAgainstSnapshots(evicted);
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
if (!succ) {
// A very rare event, in which the commit entry is updated before we do.
// Here we apply a very simple solution of retrying.
// TODO(myabandeh): do precautions to detect bugs that cause infinite loops
AddCommitted(prepare_seq, commit_seq);
return;
}
{
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
}
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
CommitEntry64b* entry_64b,
CommitEntry* entry) const {
*entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire);
bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
return valid;
}
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
const CommitEntry& new_entry,
CommitEntry* evicted_entry) {
CommitEntry64b new_entry_64b(new_entry, FORMAT);
CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange(
new_entry_64b, std::memory_order_acq_rel);
bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
return valid;
}
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
CommitEntry64b& expected_entry_64b,
const CommitEntry& new_entry) {
auto& atomic_entry = commit_cache_[indexed_seq];
CommitEntry64b new_entry_64b(new_entry, FORMAT);
bool succ = atomic_entry.compare_exchange_strong(
expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
std::memory_order_acquire);
return succ;
}
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
SequenceNumber& new_max) {
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized
// lookup from a shared set. delayed_prepared_ is expected to be empty in
// normal cases.
{
WriteLock wl(&prepared_mutex_);
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
}
}
// With each change to max_evicted_seq_ fetch the live snapshots behind it.
// We use max as the version of snapshots to identify how fresh are the
// snapshot list. This works because the snapshots are between 0 and
// max, so the larger the max, the more complete they are.
SequenceNumber new_snapshots_version = new_max;
std::vector<SequenceNumber> snapshots;
bool update_snapshots = false;
if (new_snapshots_version > snapshots_version_) {
// This is to avoid updating the snapshots_ if it already updated
// with a more recent vesion by a concrrent thread
update_snapshots = true;
// We only care about snapshots lower then max
snapshots = GetSnapshotListFromDB(new_max);
}
if (update_snapshots) {
UpdateSnapshots(snapshots, new_snapshots_version);
}
while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(
prev_max, new_max, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
};
}
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
SequenceNumber max) {
InstrumentedMutex(db_impl_->mutex());
return db_impl_->snapshots().GetAll(nullptr, max);
}
void WritePreparedTxnDB::UpdateSnapshots(
const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version) {
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
size_t sync_i = 0;
#endif
WriteLock wl(&snapshots_mutex_);
snapshots_version_ = version;
// We update the list concurrently with the readers.
// Both new and old lists are sorted and the new list is subset of the
// previous list plus some new items. Thus if a snapshot repeats in
// both new and old lists, it will appear upper in the new list. So if
// we simply insert the new snapshots in order, if an overwritten item
// is still valid in the new list is either written to the same place in
// the array or it is written in a higher palce before it gets
// overwritten by another item. This guarantess a reader that reads the
// list bottom-up will eventaully see a snapshot that repeats in the
// update, either before it gets overwritten by the writer or
// afterwards.
size_t i = 0;
auto it = snapshots.begin();
for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
snapshot_cache_[i].store(*it, std::memory_order_release);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
}
#ifndef NDEBUG
// Release the remaining sync points since they are useless given that the
// reader would also use lock to access snapshots
for (++sync_i; sync_i <= 10; ++sync_i) {
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
}
#endif
snapshots_.clear();
for (; it != snapshots.end(); it++) {
// Insert them to a vector that is less efficient to access
// concurrently
snapshots_.push_back(*it);
}
// Update the size at the end. Otherwise a parallel reader might read
// items that are not set yet.
snapshots_total_.store(snapshots.size(), std::memory_order_release);
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
size_t sync_i = 0;
#endif
// First check the snapshot cache that is efficient for concurrent access
auto cnt = snapshots_total_.load(std::memory_order_acquire);
// The list might get updated concurrently as we are reading from it. The
// reader should be able to read all the snapshots that are still valid
// after the update. Since the survived snapshots are written in a higher
// place before gets overwritten the reader that reads bottom-up will
// eventully see it.
const bool next_is_larger = true;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
for (; 0 < ip1; ip1--) {
snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
++sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, !next_is_larger)) {
break;
}
}
#ifndef NDEBUG
// Release the remaining sync points before accquiring the lock
for (++sync_i; sync_i <= 10; ++sync_i) {
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
}
#endif
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
snapshot_seq < evicted.prep_seq)) {
// Then access the less efficient list of snapshots_
ReadLock rl(&snapshots_mutex_);
// Items could have moved from the snapshots_ to snapshot_cache_ before
// accquiring the lock. To make sure that we do not miss a valid snapshot,
// read snapshot_cache_ again while holding the lock.
for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, next_is_larger)) {
break;
}
}
for (auto snapshot_seq_2 : snapshots_) {
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq_2, next_is_larger)) {
break;
}
}
}
}
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
const uint64_t& prep_seq, const uint64_t& commit_seq,
const uint64_t& snapshot_seq, const bool next_is_larger = true) {
// If we do not store an entry in old_commit_map we assume it is committed in
// all snapshots. if commit_seq <= snapshot_seq, it is considered already in
// the snapshot so we need not to keep the entry around for this snapshot.
if (commit_seq <= snapshot_seq) {
// continue the search if the next snapshot could be smaller than commit_seq
return !next_is_larger;
}
// then snapshot_seq < commit_seq
if (prep_seq <= snapshot_seq) { // overlapping range
WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release);
old_commit_map_[prep_seq] = commit_seq;
// Storing once is enough. No need to check it for other snapshots.
return false;
}
// continue the search if the next snapshot could be larger than prep_seq
return next_is_larger;
}
WritePreparedTxnDB::~WritePreparedTxnDB() {
// At this point there could be running compaction/flush holding a
// SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
// Make sure those jobs finished before destructing WritePreparedTxnDB.
db_impl_->CancelAllBackgroundWork(true /*wait*/);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,364 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/db_iter.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"
namespace rocksdb {
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
// mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB {
public:
explicit WritePreparedTxnDB(
DB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options);
}
explicit WritePreparedTxnDB(
StackableDB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options);
}
virtual ~WritePreparedTxnDB();
virtual Status Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) override;
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options,
Transaction* old_txn) override;
using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
using DB::NewIterators;
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
// Check whether the transaction that wrote the value with seqeunce number seq
// is visible to the snapshot with sequence number snapshot_seq
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const;
// Add the trasnaction with prepare sequence seq to the prepared list
void AddPrepared(uint64_t seq);
// Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
// with which the additional data is written to cancel the txn effect. It can
// be used to idenitfy the snapshots that overlap with the rolled back txn.
void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// commit_seq to the commit map
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq);
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;
CommitEntry() : prep_seq(0), commit_seq(0) {}
CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
bool operator==(const CommitEntry& rhs) const {
return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
}
};
struct CommitEntry64bFormat {
explicit CommitEntry64bFormat(size_t index_bits)
: INDEX_BITS(index_bits),
PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)) {}
// Number of higher bits of a sequence number that is not used. They are
// used to encode the value type, ...
const size_t PAD_BITS = static_cast<size_t>(8);
// Number of lower bits from prepare seq that can be skipped as they are
// implied by the index of the entry in the array
const size_t INDEX_BITS;
// Number of bits we use to encode the prepare seq
const size_t PREP_BITS;
// Number of bits we use to encode the commit seq.
const size_t COMMIT_BITS;
// Filter to encode/decode commit seq
const uint64_t COMMIT_FILTER;
};
// Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ...
// INDEX Detal Seq (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ...
// DELTA DELTA Encoded Value = PREP PREP .... PREP PREP DELTA DELTA
// ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and
// hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the
// bits that do not have to be encoded (will be provided externally) DELTA:
// prep seq - commit seq + 1 Number of DELTA bits should be equal to number of
// index bits + PADs
struct CommitEntry64b {
constexpr CommitEntry64b() noexcept : rep_(0) {}
CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
: CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
CommitEntry64b(const uint64_t ps, const uint64_t cs,
const CommitEntry64bFormat& format) {
assert(ps < static_cast<uint64_t>(
(1ull << (format.PREP_BITS + format.INDEX_BITS))));
assert(ps <= cs);
uint64_t delta = cs - ps + 1; // make initialized delta always >= 1
// zero is reserved for uninitialized entries
assert(0 < delta);
assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
rep_ = rep_ | delta;
}
// Return false if the entry is empty
bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
const CommitEntry64bFormat& format) {
uint64_t delta = rep_ & format.COMMIT_FILTER;
// zero is reserved for uninitialized entries
assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
if (delta == 0) {
return false; // initialized entry would have non-zero delta
}
assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
prep_up >>= format.PAD_BITS;
const uint64_t& prep_low = indexed_seq;
entry->prep_seq = prep_up | prep_low;
entry->commit_seq = entry->prep_seq + delta - 1;
return true;
}
private:
uint64_t rep_;
};
// Struct to hold ownership of snapshot and read callback for cleanup.
struct IteratorState;
private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
friend class WritePreparedTransactionTest_CommitMapTest_Test;
friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test;
friend class WritePreparedTransactionTest;
friend class PreparedHeap_BasicsTest_Test;
friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test;
void Init(const TransactionDBOptions& /* unused */);
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap {
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
heap_;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
erased_heap_;
public:
bool empty() { return heap_.empty(); }
uint64_t top() { return heap_.top(); }
void push(uint64_t v) { heap_.push(v); }
void pop() {
heap_.pop();
while (!heap_.empty() && !erased_heap_.empty() &&
heap_.top() == erased_heap_.top()) {
heap_.pop();
erased_heap_.pop();
}
while (heap_.empty() && !erased_heap_.empty()) {
erased_heap_.pop();
}
}
void erase(uint64_t seq) {
if (!heap_.empty()) {
if (seq < heap_.top()) {
// Already popped, ignore it.
} else if (heap_.top() == seq) {
pop();
} else { // (heap_.top() > seq)
// Down the heap, remember to pop it later
erased_heap_.push(seq);
}
}
}
};
// Get the commit entry with index indexed_seq from the commit table. It
// returns true if such entry exists.
bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
CommitEntry* entry) const;
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
// sets the evicted_entry and returns true.
bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
CommitEntry* evicted_entry);
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry new_entry only if the existing entry matches the
// expected_entry. Returns false otherwise.
bool ExchangeCommitEntry(const uint64_t indexed_seq,
CommitEntry64b& expected_entry,
const CommitEntry& new_entry);
// Increase max_evicted_seq_ from the previous value prev_max to the new
// value. This also involves taking care of prepared txns that are not
// committed before new_max, as well as updating the list of live snapshots at
// the time of updating the max. Thread-safety: this function can be called
// concurrently. The concurrent invocations of this function is equivalent to
// a serial invocation in which the last invocation is the one with the
// largetst new_max value.
void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max);
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber max);
// Update the list of snapshots corresponding to the soon-to-be-updated
// max_eviceted_seq_. Thread-safety: this function can be called concurrently.
// The concurrent invocations of this function is equivalent to a serial
// invocation in which the last invocation is the one with the largetst
// version value.
void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version);
// Check an evicted entry against live snapshots to see if it should be kept
// around or it can be safely discarded (and hence assume committed for all
// snapshots). Thread-safety: this function can be called concurrently. If it
// is called concurrently with multiple UpdateSnapshots, the result is the
// same as checking the intersection of the snapshot list before updates with
// the snapshot list of all the concurrent updates.
void CheckAgainstSnapshots(const CommitEntry& evicted);
// Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
// commit_seq. Return false if checking the next snapshot(s) is not needed.
// This is the case if the entry already added to old_commit_map_ or none of
// the next snapshots could satisfy the condition. next_is_larger: the next
// snapshot will be a larger value
bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
const uint64_t& commit_seq,
const uint64_t& snapshot_seq,
const bool next_is_larger);
// The list of live snapshots at the last time that max_evicted_seq_ advanced.
// The list stored into two data structures: in snapshot_cache_ that is
// efficient for concurrent reads, and in snapshots_ if the data does not fit
// into snapshot_cache_. The total number of snapshots in the two lists
std::atomic<size_t> snapshots_total_ = {};
// The list sorted in ascending order. Thread-safety for writes is provided
// with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
// each entry. In x86_64 architecture such reads are compiled to simple read
// instructions. 128 entries
static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
const size_t SNAPSHOT_CACHE_BITS;
const size_t SNAPSHOT_CACHE_SIZE;
unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
// 2nd list for storing snapshots. The list sorted in ascending order.
// Thread-safety is provided with snapshots_mutex_.
std::vector<SequenceNumber> snapshots_;
// The version of the latest list of snapshots. This can be used to avoid
// rewrittiing a list that is concurrently updated with a more recent version.
SequenceNumber snapshots_version_ = 0;
// A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_.
PreparedHeap prepared_txns_;
// 10m entry, 80MB size
static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(21);
const size_t COMMIT_CACHE_BITS;
const size_t COMMIT_CACHE_SIZE;
const CommitEntry64bFormat FORMAT;
// commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_.
unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
// The largest evicted *commit* sequence number from the commit_cache_
std::atomic<uint64_t> max_evicted_seq_ = {};
// Advance max_evicted_seq_ by this value each time it needs an update. The
// larger the value, the less frequent advances we would have. We do not want
// it to be too large either as it would cause stalls by doing too much
// maintenance work under the lock.
size_t INC_STEP_FOR_MAX_EVICTED = 1;
// A map of the evicted entries from commit_cache_ that has to be kept around
// to service the old snapshots. This is expected to be empty normally.
// Thread-safety is provided with old_commit_map_mutex_.
std::map<uint64_t, uint64_t> old_commit_map_;
// A set of long-running prepared transactions that are not finished by the
// time max_evicted_seq_ advances their sequence number. This is expected to
// be empty normally. Thread-safety is provided with prepared_mutex_.
std::set<uint64_t> delayed_prepared_;
// Update when delayed_prepared_.empty() changes. Expected to be true
// normally.
std::atomic<bool> delayed_prepared_empty_ = {true};
// Update when old_commit_map_.empty() changes. Expected to be true normally.
std::atomic<bool> old_commit_map_empty_ = {true};
mutable port::RWMutex prepared_mutex_;
mutable port::RWMutex old_commit_map_mutex_;
mutable port::RWMutex commit_cache_mutex_;
mutable port::RWMutex snapshots_mutex_;
};
class WritePreparedTxnReadCallback : public ReadCallback {
public:
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
: db_(db), snapshot_(snapshot) {}
// Will be called to see if the seq number accepted; if not it moves on to the
// next seq number.
virtual bool IsCommitted(SequenceNumber seq) override {
return db_->IsInSnapshot(seq, snapshot_);
}
private:
WritePreparedTxnDB* db_;
SequenceNumber snapshot_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE