From 6e56a114beec8436cad125a5ac91027a6d0a3784 Mon Sep 17 00:00:00 2001 From: Reid Horuff Date: Thu, 7 Apr 2016 23:35:51 -0700 Subject: [PATCH] Modification of WriteBatch to support two phase commit Summary: Adds three new WriteBatch data types: Prepare(xid), Commit(xid), Rollback(xid). Prepare(xid) should precede the (single) operation to which is applies. There can obviously be multiple Prepare(xid) markers. There should only be one Rollback(xid) or Commit(xid) marker yet not both. None of this logic is currently enforced and will most likely be implemented further up such as in the memtableinserter. All three markers are similar to PutLogData in that they are writebatch meta-data, ie stored but not counted. All three markers differ from PutLogData in that they will actually be written to disk. As for WriteBatchWithIndex, Prepare, Commit, Rollback are all implemented just as PutLogData and none are tested just as PutLogData. Test Plan: single unit test in write_batch_test. Reviewers: hermanlee4, sdong, anthony Subscribers: andrewkr, vasilep, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D54093 --- db/dbformat.h | 7 +- db/write_batch.cc | 140 +++++++++++++++++- db/write_batch_internal.h | 8 + db/write_batch_test.cc | 40 +++++ .../utilities/write_batch_with_index.h | 3 +- include/rocksdb/write_batch.h | 29 ++++ .../write_batch_with_index.cc | 15 +- .../write_batch_with_index_internal.cc | 19 ++- .../write_batch_with_index_internal.h | 2 +- 9 files changed, 241 insertions(+), 22 deletions(-) diff --git a/db/dbformat.h b/db/dbformat.h index 51f5e4143..220232321 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -39,6 +39,11 @@ enum ValueType : unsigned char { kTypeColumnFamilyMerge = 0x6, // WAL only. kTypeSingleDeletion = 0x7, kTypeColumnFamilySingleDeletion = 0x8, // WAL only. + kTypeBeginPrepareXID = 0x9, // WAL only. + kTypeEndPrepareXID = 0xA, // WAL only. + kTypeCommitXID = 0xB, // WAL only. + kTypeRollbackXID = 0xC, // WAL only. + kTypeNoop = 0xD, // WAL only. kMaxValue = 0x7F // Not used for storing records. }; @@ -478,5 +483,5 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, // input will be advanced to after the record. extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, - Slice* value, Slice* blob); + Slice* value, Slice* blob, Slice* xid); } // namespace rocksdb diff --git a/db/write_batch.cc b/db/write_batch.cc index 8a54432bb..801c87ba4 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -20,6 +20,11 @@ // kTypeColumnFamilyDeletion varint32 varstring varstring // kTypeColumnFamilySingleDeletion varint32 varstring varstring // kTypeColumnFamilyMerge varint32 varstring varstring +// kTypeBeginPrepareXID varstring +// kTypeEndPrepareXID +// kTypeCommitXID varstring +// kTypeRollbackXID varstring +// kTypeNoop // varstring := // len: varint32 // data: uint8[len] @@ -48,11 +53,15 @@ namespace rocksdb { namespace { enum ContentFlags : uint32_t { - DEFERRED = 1, - HAS_PUT = 2, - HAS_DELETE = 4, - HAS_SINGLE_DELETE = 8, - HAS_MERGE = 16, + DEFERRED = 1 << 0, + HAS_PUT = 1 << 1, + HAS_DELETE = 1 << 2, + HAS_SINGLE_DELETE = 1 << 3, + HAS_MERGE = 1 << 4, + HAS_BEGIN_PREPARE = 1 << 5, + HAS_END_PREPARE = 1 << 6, + HAS_COMMIT = 1 << 7, + HAS_ROLLBACK = 1 << 8, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -77,6 +86,26 @@ struct BatchContentClassifier : public WriteBatch::Handler { content_flags |= ContentFlags::HAS_MERGE; return Status::OK(); } + + Status MarkBeginPrepare() override { + content_flags |= ContentFlags::HAS_BEGIN_PREPARE; + return Status::OK(); + } + + Status MarkEndPrepare(const Slice&) override { + content_flags |= ContentFlags::HAS_END_PREPARE; + return Status::OK(); + } + + Status MarkCommit(const Slice&) override { + content_flags |= ContentFlags::HAS_COMMIT; + return Status::OK(); + } + + Status MarkRollback(const Slice&) override { + content_flags |= ContentFlags::HAS_ROLLBACK; + return Status::OK(); + } }; } // anon namespace @@ -97,6 +126,7 @@ WriteBatch::WriteBatch(size_t reserved_bytes) rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader); rep_.resize(WriteBatchInternal::kHeader); + rep_.push_back(static_cast(kTypeNoop)); } WriteBatch::WriteBatch(const std::string& rep) @@ -146,6 +176,7 @@ bool WriteBatch::Handler::Continue() { void WriteBatch::Clear() { rep_.clear(); rep_.resize(WriteBatchInternal::kHeader); + rep_.push_back(static_cast(kTypeNoop)); content_flags_.store(0, std::memory_order_relaxed); @@ -209,9 +240,25 @@ bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) { return GetLengthPrefixedSlice(input, key); } +bool WriteBatch::HasBeginPrepare() const { + return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0; +} + +bool WriteBatch::HasEndPrepare() const { + return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0; +} + +bool WriteBatch::HasCommit() const { + return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0; +} + +bool WriteBatch::HasRollback() const { + return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0; +} + Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, - Slice* value, Slice* blob) { + Slice* value, Slice* blob, Slice* xid) { assert(key != nullptr && value != nullptr); *tag = (*input)[0]; input->remove_prefix(1); @@ -257,6 +304,24 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad WriteBatch Blob"); } break; + case kTypeNoop: + case kTypeBeginPrepareXID: + break; + case kTypeEndPrepareXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad EndPrepare XID"); + } + break; + case kTypeCommitXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad Commit XID"); + } + break; + case kTypeRollbackXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad Rollback XID"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -270,7 +335,7 @@ Status WriteBatch::Iterate(Handler* handler) const { } input.remove_prefix(WriteBatchInternal::kHeader); - Slice key, value, blob; + Slice key, value, blob, xid; int found = 0; Status s; while (s.ok() && !input.empty() && handler->Continue()) { @@ -278,7 +343,7 @@ Status WriteBatch::Iterate(Handler* handler) const { uint32_t column_family = 0; // default s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value, - &blob); + &blob, &xid); if (!s.ok()) { return s; } @@ -315,6 +380,28 @@ Status WriteBatch::Iterate(Handler* handler) const { case kTypeLogData: handler->LogData(blob); break; + case kTypeBeginPrepareXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); + handler->MarkBeginPrepare(); + break; + case kTypeEndPrepareXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE)); + handler->MarkEndPrepare(xid); + break; + case kTypeCommitXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); + handler->MarkCommit(xid); + break; + case kTypeRollbackXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); + handler->MarkRollback(xid); + break; + case kTypeNoop: + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -391,6 +478,43 @@ void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); } +void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) { + // a manually constructed batch can only contain one prepare section + assert(b->rep_[12] == static_cast(kTypeNoop)); + + // all savepoints up to this point are cleared + if (b->save_points_ != nullptr) { + while (!b->save_points_->stack.empty()) { + b->save_points_->stack.pop(); + } + } + + // rewrite noop as begin marker + b->rep_[12] = static_cast(kTypeBeginPrepareXID); + b->rep_.push_back(static_cast(kTypeEndPrepareXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_END_PREPARE | + ContentFlags::HAS_BEGIN_PREPARE, + std::memory_order_relaxed); +} + +void WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast(kTypeCommitXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_COMMIT, + std::memory_order_relaxed); +} + +void WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast(kTypeRollbackXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_ROLLBACK, + std::memory_order_relaxed); +} + void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, const Slice& key) { WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 3987645ef..521e4f596 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -92,6 +92,14 @@ class WriteBatchInternal { static void Merge(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static void MarkBeginPrepare(WriteBatch* batch); + + static void MarkEndPrepare(WriteBatch* batch, const Slice& xid); + + static void MarkRollback(WriteBatch* batch, const Slice& xid); + + static void MarkCommit(WriteBatch* batch, const Slice& xid); + // Return the number of entries in the batch. static int Count(const WriteBatch* batch); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 58c7273c3..155f65fb9 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -231,6 +231,22 @@ namespace { virtual void LogData(const Slice& blob) override { seen += "LogData(" + blob.ToString() + ")"; } + virtual Status MarkBeginPrepare() override { + seen += "MarkBeginPrepare()"; + return Status::OK(); + } + virtual Status MarkEndPrepare(const Slice& xid) override { + seen += "MarkEndPrepare(" + xid.ToString() + ")"; + return Status::OK(); + } + virtual Status MarkCommit(const Slice& xid) override { + seen += "MarkCommit(" + xid.ToString() + ")"; + return Status::OK(); + } + virtual Status MarkRollback(const Slice& xid) override { + seen += "MarkRollback(" + xid.ToString() + ")"; + return Status::OK(); + } }; } @@ -308,6 +324,30 @@ TEST_F(WriteBatchTest, Blob) { handler.seen); } +TEST_F(WriteBatchTest, PrepareCommit) { + WriteBatch batch; + batch.Put(Slice("k1"), Slice("v1")); + batch.Put(Slice("k2"), Slice("v2")); + batch.SetSavePoint(); + WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1")); + Status s = batch.RollbackToSavePoint(); + ASSERT_EQ(s, Status::NotFound()); + WriteBatchInternal::MarkCommit(&batch, Slice("xid1")); + WriteBatchInternal::MarkRollback(&batch, Slice("xid1")); + ASSERT_EQ(2, batch.Count()); + + TestHandler handler; + batch.Iterate(&handler); + ASSERT_EQ( + "MarkBeginPrepare()" + "Put(k1, v1)" + "Put(k2, v2)" + "MarkEndPrepare(xid1)" + "MarkCommit(xid1)" + "MarkRollback(xid1)", + handler.seen); +} + // It requires more than 30GB of memory to run the test. With single memory // allocation of more than 30GB. // Not all platform can run it. Also it runs a long time. So disable it. diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index aab12ba02..ccfd67e5e 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -34,7 +34,8 @@ enum WriteType { kMergeRecord, kDeleteRecord, kSingleDeleteRecord, - kLogDataRecord + kLogDataRecord, + kXIDRecord, }; // an entry for Put, Merge, Delete, or SingleDelete entry for write batches. diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index e9bd72b58..2a6ed63bd 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -186,6 +186,23 @@ class WriteBatch : public WriteBatchBase { // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); + virtual Status MarkBeginPrepare() { + return Status::InvalidArgument("MarkBeginPrepare() handler not defined."); + } + + virtual Status MarkEndPrepare(const Slice& xid) { + return Status::InvalidArgument("MarkEndPrepare() handler not defined."); + } + + virtual Status MarkRollback(const Slice& xid) { + return Status::InvalidArgument( + "MarkRollbackPrepare() handler not defined."); + } + + virtual Status MarkCommit(const Slice& xid) { + return Status::InvalidArgument("MarkCommit() handler not defined."); + } + // Continue is called by WriteBatch::Iterate. If it returns false, // iteration is halted. Otherwise, it continues iterating. The default // implementation always returns true. @@ -214,6 +231,18 @@ class WriteBatch : public WriteBatchBase { // Returns trie if MergeCF will be called during Iterate bool HasMerge() const; + // Returns true if MarkBeginPrepare will be called during Iterate + bool HasBeginPrepare() const; + + // Returns true if MarkEndPrepare will be called during Iterate + bool HasEndPrepare() const; + + // Returns trie if MarkCommit will be called during Iterate + bool HasCommit() const; + + // Returns trie if MarkRollback will be called during Iterate + bool HasRollback() const; + using WriteBatchBase::GetWriteBatch; WriteBatch* GetWriteBatch() override { return this; } diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 83b07f4db..e16175449 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -337,13 +337,13 @@ class WBWIIteratorImpl : public WBWIIterator { virtual WriteEntry Entry() const override { WriteEntry ret; - Slice blob; + Slice blob, xid; const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); // this is guaranteed with Valid() assert(iter_entry != nullptr && iter_entry->column_family == column_family_id_); - auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type, - &ret.key, &ret.value, &blob); + auto s = write_batch_->GetEntryFromDataOffset( + iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid); assert(s.ok()); assert(ret.type == kPutRecord || ret.type == kDeleteRecord || ret.type == kSingleDeleteRecord || ret.type == kMergeRecord); @@ -501,7 +501,7 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { // Loop through all entries in Rep and add each one to the index int found = 0; while (s.ok() && !input.empty()) { - Slice key, value, blob; + Slice key, value, blob, xid; uint32_t column_family_id = 0; // default char tag = 0; @@ -509,7 +509,7 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { last_entry_offset = input.data() - write_batch.Data().data(); s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, - &value, &blob); + &value, &blob, &xid); if (!s.ok()) { break; } @@ -529,6 +529,11 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { } break; case kTypeLogData: + case kTypeBeginPrepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + case kTypeNoop: break; default: return Status::Corruption("unknown WriteBatch tag"); diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 89114f02d..e4ea104e3 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -24,10 +24,10 @@ class Statistics; Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, - Slice* value, - Slice* blob) const { + Slice* value, Slice* blob, + Slice* xid) const { if (type == nullptr || Key == nullptr || value == nullptr || - blob == nullptr) { + blob == nullptr || xid == nullptr) { return Status::InvalidArgument("Output parameters cannot be null"); } @@ -42,8 +42,8 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); char tag; uint32_t column_family; - Status s = - ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob); + Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, + blob, xid); switch (tag) { case kTypeColumnFamilyValue: @@ -65,6 +65,12 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, case kTypeLogData: *type = kLogDataRecord; break; + case kTypeBeginPrepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + *type = kXIDRecord; + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -183,7 +189,8 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( result = WriteBatchWithIndexInternal::Result::kDeleted; break; } - case kLogDataRecord: { + case kLogDataRecord: + case kXIDRecord: { // ignore break; } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index b45dcadf8..95fdf5aaa 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -58,7 +58,7 @@ class ReadableWriteBatch : public WriteBatch { // Retrieve some information from a write entry in the write batch, given // the start offset of the write entry. Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, - Slice* value, Slice* blob) const; + Slice* value, Slice* blob, Slice* xid) const; }; class WriteBatchEntryComparator {