WritePrepared Txn: CommitBatch
Summary: Implements CommitBatch and CommitWithoutPrepare for WritePreparedTxn Closes https://github.com/facebook/rocksdb/pull/2854 Differential Revision: D5793999 Pulled By: maysamyabandeh fbshipit-source-id: d8b9858221162c6ac7a1f6912cbd3481d0d8a503
This commit is contained in:
parent
fce6c892ab
commit
9a4df72994
@ -148,7 +148,7 @@ Status WriteCommittedTxn::CommitBatch(WriteBatch* batch) {
|
|||||||
|
|
||||||
if (can_commit) {
|
if (can_commit) {
|
||||||
txn_state_.store(AWAITING_COMMIT);
|
txn_state_.store(AWAITING_COMMIT);
|
||||||
s = db_->Write(write_options_, batch);
|
s = CommitBatchInternal(batch);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
txn_state_.store(COMMITED);
|
txn_state_.store(COMMITED);
|
||||||
}
|
}
|
||||||
@ -305,6 +305,11 @@ Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch) {
|
||||||
|
Status s = db_->Write(write_options_, batch);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
Status WriteCommittedTxn::CommitInternal() {
|
Status WriteCommittedTxn::CommitInternal() {
|
||||||
// We take the commit-time batch and append the Commit marker.
|
// We take the commit-time batch and append the Commit marker.
|
||||||
// The Memtable will ignore the Commit marker in non-recovery mode
|
// The Memtable will ignore the Commit marker in non-recovery mode
|
||||||
|
@ -49,6 +49,9 @@ class PessimisticTransaction : public TransactionBaseImpl {
|
|||||||
|
|
||||||
Status Commit() override;
|
Status Commit() override;
|
||||||
|
|
||||||
|
// It is basically Commit without going through Prepare phase. The write batch
|
||||||
|
// is also directly provided instead of expecting txn to gradually batch the
|
||||||
|
// transactions writes to an internal write batch.
|
||||||
virtual Status CommitBatch(WriteBatch* batch) = 0;
|
virtual Status CommitBatch(WriteBatch* batch) = 0;
|
||||||
|
|
||||||
Status Rollback() override = 0;
|
Status Rollback() override = 0;
|
||||||
@ -114,6 +117,8 @@ class PessimisticTransaction : public TransactionBaseImpl {
|
|||||||
|
|
||||||
virtual Status CommitWithoutPrepareInternal() = 0;
|
virtual Status CommitWithoutPrepareInternal() = 0;
|
||||||
|
|
||||||
|
virtual Status CommitBatchInternal(WriteBatch* batch) = 0;
|
||||||
|
|
||||||
virtual Status CommitInternal() = 0;
|
virtual Status CommitInternal() = 0;
|
||||||
|
|
||||||
void Initialize(const TransactionOptions& txn_options);
|
void Initialize(const TransactionOptions& txn_options);
|
||||||
@ -195,6 +200,8 @@ class WriteCommittedTxn : public PessimisticTransaction {
|
|||||||
|
|
||||||
Status CommitWithoutPrepareInternal() override;
|
Status CommitWithoutPrepareInternal() override;
|
||||||
|
|
||||||
|
Status CommitBatchInternal(WriteBatch* batch) override;
|
||||||
|
|
||||||
Status CommitInternal() override;
|
Status CommitInternal() override;
|
||||||
|
|
||||||
Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
|
Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
|
@ -51,9 +51,21 @@ Status WritePreparedTxn::PrepareInternal() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
|
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
|
||||||
// TODO(myabandeh) Implement this
|
return CommitBatchInternal(GetWriteBatch()->GetWriteBatch());
|
||||||
throw std::runtime_error("Commit not Implemented");
|
}
|
||||||
return Status::OK();
|
|
||||||
|
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
|
||||||
|
const bool disable_memtable = true;
|
||||||
|
const uint64_t no_log_ref = 0;
|
||||||
|
uint64_t seq_used;
|
||||||
|
auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr,
|
||||||
|
no_log_ref, !disable_memtable, &seq_used);
|
||||||
|
uint64_t& prepare_seq = seq_used;
|
||||||
|
uint64_t& commit_seq = seq_used;
|
||||||
|
// TODO(myabandeh): skip AddPrepared
|
||||||
|
wpt_db_->AddPrepared(prepare_seq);
|
||||||
|
wpt_db_->AddCommitted(prepare_seq, commit_seq);
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status WritePreparedTxn::CommitInternal() {
|
Status WritePreparedTxn::CommitInternal() {
|
||||||
|
@ -54,6 +54,8 @@ class WritePreparedTxn : public PessimisticTransaction {
|
|||||||
|
|
||||||
Status CommitWithoutPrepareInternal() override;
|
Status CommitWithoutPrepareInternal() override;
|
||||||
|
|
||||||
|
Status CommitBatchInternal(WriteBatch* batch) override;
|
||||||
|
|
||||||
Status CommitInternal() override;
|
Status CommitInternal() override;
|
||||||
|
|
||||||
// TODO(myabandeh): verify that the current impl work with values being
|
// TODO(myabandeh): verify that the current impl work with values being
|
||||||
|
Loading…
x
Reference in New Issue
Block a user