Add SnapshotCreationCallback

This commit is contained in:
Yanqin Jin 2022-04-15 21:06:01 -07:00
parent 41b0c6cb89
commit 6712cd8270
4 changed files with 76 additions and 5 deletions

View File

@ -689,10 +689,17 @@ Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
}
uint64_t seq_used = kMaxSequenceNumber;
auto s =
db_impl_->WriteImpl(write_options_, wb,
/*callback*/ nullptr, /*log_used*/ nullptr,
/*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
snapshot_notifier_, snapshot_);
PostMemTableCallback* post_mem_cb = nullptr;
if (snapshot_needed_ && commit_timestamp_ != kMaxTxnTimestamp) {
post_mem_cb = &snapshot_creation_cb;
}
auto s = db_impl_->WriteImpl(write_options_, wb,
/*callback*/ nullptr, /*log_used*/ nullptr,
/*log_ref*/ 0, /*disable_memtable*/ false,
&seq_used, /*batch_cnt=*/0,
/*pre_release_callback=*/nullptr, post_mem_cb);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);
@ -764,9 +771,17 @@ Status WriteCommittedTxn::CommitInternal() {
assert(s.ok());
uint64_t seq_used = kMaxSequenceNumber;
SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
snapshot_notifier_, snapshot_);
PostMemTableCallback* post_mem_cb = nullptr;
if (snapshot_needed_ && commit_timestamp_ != kMaxTxnTimestamp) {
post_mem_cb = &snapshot_creation_cb;
}
s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
/*log_used*/ nullptr, /*log_ref*/ log_number_,
/*disable_memtable*/ false, &seq_used);
/*disable_memtable*/ false, &seq_used,
/*batch_cnt=*/0, /*pre_release_callback=*/nullptr,
post_mem_cb);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);

View File

@ -680,5 +680,35 @@ Status PessimisticTransactionDB::GetSharedSnapshots(
assert(db_impl_);
return db_impl_->GetSharedSnapshots(ts_lb, ts_ub, shared_snapshots);
}
Status SnapshotCreationCallback::operator()(SequenceNumber seq,
bool disable_memtable) {
assert(db_impl_);
assert(commit_ts_ != kMaxTxnTimestamp);
const bool two_write_queues =
db_impl_->immutable_db_options().two_write_queues;
assert(!two_write_queues || !disable_memtable);
#ifdef NDEBUG
(void)two_write_queues;
(void)disable_memtable;
#endif
const bool seq_per_batch = db_impl_->seq_per_batch();
if (!seq_per_batch) {
assert(db_impl_->GetLastPublishedSequence() <= seq);
} else {
assert(db_impl_->GetLastPublishedSequence() < seq);
}
// Create a snapshot which can also be used for write conflict checking.
snapshot_ = db_impl_->CreateSharedSnapshot(seq, commit_ts_);
assert(snapshot_);
if (snapshot_notifier_) {
snapshot_notifier_->SnapshotCreated(snapshot_.get());
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -267,5 +267,29 @@ inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
return Status::OK();
}
class SnapshotCreationCallback : public PostMemTableCallback {
public:
explicit SnapshotCreationCallback(
DBImpl* dbi, TxnTimestamp commit_ts,
const std::shared_ptr<TransactionNotifier>& notifier,
std::shared_ptr<const Snapshot>& snapshot)
: db_impl_(dbi),
commit_ts_(commit_ts),
snapshot_notifier_(notifier),
snapshot_(snapshot) {
assert(db_impl_);
}
~SnapshotCreationCallback() override {}
Status operator()(SequenceNumber seq, bool disable_memtable) override;
private:
DBImpl* const db_impl_;
const TxnTimestamp commit_ts_;
std::shared_ptr<TransactionNotifier> snapshot_notifier_;
std::shared_ptr<const Snapshot>& snapshot_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -350,7 +350,9 @@ class TransactionBaseImpl : public Transaction {
save_points_;
private:
friend class WriteCommittedTxn;
friend class WritePreparedTxn;
// Extra data to be persisted with the commit. Note this is only used when
// prepare phase is not skipped.
WriteBatch commit_time_batch_;