// Source: rocksdb/utilities/transactions/write_unprepared_txn_db.cc
// (web viewer metadata: 468 lines, 17 KiB, C++)

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
namespace ROCKSDB_NAMESPACE {
// Instead of reconstructing a Transaction object, and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (eg. updating CommitMap, reconstructing keyset)
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
const DBImpl::RecoveredTransaction* rtxn) {
// TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
assert(rtxn->unprepared_);
auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
// In theory we could write with disableWAL = true during recovery, and
// assume that if we crash again during recovery, we can just replay from
// the very beginning. Unfortunately, the XIDs from the application may not
// necessarily be unique across restarts, potentially leading to situations
// like this:
//
// BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
// -- crash and recover with Put(a) rolled back as it was not prepared
// BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
// COMMIT(xid = 1)
// -- crash and recover with both a, b
//
// We could just write the rollback marker, but then we would have to extend
// MemTableInserter during recovery to actually do writes into the DB
// instead of just dropping the in-memory write batch.
//
WriteOptions w_options;
WritePrepared: Report released snapshots in IsInSnapshot (#4856) Summary: Previously IsInSnapshot assumed that the snapshot is valid at the time that the function is called. However there are cases where that might not be valid. Example is background compactions where the compaction algorithm operates with a list of snapshots some of which might be released by the time they are being passed to IsInSnapshot. The patch make two changes to enable the caller to tell difference: i) any live snapshot below max is added to max_committed_seq_, which allows IsInSnapshot to confidently tell whether the passed snapshot is invalid if it below max, ii) extends IsInSnapshot API with a "released" variable that is set true when IsInSnapshot find no such snapshot below max and also find no other way to give a certain return value. In such cases the return value is true but the caller should also check the "released" boolean after the call. In short here is the changes in the API: i) If the snapshot is valid, no change is required. ii) If the snapshot might be invalid, a reference to "released" boolean must be passed to IsInSnapshot. ii-a) If snapshot is above max, IsInSnapshot can figure the return valid using the commit cache. ii-b) otherwise if snapshot is in old_commit_map_, IsInSnapshot can use that to tell if value was visible to the snapshot. ii-c) otherwise it sets "released" to true and returns true as well. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4856 Differential Revision: D13599847 Pulled By: maysamyabandeh fbshipit-source-id: 1752be28667f886a1efec8cae5714b9b7a8f1e0f
2019-01-08 14:43:24 -08:00
class InvalidSnapshotReadCallback : public ReadCallback {
public:
InvalidSnapshotReadCallback(SequenceNumber snapshot)
: ReadCallback(snapshot) {}
inline bool IsVisibleFullCheck(SequenceNumber) override {
// The seq provided as snapshot is the seq right before we have locked and
// wrote to it, so whatever is there, it is committed.
return true;
WritePrepared: Report released snapshots in IsInSnapshot (#4856) Summary: Previously IsInSnapshot assumed that the snapshot is valid at the time that the function is called. However there are cases where that might not be valid. Example is background compactions where the compaction algorithm operates with a list of snapshots some of which might be released by the time they are being passed to IsInSnapshot. The patch make two changes to enable the caller to tell difference: i) any live snapshot below max is added to max_committed_seq_, which allows IsInSnapshot to confidently tell whether the passed snapshot is invalid if it below max, ii) extends IsInSnapshot API with a "released" variable that is set true when IsInSnapshot find no such snapshot below max and also find no other way to give a certain return value. In such cases the return value is true but the caller should also check the "released" boolean after the call. In short here is the changes in the API: i) If the snapshot is valid, no change is required. ii) If the snapshot might be invalid, a reference to "released" boolean must be passed to IsInSnapshot. ii-a) If snapshot is above max, IsInSnapshot can figure the return valid using the commit cache. ii-b) otherwise if snapshot is in old_commit_map_, IsInSnapshot can use that to tell if value was visible to the snapshot. ii-c) otherwise it sets "released" to true and returns true as well. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4856 Differential Revision: D13599847 Pulled By: maysamyabandeh fbshipit-source-id: 1752be28667f886a1efec8cae5714b9b7a8f1e0f
2019-01-08 14:43:24 -08:00
}
// Ignore the refresh request since we are confident that our snapshot seq
// is not going to be affected by concurrent compactions (not enabled yet.)
void Refresh(SequenceNumber) override {}
WritePrepared: Report released snapshots in IsInSnapshot (#4856) Summary: Previously IsInSnapshot assumed that the snapshot is valid at the time that the function is called. However there are cases where that might not be valid. Example is background compactions where the compaction algorithm operates with a list of snapshots some of which might be released by the time they are being passed to IsInSnapshot. The patch make two changes to enable the caller to tell difference: i) any live snapshot below max is added to max_committed_seq_, which allows IsInSnapshot to confidently tell whether the passed snapshot is invalid if it below max, ii) extends IsInSnapshot API with a "released" variable that is set true when IsInSnapshot find no such snapshot below max and also find no other way to give a certain return value. In such cases the return value is true but the caller should also check the "released" boolean after the call. In short here is the changes in the API: i) If the snapshot is valid, no change is required. ii) If the snapshot might be invalid, a reference to "released" boolean must be passed to IsInSnapshot. ii-a) If snapshot is above max, IsInSnapshot can figure the return valid using the commit cache. ii-b) otherwise if snapshot is in old_commit_map_, IsInSnapshot can use that to tell if value was visible to the snapshot. ii-c) otherwise it sets "released" to true and returns true as well. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4856 Differential Revision: D13599847 Pulled By: maysamyabandeh fbshipit-source-id: 1752be28667f886a1efec8cae5714b9b7a8f1e0f
2019-01-08 14:43:24 -08:00
};
// Iterate starting with largest sequence number.
for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
auto last_visible_txn = it->first - 1;
const auto& batch = it->second.batch_;
WriteBatch rollback_batch;
struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
DBImpl* db_;
ReadOptions roptions;
WritePrepared: Report released snapshots in IsInSnapshot (#4856) Summary: Previously IsInSnapshot assumed that the snapshot is valid at the time that the function is called. However there are cases where that might not be valid. Example is background compactions where the compaction algorithm operates with a list of snapshots some of which might be released by the time they are being passed to IsInSnapshot. The patch make two changes to enable the caller to tell difference: i) any live snapshot below max is added to max_committed_seq_, which allows IsInSnapshot to confidently tell whether the passed snapshot is invalid if it below max, ii) extends IsInSnapshot API with a "released" variable that is set true when IsInSnapshot find no such snapshot below max and also find no other way to give a certain return value. In such cases the return value is true but the caller should also check the "released" boolean after the call. In short here is the changes in the API: i) If the snapshot is valid, no change is required. ii) If the snapshot might be invalid, a reference to "released" boolean must be passed to IsInSnapshot. ii-a) If snapshot is above max, IsInSnapshot can figure the return valid using the commit cache. ii-b) otherwise if snapshot is in old_commit_map_, IsInSnapshot can use that to tell if value was visible to the snapshot. ii-c) otherwise it sets "released" to true and returns true as well. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4856 Differential Revision: D13599847 Pulled By: maysamyabandeh fbshipit-source-id: 1752be28667f886a1efec8cae5714b9b7a8f1e0f
2019-01-08 14:43:24 -08:00
InvalidSnapshotReadCallback callback;
WriteBatch* rollback_batch_;
std::map<uint32_t, const Comparator*>& comparators_;
std::map<uint32_t, ColumnFamilyHandle*>& handles_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_;
RollbackWriteBatchBuilder(
DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators,
std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands)
: db_(db),
callback(snap_seq),
WriteUnPrepared: less virtual in iterator callback (#5049) Summary: WriteUnPrepared adds a virtual function, MaxUnpreparedSequenceNumber, to ReadCallback, which returns 0 unless WriteUnPrepared is enabled and the transaction has uncommitted data written to the DB. Together with snapshot sequence number, this determines the last sequence that is visible to reads. The patch clarifies the guarantees of the GetIterator API in WriteUnPrepared transactions and make use of that to statically initialize the read callback and thus avoid the virtual call. Furthermore it increases the minimum value for min_uncommitted from 0 to 1 as seq 0 is used only for last level keys that are committed in all snapshots. The following benchmark shows +0.26% higher throughput in seekrandom benchmark. Benchmark: ./db_bench --benchmarks=fillrandom --use_existing_db=0 --num=1000000 --db=/dev/shm/dbbench ./db_bench --benchmarks=seekrandom[X10] --use_existing_db=1 --db=/dev/shm/dbbench --num=1000000 --duration=60 --seek_nexts=100 seekrandom [AVG 10 runs] : 20355 ops/sec; 225.2 MB/sec seekrandom [MEDIAN 10 runs] : 20425 ops/sec; 225.9 MB/sec ./db_bench_lessvirtual3 --benchmarks=seekrandom[X10] --use_existing_db=1 --db=/dev/shm/dbbench --num=1000000 --duration=60 --seek_nexts=100 seekrandom [AVG 10 runs] : 20409 ops/sec; 225.8 MB/sec seekrandom [MEDIAN 10 runs] : 20487 ops/sec; 226.6 MB/sec Pull Request resolved: https://github.com/facebook/rocksdb/pull/5049 Differential Revision: D14366459 Pulled By: maysamyabandeh fbshipit-source-id: ebaff8908332a5ae9af7defeadabcb624be660ef
2019-04-02 14:43:03 -07:00
// disable min_uncommitted optimization
rollback_batch_(dst_batch),
comparators_(comparators),
handles_(handles),
rollback_merge_operands_(rollback_merge_operands) {}
Status Rollback(uint32_t cf, const Slice& key) {
Status s;
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
auto cmp = comparators_[cf];
keys_[cf] = CFKeys(SetComparator(cmp));
}
auto res = cf_keys.insert(key);
if (res.second ==
false) { // second is false if a element already existed.
return s;
}
PinnableSlice pinnable_val;
bool not_used;
auto cf_handle = handles_[cf];
New API to get all merge operands for a Key (#5604) Summary: This is a new API added to db.h to allow for fetching all merge operands associated with a Key. The main motivation for this API is to support use cases where doing a full online merge is not necessary as it is performance sensitive. Example use-cases: 1. Update subset of columns and read subset of columns - Imagine a SQL Table, a row is encoded as a K/V pair (as it is done in MyRocks). If there are many columns and users only updated one of them, we can use merge operator to reduce write amplification. While users only read one or two columns in the read query, this feature can avoid a full merging of the whole row, and save some CPU. 2. Updating very few attributes in a value which is a JSON-like document - Updating one attribute can be done efficiently using merge operator, while reading back one attribute can be done more efficiently if we don't need to do a full merge. ---------------------------------------------------------------------------------------------------- API : Status GetMergeOperands( const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* merge_operands, GetMergeOperandsOptions* get_merge_operands_options, int* number_of_operands) Example usage : int size = 100; int number_of_operands = 0; std::vector<PinnableSlice> values(size); GetMergeOperandsOptions merge_operands_info; db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), merge_operands_info, &number_of_operands); Description : Returns all the merge operands corresponding to the key. If the number of merge operands in DB is greater than merge_operands_options.expected_max_number_of_operands no merge operands are returned and status is Incomplete. Merge operands returned are in the order of insertion. merge_operands-> Points to an array of at-least merge_operands_options.expected_max_number_of_operands and the caller is responsible for allocating it. 
If the status returned is Incomplete then number_of_operands will contain the total number of merge operands found in DB for key. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5604 Test Plan: Added unit test and perf test in db_bench that can be run using the command: ./db_bench -benchmarks=getmergeoperands --merge_operator=sortlist Differential Revision: D16657366 Pulled By: vjnadimpalli fbshipit-source-id: 0faadd752351745224ee12d4ae9ef3cb529951bf
2019-08-06 14:22:34 -07:00
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cf_handle;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &not_used;
get_impl_options.callback = &callback;
s = db_->GetImpl(roptions, key, get_impl_options);
assert(s.ok() || s.IsNotFound());
if (s.ok()) {
s = rollback_batch_->Put(cf_handle, key, pinnable_val);
assert(s.ok());
} else if (s.IsNotFound()) {
// There has been no readable value before txn. By adding a delete we
// make sure that there will be none afterwards either.
s = rollback_batch_->Delete(cf_handle, key);
assert(s.ok());
} else {
// Unexpected status. Return it to the user.
}
return s;
}
Status PutCF(uint32_t cf, const Slice& key,
const Slice& /*val*/) override {
return Rollback(cf, key);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return Rollback(cf, key);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return Rollback(cf, key);
}
Status MergeCF(uint32_t cf, const Slice& key,
const Slice& /*val*/) override {
if (rollback_merge_operands_) {
return Rollback(cf, key);
} else {
return Status::OK();
}
}
// Recovered batches do not contain 2PC markers.
Status MarkNoop(bool) override { return Status::InvalidArgument(); }
Status MarkBeginPrepare(bool) override {
return Status::InvalidArgument();
}
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkCommit(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
} rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
txn_db_options_.rollback_merge_operands);
auto s = batch->Iterate(&rollback_handler);
if (!s.ok()) {
return s;
}
// The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
const uint64_t kNoLogRef = 0;
const bool kDisableMemtable = true;
const size_t kOneBatch = 1;
uint64_t seq_used = kMaxSequenceNumber;
s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
if (!s.ok()) {
return s;
}
// If two_write_queues, we must manually release the sequence number to
// readers.
if (db_impl_->immutable_db_options().two_write_queues) {
db_impl_->SetLastPublishedSequence(seq_used);
}
}
return Status::OK();
}
// Finishes opening the DB: installs the snapshot checker and the
// recoverable-state pre-release callback, registers and verifies the column
// families, turns recovered prepared transactions back into Transaction
// objects, rolls back recovered unprepared transactions, and finally
// re-enables auto compaction.
//
// compaction_enabled_cf_indices: indices into `handles` of the column
//   families whose compaction is re-enabled at the end.
// handles: handles of the opened column families.
//
// Returns the first non-OK status encountered, or the result of
// EnableAutoCompaction when everything before it succeeded.
Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));

  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      // The sub-batch is recorded as committed at its own sequence number.
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }

  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // create 'real' transactions from recovered shell transactions
  auto rtxns = dbimpl->recovered_transactions();
  // Sequence number -> number of sequence numbers to register with
  // AddPrepared, collected across all recovered prepared transactions.
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
    // two iterations is required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    // Log number, id and prepare batch count are taken from the first
    // (lowest sequence number) batch.
    const auto& first_batch = *recovered_trx->batches_.begin();
    auto first_log_number = first_batch.second.log_number_;
    auto first_seq = first_batch.first;
    auto last_prepare_batch_cnt = first_batch.second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt = static_cast_with_check<WriteUnpreparedTxn>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      return s;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    for (const auto& batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      // A batch count of zero is treated as a single sequence number.
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      assert(s.ok());
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
  }

  // AddPrepared must be called in order
  for (const auto& seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  // Rollback unprepared transactions.
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (!recovered_trx->unprepared_) {
      continue;
    }
    Status s = RollbackRecoveredTransaction(recovered_trx);
    if (!s.ok()) {
      return s;
    }
  }

  dbimpl->DeleteAllRecoveredTransactions();

  // Compaction should start only after max_evicted_seq_ is set AND recovered
  // transactions are either added to PrepareHeap or rolled back.
  return EnableAutoCompaction(compaction_enabled_cf_handles);
}
// Starts a transaction. When `old_txn` is null a new WriteUnpreparedTxn is
// allocated and returned; otherwise `old_txn` is reinitialized with the given
// options and returned.
Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn == nullptr) {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }

  // Reuse the caller-provided transaction object.
  ReinitializeTransaction(old_txn, write_options, txn_options);
  return old_txn;
}
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  // txn_db, sequence, min_uncommitted and txn->unprep_seqs_ are forwarded to
  // the read callback; `s` is the snapshot kept alive alongside it (it is
  // null when NewIterator was handed a snapshot through ReadOptions).
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(s) {}

  // Largest sequence number the read callback reports as visible.
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};
namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
} // anonymous namespace
// Creates an iterator over `column_family` on behalf of `txn`.
//
// The iterator reads at options.snapshot when one is supplied; otherwise a
// fresh snapshot is taken and owned by the iterator's cleanup state. Returns
// nullptr (after logging an error) when the transaction has unprepared writes
// and its largest validated sequence number is above the snapshot.
Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. Prev() walks the versions of a key in ascending seqno order,
  // stops at the first non-visible one, and returns the last visible one.
  //
  // Example with snapshot sequence 3 and these versions:
  //   foo: v1 1
  //   foo: v2 2
  //   foo: v3 3
  //   foo: v4 4
  //   foo: v5 5
  // 1, 2 and 3 are visible and 4 is not, so v3 is returned.
  //
  // With an unprepared transaction holding snap_seq = 3 and unprep_seq 5,
  // stopping at the first non-visible value is wrong: v5 should be returned,
  // not v3. The trouble comes from committed values sitting at
  // snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation rules that situation out: it ensures no committed
  // value exists at snapshot_seq < commit_seq, so anything above snapshot_seq
  // must be an unprepared value. In the example, a transaction with a
  // snapshot at 3 would have validated during Put(v5), found v4, and failed
  // the Put with a snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  const Snapshot* snapshot = options.snapshot;
  if (snapshot == nullptr) {
    // No snapshot supplied: take one and tie its release to the iterator.
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  const SequenceNumber snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);

  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. For
  // keys modified by this transaction (which may have unprepared values), no
  // committed value exists at largest_validated_seq < commit_seq; put the
  // other way round, every committed value has
  // commit_seq <= largest_validated_seq. Combined with the condition above
  // this gives commit_seq <= snapshot_seq, whereas the Prev() problem only
  // shows up when snapshot_seq < commit_seq.
  //
  // For keys this transaction did not modify, largest_validated_seq_ carries
  // no meaning and Prev() works with the regular visibility logic.
  const bool validated_past_snapshot =
      txn->largest_validated_seq_ > snapshot_seq;
  if (validated_past_snapshot && !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }

  const SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();

  // The state object owns the read callback (and the snapshot, if we took
  // one); it is deleted by the cleanup function registered on the iterator.
  auto* iter_state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  auto* db_iter = db_impl_->NewIteratorImpl(
      options, cfd, iter_state->MaxVisibleSeq(), &iter_state->callback,
      expose_blob_index, allow_refresh);
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, iter_state,
                           nullptr);
  return db_iter;
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE