diff --git a/db/db_impl.cc b/db/db_impl.cc index 80dc3ff76..7510f3d61 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -139,7 +139,7 @@ int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024; } // namespace DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, - const bool seq_per_batch) + const bool seq_per_batch, const bool batch_per_txn) : env_(options.env), dbname_(dbname), own_info_log_(options.info_log == nullptr), @@ -200,6 +200,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, two_write_queues_(options.two_write_queues), manual_wal_flush_(options.manual_wal_flush), seq_per_batch_(seq_per_batch), + batch_per_txn_(batch_per_txn), // last_sequencee_ is always maintained by the main queue that also writes // to the memtable. When two_write_queues_ is disabled last seq in // memtable is the same as last seq published to the readers. When it is @@ -218,6 +219,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, preserve_deletes_(options.preserve_deletes), closed_(false), error_handler_(immutable_db_options_, &mutex_) { + // !batch_per_txn_ implies seq_per_batch_ because it is only unset for + // WriteUnprepared, which should use seq_per_batch_. + assert(batch_per_txn_ || seq_per_batch_); env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. 
diff --git a/db/db_impl.h b/db/db_impl.h index 8ff6db637..a27e45c38 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -72,7 +72,7 @@ struct MemTableInfo; class DBImpl : public DB { public: DBImpl(const DBOptions& options, const std::string& dbname, - const bool seq_per_batch = false); + const bool seq_per_batch = false, const bool batch_per_txn = true); virtual ~DBImpl(); using DB::Resume; @@ -640,7 +640,7 @@ class DBImpl : public DB { static Status Open(const DBOptions& db_options, const std::string& name, const std::vector& column_families, std::vector* handles, DB** dbptr, - const bool seq_per_batch); + const bool seq_per_batch, const bool batch_per_txn); virtual Status Close() override; @@ -1410,6 +1410,13 @@ class DBImpl : public DB { // // Default: false const bool seq_per_batch_; + // This determines during recovery whether we expect one writebatch per + // recovered transaction, or potentially multiple writebatches per + // transaction. For WriteUnprepared, this is set to false, since multiple + // batches can exist per transaction. + // + // Default: true + const bool batch_per_txn_; // LastSequence also indicates last published sequence visibile to the // readers. Otherwise LastPublishedSequence should be used. 
const bool last_seq_same_as_publish_seq_; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 31d014568..27c01658c 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -720,7 +720,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), &flush_scheduler_, true, log_number, this, false /* concurrent_memtable_writes */, - next_sequence, &has_valid_writes, seq_per_batch_); + next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); MaybeIgnoreError(&status); if (!status.ok()) { // We are treating this as a failure while reading since we read valid @@ -1004,15 +1004,16 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr) { - const bool seq_per_batch = true; + const bool kSeqPerBatch = true; + const bool kBatchPerTxn = true; return DBImpl::Open(db_options, dbname, column_families, handles, dbptr, - !seq_per_batch); + !kSeqPerBatch, kBatchPerTxn); } Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr, - const bool seq_per_batch) { + const bool seq_per_batch, const bool batch_per_txn) { Status s = SanitizeOptionsByTable(db_options, column_families); if (!s.ok()) { return s; @@ -1032,7 +1033,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, std::max(max_write_buffer_size, cf.options.write_buffer_size); } - DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch); + DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn); s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir); if (s.ok()) { std::vector paths; diff --git a/db/dbformat.h b/db/dbformat.h index 7262bb24b..221959036 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ 
-55,6 +55,10 @@ enum ValueType : unsigned char { // is not mistakenly read by another, which would result into data // inconsistency. kTypeBeginPersistedPrepareXID = 0x12, // WAL only. + // Similar to kTypeBeginPersistedPrepareXID, this is to ensure that WAL + // generated by WriteUnprepared write policy is not mistakenly read by + // another. + kTypeBeginUnprepareXID = 0x13, // WAL only. kMaxValue = 0x7F // Not used for storing records. }; diff --git a/db/write_batch.cc b/db/write_batch.cc index 021af9c0d..6e2db1cf3 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -27,6 +27,7 @@ // kTypeCommitXID varstring // kTypeRollbackXID varstring // kTypeBeginPersistedPrepareXID varstring +// kTypeBeginUnprepareXID varstring // kTypeNoop // varstring := // len: varint32 @@ -366,6 +367,8 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, // This indicates that the prepared batch is also persisted in the db. // This is used in WritePreparedTxn case kTypeBeginPersistedPrepareXID: + // This is used in WriteUnpreparedTxn + case kTypeBeginUnprepareXID: break; case kTypeEndPrepareXID: if (!GetLengthPrefixedSlice(input, xid)) { @@ -503,8 +506,15 @@ Status WriteBatch::Iterate(Handler* handler) const { if (!handler->WriteAfterCommit()) { s = Status::NotSupported( "WriteCommitted txn tag when write_after_commit_ is disabled (in " - "WritePrepared mode). If it is not due to corruption, the WAL " - "must be emptied before changing the WritePolicy."); + "WritePrepared/WriteUnprepared mode). If it is not due to " + "corruption, the WAL must be emptied before changing the " + "WritePolicy."); + } + if (handler->WriteBeforePrepare()) { + s = Status::NotSupported( + "WriteCommitted txn tag when write_before_prepare_ is enabled " + "(in WriteUnprepared mode). 
If it is not due to corruption, the " + "WAL must be emptied before changing the WritePolicy."); } break; case kTypeBeginPersistedPrepareXID: @@ -514,10 +524,30 @@ Status WriteBatch::Iterate(Handler* handler) const { empty_batch = false; if (handler->WriteAfterCommit()) { s = Status::NotSupported( - "WritePrepared txn tag when write_after_commit_ is enabled (in " + "WritePrepared/WriteUnprepared txn tag when write_after_commit_ " + "is enabled (in default WriteCommitted mode). If it is not due " + "to corruption, the WAL must be emptied before changing the " + "WritePolicy."); + } + break; + case kTypeBeginUnprepareXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); + handler->MarkBeginPrepare(); + empty_batch = false; + if (handler->WriteAfterCommit()) { + s = Status::NotSupported( + "WriteUnprepared txn tag when write_after_commit_ is enabled (in " "default WriteCommitted mode). If it is not due to corruption, " "the WAL must be emptied before changing the WritePolicy."); } + if (!handler->WriteBeforePrepare()) { + s = Status::NotSupported( + "WriteUnprepared txn tag when write_before_prepare_ is disabled " + "(in WriteCommitted/WritePrepared mode). If it is not due to " + "corruption, the WAL must be emptied before changing the " + "WritePolicy."); + } break; case kTypeEndPrepareXID: assert(content_flags_.load(std::memory_order_relaxed) & @@ -669,7 +699,8 @@ Status WriteBatchInternal::InsertNoop(WriteBatch* b) { } Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, - bool write_after_commit) { + bool write_after_commit, + bool unprepared_batch) { // a manually constructed batch can only contain one prepare section assert(b->rep_[12] == static_cast(kTypeNoop)); @@ -681,9 +712,10 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, } // rewrite noop as begin marker - b->rep_[12] = - static_cast(write_after_commit ? 
kTypeBeginPrepareXID - : kTypeBeginPersistedPrepareXID); + b->rep_[12] = static_cast( + write_after_commit ? kTypeBeginPrepareXID + : (unprepared_batch ? kTypeBeginUnprepareXID + : kTypeBeginPersistedPrepareXID)); b->rep_.push_back(static_cast(kTypeEndPrepareXID)); PutLengthPrefixedSlice(&b->rep_, xid); b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | @@ -1018,6 +1050,8 @@ class MemTableInserter : public WriteBatch::Handler { bool seq_per_batch_; // Whether the memtable write will be done only after the commit bool write_after_commit_; + // Whether memtable write can be done before prepare + bool write_before_prepare_; using DupDetector = std::aligned_storage::type; DupDetector duplicate_detector_; bool dup_dectector_on_; @@ -1043,6 +1077,9 @@ class MemTableInserter : public WriteBatch::Handler { } protected: + virtual bool WriteBeforePrepare() const override { + return write_before_prepare_; + } virtual bool WriteAfterCommit() const override { return write_after_commit_; } public: @@ -1052,7 +1089,8 @@ class MemTableInserter : public WriteBatch::Handler { bool ignore_missing_column_families, uint64_t recovering_log_number, DB* db, bool concurrent_memtable_writes, - bool* has_valid_writes = nullptr, bool seq_per_batch = false) + bool* has_valid_writes = nullptr, bool seq_per_batch = false, + bool batch_per_txn = true) : sequence_(_sequence), cf_mems_(cf_mems), flush_scheduler_(flush_scheduler), @@ -1070,6 +1108,9 @@ class MemTableInserter : public WriteBatch::Handler { // batch). So seq_per_batch being false indicates write_after_commit // approach. write_after_commit_(!seq_per_batch), + // WriteUnprepared can write multiple WriteBatches per transaction, so + // batch_per_txn being false indicates write_before_prepare. 
+ write_before_prepare_(!batch_per_txn), duplicate_detector_(), dup_dectector_on_(false) { assert(cf_mems_); @@ -1090,8 +1131,6 @@ class MemTableInserter : public WriteBatch::Handler { MemTableInserter(const MemTableInserter&) = delete; MemTableInserter& operator=(const MemTableInserter&) = delete; - virtual bool WriterAfterCommit() const { return write_after_commit_; } - // The batch seq is regularly restarted; In normal mode it is set when // MemTableInserter is constructed in the write thread and in recovery mode it // is set when a batch, which is tagged with seq, is read from the WAL. @@ -1693,11 +1732,11 @@ Status WriteBatchInternal::InsertInto( WriteThread::WriteGroup& write_group, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db, - bool concurrent_memtable_writes, bool seq_per_batch) { - MemTableInserter inserter(sequence, memtables, flush_scheduler, - ignore_missing_column_families, recovery_log_number, - db, concurrent_memtable_writes, - nullptr /*has_valid_writes*/, seq_per_batch); + bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) { + MemTableInserter inserter( + sequence, memtables, flush_scheduler, ignore_missing_column_families, + recovery_log_number, db, concurrent_memtable_writes, + nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn); for (auto w : write_group) { if (w->CallbackFailed()) { continue; @@ -1724,15 +1763,16 @@ Status WriteBatchInternal::InsertInto( WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, - bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt) { + bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt, + bool batch_per_txn) { #ifdef NDEBUG (void)batch_cnt; #endif assert(writer->ShouldWriteToMemtable()); - 
MemTableInserter inserter(sequence, memtables, flush_scheduler, - ignore_missing_column_families, log_number, db, - concurrent_memtable_writes, - nullptr /*has_valid_writes*/, seq_per_batch); + MemTableInserter inserter( + sequence, memtables, flush_scheduler, ignore_missing_column_families, + log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/, + seq_per_batch, batch_per_txn); SetSequence(writer->batch, sequence); inserter.set_log_number_ref(writer->log_ref); Status s = writer->batch->Iterate(&inserter); @@ -1748,11 +1788,12 @@ Status WriteBatchInternal::InsertInto( const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, bool concurrent_memtable_writes, - SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch) { + SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch, + bool batch_per_txn) { MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler, ignore_missing_column_families, log_number, db, concurrent_memtable_writes, has_valid_writes, - seq_per_batch); + seq_per_batch, batch_per_txn); Status s = batch->Iterate(&inserter); if (next_seq != nullptr) { *next_seq = inserter.sequence(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index ba0e9ffe4..bae62bf03 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -103,7 +103,8 @@ class WriteBatchInternal { const Slice& key, const Slice& value); static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid, - const bool write_after_commit = true); + const bool write_after_commit = true, + const bool unprepared_batch = false); static Status MarkRollback(WriteBatch* batch, const Slice& xid); @@ -158,26 +159,22 @@ class WriteBatchInternal { // // Under concurrent use, the caller is responsible for making sure that // the memtables object itself is thread-local. 
- static Status InsertInto(WriteThread::WriteGroup& write_group, - SequenceNumber sequence, - ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families = false, - uint64_t log_number = 0, DB* db = nullptr, - bool concurrent_memtable_writes = false, - bool seq_per_batch = false); + static Status InsertInto( + WriteThread::WriteGroup& write_group, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + bool ignore_missing_column_families = false, uint64_t log_number = 0, + DB* db = nullptr, bool concurrent_memtable_writes = false, + bool seq_per_batch = false, bool batch_per_txn = true); // Convenience form of InsertInto when you have only one batch // next_seq returns the seq after last sequence number used in MemTable insert - static Status InsertInto(const WriteBatch* batch, - ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families = false, - uint64_t log_number = 0, DB* db = nullptr, - bool concurrent_memtable_writes = false, - SequenceNumber* next_seq = nullptr, - bool* has_valid_writes = nullptr, - bool seq_per_batch = false); + static Status InsertInto( + const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + bool ignore_missing_column_families = false, uint64_t log_number = 0, + DB* db = nullptr, bool concurrent_memtable_writes = false, + SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr, + bool seq_per_batch = false, bool batch_per_txn = true); static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, @@ -185,7 +182,8 @@ class WriteBatchInternal { bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, - bool seq_per_batch = false, size_t batch_cnt = 0); + bool seq_per_batch = false, size_t batch_cnt = 0, + bool batch_per_txn = true); 
static Status Append(WriteBatch* dst, const WriteBatch* src, const bool WAL_only = false); diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index f62450f77..1909b8e39 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -157,8 +157,9 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); std::vector handles; - auto open_s = DBImpl::Open(db_options, dbname, column_families, - &handles, &db, seq_per_batch); + auto open_s = + DBImpl::Open(db_options, dbname, column_families, &handles, + &db, seq_per_batch, true /* batch_per_txn */); ASSERT_OK(open_s); assert(handles.size() == 1); delete handles[0]; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 38bdcfc69..3abf53b9d 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -272,6 +272,7 @@ class WriteBatch : public WriteBatchBase { protected: friend class WriteBatch; virtual bool WriteAfterCommit() const { return true; } + virtual bool WriteBeforePrepare() const { return false; } }; Status Iterate(Handler* handler) const; diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 38ac6067c..ff8121e9e 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -220,8 +220,11 @@ Status TransactionDB::Open( const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED || txn_db_options.write_policy == WRITE_UNPREPARED; + const bool use_batch_per_txn = + txn_db_options.write_policy == WRITE_COMMITTED || + txn_db_options.write_policy == WRITE_PREPARED; s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, - use_seq_per_batch); + use_seq_per_batch, use_batch_per_txn); if (s.ok()) { s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); diff --git 
a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index a4b363c51..2d91f5b2e 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -146,8 +146,11 @@ class TransactionTestBase : public ::testing::Test { const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED || txn_db_options.write_policy == WRITE_UNPREPARED; + const bool use_batch_per_txn = + txn_db_options.write_policy == WRITE_COMMITTED || + txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, - use_seq_per_batch); + use_seq_per_batch, use_batch_per_txn); StackableDB* stackable_db = new StackableDB(root_db); if (s.ok()) { assert(root_db != nullptr); @@ -176,8 +179,11 @@ class TransactionTestBase : public ::testing::Test { const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED || txn_db_options.write_policy == WRITE_UNPREPARED; + const bool use_batch_per_txn = + txn_db_options.write_policy == WRITE_COMMITTED || + txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, column_families, &handles, - &root_db, use_seq_per_batch); + &root_db, use_seq_per_batch, use_batch_per_txn); StackableDB* stackable_db = new StackableDB(root_db); if (s.ok()) { assert(root_db != nullptr); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 2849e4bbf..6715023f3 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -594,6 +594,7 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() { case kTypeLogData: case kTypeBeginPrepareXID: case kTypeBeginPersistedPrepareXID: + case kTypeBeginUnprepareXID: case kTypeEndPrepareXID: case kTypeCommitXID: case kTypeRollbackXID: diff --git 
a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index b3811a862..14e5f2147 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -72,6 +72,7 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, case kTypeNoop: case kTypeBeginPrepareXID: case kTypeBeginPersistedPrepareXID: + case kTypeBeginUnprepareXID: case kTypeEndPrepareXID: case kTypeCommitXID: case kTypeRollbackXID: