rocksdb/utilities/transactions/write_unprepared_txn.h
Cheng Chang 71c7e4935e Replace tracked_keys with a new LockTracker interface in TransactionDB (#7013)
Summary:
We're going to support more locking protocols, such as range locks, in transactions.

However, in the current design, `TransactionBase` has a member `tracked_keys` which assumes that point locking (locking a single key) is used, and which is also used in snapshot checking (the isolation protocol). When using range locks, we may use read committed instead of snapshot checking as the isolation protocol.

The most significant usage scenarios of `tracked_keys` are:
1. A pessimistic transaction uses it to track the locked keys and unlocks these keys on commit or rollback.
2. An optimistic transaction does not lock keys up front; it only tracks the lock intentions in `tracked_keys` and does write conflict checking on commit.
3. Each `SavePoint` tracks the keys that have been locked since the `SavePoint` was set; `RollbackToSavePoint` and `PopSavePoint` rely on both the tracked keys in `SavePoint`s and `tracked_keys`.

Based on these scenarios, if we can abstract out a `LockTracker` interface to hold a set of tracked locks (can be keys or key ranges), and have methods that can be composed together to implement the scenarios, then `tracked_keys` can be an internal data structure of one implementation of `LockTracker`. See `utilities/transactions/lock/lock_tracker.h` for the detailed interface design, and `utilities/transactions/lock/point_lock_tracker.cc` for the implementation.
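
To make the idea of composable tracker methods concrete, here is a minimal sketch of what such an abstraction could look like. It is not the interface in `lock_tracker.h`: the names `SketchLockTracker`, `SketchPointLockTracker`, and `IsTracked`/`Size` are hypothetical, and the sketch assumes that a point lock is fully identified by a column family ID plus a key and ignores details such as read/write counts or sequence numbers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical, simplified stand-in for a lock tracker abstraction.
// This is an illustration only, not RocksDB's actual LockTracker.
using ColumnFamilyId = uint32_t;

class SketchLockTracker {
 public:
  virtual ~SketchLockTracker() = default;
  virtual void Track(ColumnFamilyId cf, const std::string& key) = 0;
  virtual bool IsTracked(ColumnFamilyId cf, const std::string& key) const = 0;
  virtual size_t Size() const = 0;
  virtual void Clear() = 0;
};

// Point-lock implementation: the per-column-family key set is an internal
// detail, which is the role the commit message describes for tracked_keys.
class SketchPointLockTracker : public SketchLockTracker {
 public:
  void Track(ColumnFamilyId cf, const std::string& key) override {
    keys_[cf].insert(key);
  }

  bool IsTracked(ColumnFamilyId cf, const std::string& key) const override {
    auto it = keys_.find(cf);
    return it != keys_.end() && it->second.count(key) > 0;
  }

  // Add every lock tracked by `other`, e.g. when a savepoint is popped and
  // its locks fold back into the enclosing scope.
  void Merge(const SketchPointLockTracker& other) {
    for (const auto& cf_keys : other.keys_) {
      keys_[cf_keys.first].insert(cf_keys.second.begin(),
                                  cf_keys.second.end());
    }
  }

  // Remove every lock tracked by `other`, e.g. the locks acquired since a
  // savepoint that is being rolled back.
  void Subtract(const SketchPointLockTracker& other) {
    for (const auto& cf_keys : other.keys_) {
      auto it = keys_.find(cf_keys.first);
      if (it == keys_.end()) continue;
      for (const auto& key : cf_keys.second) it->second.erase(key);
      if (it->second.empty()) keys_.erase(it);
    }
  }

  size_t Size() const override {
    size_t n = 0;
    for (const auto& cf_keys : keys_) n += cf_keys.second.size();
    return n;
  }

  void Clear() override { keys_.clear(); }

 private:
  std::unordered_map<ColumnFamilyId, std::unordered_set<std::string>> keys_;
};
```

In this sketch, a transaction would hold one tracker for all of its locks and one per savepoint. Rolling back to a savepoint then amounts to `Subtract`-ing the savepoint's tracker from the transaction's tracker, without the caller needing to know how the locks are stored.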

In the future, a `RangeLockTracker` can be implemented to track range locks without affecting other components.

After this PR, a clean interface for lock manager should be possible, and then ideally, we can have pluggable locking protocols.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7013

Test Plan: Run `transaction_test` and `optimistic_transaction_test`.

Reviewed By: ajkr

Differential Revision: D22163706

Pulled By: cheng-chang

fbshipit-source-id: f2860577b5334e31dd2994f5bc6d7c40d502b1b4
2020-08-06 12:38:00 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
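#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>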
#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
namespace ROCKSDB_NAMESPACE {
class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;
// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may be flushed to DB already, we cannot
// rely on the contents of WriteBatchWithIndex to determine whether a key should
// be visible or not, so we have to remember to check the DB for any uncommitted
// keys that should be visible to us. First, we will need to change the seek to
// snapshot logic, to seek to max_visible_seq = max(snap_seq, max_unprep_seq).
// Any key with a sequence number greater than max_visible_seq should not be
// visible because it cannot have been written as an unprepared key by the
// current transaction and it is not in the transaction's snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
//
// Other notes:
// Note that max_visible_seq is only calculated once at iterator construction
// time, meaning if the same transaction is adding more unprep seqs through
// writes during iteration, these newer writes may not be visible. This is not a
// problem for MySQL though because it avoids modifying the index as it is
// scanning through it to avoid the Halloween Problem. Instead, it scans the
// index once up front, and modifies based on a temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible key,
// and special precautions must be taken to avoid performing another reseek,
// leading to an infinite loop.
//
class WriteUnpreparedTxnReadCallback : public ReadCallback {
public:
WriteUnpreparedTxnReadCallback(
WritePreparedTxnDB* db, SequenceNumber snapshot,
SequenceNumber min_uncommitted,
const std::map<SequenceNumber, size_t>& unprep_seqs,
SnapshotBackup backed_by_snapshot)
// Pass our last uncommitted seq as the snapshot to the parent class to
// ensure that the parent will not prematurely filter out own writes. We
// will do the exact comparison against snapshots in IsVisibleFullCheck
// override.
: ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
db_(db),
unprep_seqs_(unprep_seqs),
wup_snapshot_(snapshot),
backed_by_snapshot_(backed_by_snapshot) {
(void)backed_by_snapshot_; // to silence unused private field warning
}
virtual ~WriteUnpreparedTxnReadCallback() {
// If it is not backed by snapshot, the caller must check validity
assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
}
virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
inline bool valid() {
valid_checked_ = true;
return snap_released_ == false;
}
void Refresh(SequenceNumber seq) override {
max_visible_seq_ = std::max(max_visible_seq_, seq);
wup_snapshot_ = seq;
}
static SequenceNumber CalcMaxVisibleSeq(
const std::map<SequenceNumber, size_t>& unprep_seqs,
SequenceNumber snapshot_seq) {
SequenceNumber max_unprepared = 0;
if (unprep_seqs.size()) {
max_unprepared =
unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
}
return std::max(max_unprepared, snapshot_seq);
}
private:
WritePreparedTxnDB* db_;
const std::map<SequenceNumber, size_t>& unprep_seqs_;
SequenceNumber wup_snapshot_;
// Whether max_visible_seq_ is backed by a snapshot
const SnapshotBackup backed_by_snapshot_;
bool snap_released_ = false;
// Safety check to ensure that the caller has checked invalid statuses
bool valid_checked_ = false;
};
class WriteUnpreparedTxn : public WritePreparedTxn {
public:
WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
const WriteOptions& write_options,
const TransactionOptions& txn_options);
virtual ~WriteUnpreparedTxn();
using TransactionBaseImpl::Put;
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value,
const bool assume_tracked = false) override;
virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value,
const bool assume_tracked = false) override;
using TransactionBaseImpl::Merge;
virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value,
const bool assume_tracked = false) override;
using TransactionBaseImpl::Delete;
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) override;
virtual Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key,
const bool assume_tracked = false) override;
using TransactionBaseImpl::SingleDelete;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key,
const bool assume_tracked = false) override;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key,
const bool assume_tracked = false) override;
// In WriteUnprepared, untracked writes will break snapshot validation logic.
// Snapshot validation will only check the largest sequence number of a key to
// see if it was committed or not. However, an untracked unprepared write will
// hide smaller committed sequence numbers.
//
// TODO(lth): Investigate whether it is worth having snapshot validation
// validate all values larger than snap_seq. Otherwise, we should return
// Status::NotSupported for untracked writes.
virtual Status RebuildFromWriteBatch(WriteBatch*) override;
virtual uint64_t GetLastLogNumber() const override {
return last_log_number_;
}
void RemoveActiveIterator(Iterator* iter) {
active_iterators_.erase(
std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
active_iterators_.end());
}
protected:
void Initialize(const TransactionOptions& txn_options) override;
Status PrepareInternal() override;
Status CommitWithoutPrepareInternal() override;
Status CommitInternal() override;
Status RollbackInternal() override;
void Clear() override;
void SetSavePoint() override;
Status RollbackToSavePoint() override;
Status PopSavePoint() override;
// Get and GetIterator need to be overridden so that a ReadCallback that
// handles read-your-own-write is used.
using Transaction::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using Transaction::MultiGet;
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
using Transaction::GetIterator;
virtual Iterator* GetIterator(const ReadOptions& options) override;
virtual Iterator* GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key,
SequenceNumber* tracked_at_seq) override;
private:
friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
friend class WriteUnpreparedTxnDB;
const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
Status WriteRollbackKeys(const LockTracker& tracked_keys,
WriteBatchWithIndex* rollback_batch,
ReadCallback* callback, const ReadOptions& roptions);
Status MaybeFlushWriteBatchToDB();
Status FlushWriteBatchToDB(bool prepared);
Status FlushWriteBatchToDBInternal(bool prepared);
Status FlushWriteBatchWithSavePointToDB();
Status RollbackToSavePointInternal();
Status HandleWrite(std::function<Status()> do_write);
// For write unprepared, we check on every writebatch append to see if
// write_batch_flush_threshold_ has been exceeded, and then call
// FlushWriteBatchToDB if so. This logic is encapsulated in
// MaybeFlushWriteBatchToDB.
int64_t write_batch_flush_threshold_;
WriteUnpreparedTxnDB* wupt_db_;
// Ordered list of unprep_seq sequence numbers that we have already written
// to DB.
//
// This maps unprep_seq => prepare_batch_cnt for each unprepared batch
// written by this transaction.
//
// Note that this contains both prepared and unprepared batches, since they
// are treated similarly in the prepare heap/commit map, so it simplifies the
// commit callbacks.
std::map<SequenceNumber, size_t> unprep_seqs_;
uint64_t last_log_number_;
// Recovered transactions have tracked_keys_ populated, but are not actually
// locked for efficiency reasons. For recovered transactions, skip unlocking
// keys when transaction ends.
bool recovered_txn_;
// Track the largest sequence number at which we performed snapshot
// validation. If snapshot validation was skipped because no snapshot was set,
// then this is set to GetLastPublishedSequence. This value is useful because
// it means that for keys that have unprepared seqnos, we can guarantee that
// no committed keys by other transactions can exist between
// largest_validated_seq_ and max_unprep_seq. See
// WriteUnpreparedTxnDB::NewIterator for an explanation for why this is
// necessary for iterator Prev().
//
// Currently this value only increases during the lifetime of a transaction,
// but in some cases, we should be able to restore the previously largest
// value when calling RollbackToSavePoint.
SequenceNumber largest_validated_seq_;
struct SavePoint {
// Record of unprep_seqs_ at this savepoint. The set of unprep_seqs is
// used during RollbackToSavePoint to determine visibility when restoring
// old values.
//
// TODO(lth): Since all unprep_seqs_ sets further down the stack must be
// subsets, this can potentially be deduplicated by just storing set
// difference. Investigate if this is worth it.
std::map<SequenceNumber, size_t> unprep_seqs_;
// This snapshot will be used to read keys at this savepoint if we call
// RollbackToSavePoint.
std::unique_ptr<ManagedSnapshot> snapshot_;
SavePoint(const std::map<SequenceNumber, size_t>& seqs,
ManagedSnapshot* snapshot)
: unprep_seqs_(seqs), snapshot_(snapshot) {}
};
// We have 3 data structures holding savepoint information:
// 1. TransactionBaseImpl::save_points_
// 2. WriteUnpreparedTxn::flushed_save_points_
// 3. WriteUnpreparedTxn::unflushed_save_points_
//
// TransactionBaseImpl::save_points_ holds information about all write
// batches, including the current in-memory write_batch_, or unprepared
// batches that have been written out. Its responsibility is just to track
// which keys have been modified in every savepoint.
//
// WriteUnpreparedTxn::flushed_save_points_ holds information about savepoints
// set on unprepared batches that have already flushed. It holds the snapshot
// and unprep_seqs at that savepoint, so that the rollback process can
// determine which keys were visible at that point in time.
//
// WriteUnpreparedTxn::unflushed_save_points_ holds information about
// savepoints on the current in-memory write_batch_. It simply records the
// size of the write batch at every savepoint.
//
// TODO(lth): Remove the redundancy between unflushed_save_points_ and
// write_batch_.save_points_.
//
// Based on this information, here are some invariants:
// size(unflushed_save_points_) = size(write_batch_.save_points_)
// size(flushed_save_points_) + size(unflushed_save_points_)
// = size(save_points_)
//
std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
flushed_save_points_;
std::unique_ptr<autovector<size_t>> unflushed_save_points_;
// It is currently unsafe to flush a write batch if there are active iterators
// created from this transaction. This is because we use WriteBatchWithIndex
// to do merging reads from the DB and the write batch. If we flush the write
// batch, it is possible that the delta iterator on the iterator will point to
// invalid memory.
std::vector<Iterator*> active_iterators_;
// Untracked keys that we have to rollback.
//
// TODO(lth): Currently we do not record untracked keys per-savepoint.
// This means that when rolling back to savepoints, we have to check all
// keys in the current transaction for rollback. Note that this is only
// inefficient, but still correct because we take a snapshot at every
// savepoint, and we will use that snapshot to construct the rollback batch.
// The rollback batch will then contain a reissue of the same marker.
//
// A more optimal solution would be to only check keys changed since the
// last savepoint. Also, it may make sense to merge this into tracked_keys_
// and differentiate between tracked but not locked keys to avoid having two
// very similar data structures.
using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
KeySet untracked_keys_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
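
As a worked illustration of the `max_visible_seq = max(snap_seq, max_unprep_seq)` rule described in the header comment, the following standalone sketch reproduces the arithmetic of `CalcMaxVisibleSeq` outside of RocksDB. It assumes `SequenceNumber` is a 64-bit unsigned integer. The helper `IsOwnUnpreparedSeq` is hypothetical: it only shows how a sequence number could be matched against the `unprep_seq => prepare_batch_cnt` map (case 1 in the list above), and it is not the actual body of `IsVisibleFullCheck`, which must also consult the snapshot for the remaining cases.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Assumption for this sketch: sequence numbers are 64-bit unsigned integers.
using SequenceNumber = uint64_t;
// Maps the first sequence number of each unprepared batch to the number of
// sequence numbers that batch consumed.
using UnprepSeqs = std::map<SequenceNumber, size_t>;

// Same arithmetic as CalcMaxVisibleSeq in the header: the largest sequence
// number this transaction may need to see is the greater of its snapshot and
// the last sequence number of its most recent unprepared batch.
SequenceNumber CalcMaxVisibleSeq(const UnprepSeqs& unprep_seqs,
                                 SequenceNumber snapshot_seq) {
  SequenceNumber max_unprepared = 0;
  if (!unprep_seqs.empty()) {
    const auto& last = *unprep_seqs.rbegin();
    max_unprepared = last.first + last.second - 1;
  }
  return std::max(max_unprepared, snapshot_seq);
}

// Hypothetical helper: returns true if seq falls inside one of the
// [start, start + count) ranges written by this transaction.
bool IsOwnUnpreparedSeq(const UnprepSeqs& unprep_seqs, SequenceNumber seq) {
  for (const auto& batch : unprep_seqs) {
    if (batch.first <= seq && seq < batch.first + batch.second) {
      return true;
    }
  }
  return false;
}
```

For example, with batches `{10 => 3, 20 => 2}` the transaction owns sequence numbers 10 through 12 and 20 through 21. With a snapshot at 15, the iterator would seek to 21, so a committed key at sequence 17 falls under case 3 and still needs the full visibility check to be filtered out.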