diff --git a/db/db_impl.cc b/db/db_impl.cc index e7bda3cf8..9a5cc3d93 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -191,7 +191,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) refitting_level_(false), opened_successfully_(false), concurrent_prepare_(options.concurrent_prepare), - manual_wal_flush_(options.manual_wal_flush) { + manual_wal_flush_(options.manual_wal_flush), + seq_per_batch_(options.seq_per_batch) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. diff --git a/db/db_impl.h b/db/db_impl.h index c5a7e2493..64f5dee67 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -772,7 +772,7 @@ class DBImpl : public DB { Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t* log_used, SequenceNumber* last_sequence, - int total_count); + size_t seq_inc); // Used by WriteImpl to update bg_error_ if paranoid check is enabled. void WriteCallbackStatusCheck(const Status& status); @@ -1267,6 +1267,7 @@ class DBImpl : public DB { // 2PC these are the writes at Prepare phase. const bool concurrent_prepare_; const bool manual_wal_flush_; + const bool seq_per_batch_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index bc94b6095..d95c1bf7d 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -674,7 +674,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), &flush_scheduler_, true, log_number, this, false /* concurrent_memtable_writes */, - next_sequence, &has_valid_writes); + next_sequence, &has_valid_writes, seq_per_batch_); MaybeIgnoreError(&status); if (!status.ok()) { // We are treating this as a failure while reading since we read valid diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 71ee80c49..246d39a47 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -68,6 +68,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with concurrent prepares"); } + if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with seq_per_batch"); + } Status status; if (write_options.low_pri) { @@ -184,7 +188,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // more than once to a particular key. bool parallel = immutable_db_options_.allow_concurrent_memtable_write && write_group.size > 1; - int total_count = 0; + size_t total_count = 0; uint64_t total_byte_size = 0; for (auto* writer : write_group) { if (writer->CheckCallback(this)) { @@ -197,6 +201,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); } } + size_t seq_inc = seq_per_batch_ ? write_group.size : total_count; const bool concurrent_update = concurrent_prepare_; // Update stats while we are an exclusive group leader, so we know @@ -238,15 +243,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // LastToBeWrittenSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, - total_count); + seq_inc); } else { // Otherwise we inc seq number for memtable writes - last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count); + last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc); } } assert(last_sequence != kMaxSequenceNumber); const SequenceNumber current_sequence = last_sequence + 1; - last_sequence += total_count; + last_sequence += seq_inc; if (status.ok()) { PERF_TIMER_GUARD(write_memtable_time); @@ -255,12 +260,16 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, - 0 /*recovery_log_number*/, this); + 0 /*recovery_log_number*/, this, parallel, seq_per_batch_); } else { SequenceNumber next_sequence = current_sequence; for (auto* writer : write_group) { if (writer->ShouldWriteToMemtable()) { writer->sequence = next_sequence; + } + if (seq_per_batch_) { + next_sequence++; + } else if (writer->ShouldWriteToMemtable()) { next_sequence += WriteBatchInternal::Count(writer->batch); } } @@ -281,9 +290,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); } - if (seq_used != nullptr) { - *seq_used = w.sequence; - } + } + if (seq_used != nullptr) { + *seq_used = w.sequence; } } } @@ -427,7 +436,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, memtable_write_group.status = WriteBatchInternal::InsertInto( memtable_write_group, w.sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, - 0 /*log_number*/, this); + 0 /*log_number*/, this, seq_per_batch_); versions_->SetLastSequence(memtable_write_group.last_sequence); write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); } @@ -521,12 +530,16 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, PERF_TIMER_GUARD(write_wal_time); // LastToBeWrittenSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL - status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, - 0 /*total_count*/); + size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/; + status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); auto curr_seq = last_sequence + 1; for (auto* writer : write_group) { if (writer->CheckCallback(this)) { writer->sequence = curr_seq; + } + if (seq_per_batch_) { + curr_seq++; + } else if (writer->CheckCallback(this)) { curr_seq += WriteBatchInternal::Count(writer->batch); } } @@ -778,7 +791,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t* log_used, SequenceNumber* last_sequence, - int total_count) { + size_t seq_inc) { Status status; WriteBatch tmp_batch; @@ -796,7 +809,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, writer->log_used = logfile_number_; } } - *last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count); + *last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc); auto sequence = *last_sequence + 1; WriteBatchInternal::SetSequence(merged_batch, sequence); diff --git a/db/write_batch.cc b/db/write_batch.cc index 43639ac23..90f361140 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -144,7 +144,7 @@ WriteBatch::WriteBatch(const WriteBatch& src) max_bytes_(src.max_bytes_), rep_(src.rep_) {} -WriteBatch::WriteBatch(WriteBatch&& src) +WriteBatch::WriteBatch(WriteBatch&& src) noexcept : save_points_(std::move(src.save_points_)), wal_term_point_(std::move(src.wal_term_point_)), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), @@ -366,6 +366,7 @@ Status WriteBatch::Iterate(Handler* handler) const { input.remove_prefix(WriteBatchInternal::kHeader); Slice key, value, blob, xid; + bool first_tag = true; int found = 0; Status s; while (s.ok() && !input.empty() && handler->Continue()) { @@ -438,10 +439,12 @@ Status WriteBatch::Iterate(Handler* handler) const { handler->MarkRollback(xid); break; case kTypeNoop: + handler->MarkNoop(first_tag); break; default: return Status::Corruption("unknown WriteBatch tag"); } + first_tag = false; } if (!s.ok()) { return s; @@ -838,6 +841,9 @@ class MemTableInserter : public WriteBatch::Handler { PostMapType mem_post_info_map_; // current recovered transaction we are rebuilding (recovery) WriteBatch* rebuilding_trx_; + // Increase seq number once per each write batch. Otherwise increase it once + // per key. + bool seq_per_batch_; MemPostInfoMap& GetPostMap() { assert(concurrent_memtable_writes_); @@ -848,26 +854,27 @@ class MemTableInserter : public WriteBatch::Handler { return *reinterpret_cast(&mem_post_info_map_); } -public: + public: // cf_mems should not be shared with concurrent inserters - MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, - uint64_t recovering_log_number, DB* db, - bool concurrent_memtable_writes, - bool* has_valid_writes = nullptr) - : sequence_(_sequence), - cf_mems_(cf_mems), - flush_scheduler_(flush_scheduler), - ignore_missing_column_families_(ignore_missing_column_families), - recovering_log_number_(recovering_log_number), - log_number_ref_(0), - db_(reinterpret_cast(db)), - concurrent_memtable_writes_(concurrent_memtable_writes), - post_info_created_(false), - has_valid_writes_(has_valid_writes), - rebuilding_trx_(nullptr) { - assert(cf_mems_); + MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems, + FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, + uint64_t recovering_log_number, DB* db, + bool concurrent_memtable_writes, + bool* has_valid_writes = nullptr, bool seq_per_batch = false) + : sequence_(_sequence), + cf_mems_(cf_mems), + flush_scheduler_(flush_scheduler), + ignore_missing_column_families_(ignore_missing_column_families), + recovering_log_number_(recovering_log_number), + log_number_ref_(0), + db_(reinterpret_cast(db)), + concurrent_memtable_writes_(concurrent_memtable_writes), + post_info_created_(false), + has_valid_writes_(has_valid_writes), + rebuilding_trx_(nullptr), + seq_per_batch_(seq_per_batch) { + assert(cf_mems_); } ~MemTableInserter() { @@ -880,6 +887,12 @@ public: MemTableInserter(const MemTableInserter&) = delete; MemTableInserter& operator=(const MemTableInserter&) = delete; + void MaybeAdvanceSeq(bool batch_boundry = false) { + if (batch_boundry == seq_per_batch_) { + sequence_++; + } + } + void set_log_number_ref(uint64_t log) { log_number_ref_ = log; } SequenceNumber sequence() const { return sequence_; } @@ -944,7 +957,7 @@ public: Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; + MaybeAdvanceSeq(); return seek_status; } @@ -998,7 +1011,7 @@ public: // Since all Puts are logged in trasaction logs (if enabled), always bump // sequence number. Even if the update eventually fails and does not result // in memtable add/update. - sequence_++; + MaybeAdvanceSeq(); CheckMemtableFull(); return Status::OK(); } @@ -1008,7 +1021,7 @@ public: MemTable* mem = cf_mems_->GetMemTable(); mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem)); - sequence_++; + MaybeAdvanceSeq(); CheckMemtableFull(); return Status::OK(); } @@ -1022,7 +1035,7 @@ public: Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; + MaybeAdvanceSeq(); return seek_status; } @@ -1038,7 +1051,7 @@ public: Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; + MaybeAdvanceSeq(); return seek_status; } @@ -1056,7 +1069,7 @@ public: Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; + MaybeAdvanceSeq(); return seek_status; } if (db_ != nullptr) { @@ -1086,7 +1099,7 @@ public: Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; + MaybeAdvanceSeq(); return seek_status; } @@ -1154,7 +1167,7 @@ public: mem->Add(sequence_, kTypeMerge, key, value); } - sequence_++; + MaybeAdvanceSeq(); CheckMemtableFull(); return Status::OK(); } @@ -1190,11 +1203,6 @@ public: if (has_valid_writes_ != nullptr) { *has_valid_writes_ = true; } - } else { - // in non-recovery we ignore prepare markers - // and insert the values directly. making sure we have a - // log for each insertion to reference. - assert(log_number_ref_ > 0); } return Status::OK(); @@ -1211,12 +1219,26 @@ public: rebuilding_trx_ = nullptr; } else { assert(rebuilding_trx_ == nullptr); - assert(log_number_ref_ > 0); } + const bool batch_boundry = true; + MaybeAdvanceSeq(batch_boundry); return Status::OK(); } + Status MarkNoop(bool first_tag) override { + // A hack in pessimistic transaction could result into a noop at the start + // of the write batch, that should be ignored. + if (!first_tag) { + // In the absence of Prepare markers, a kTypeNoop tag indicates the end of + // a batch. This happens when write batch commits skipping the prepare + // phase. + const bool batch_boundry = true; + MaybeAdvanceSeq(batch_boundry); + } + return Status::OK(); + } + Status MarkCommit(const Slice& name) override { assert(db_); @@ -1238,6 +1260,8 @@ public: // all insertes must reference this trx log number log_number_ref_ = trx->log_number_; s = trx->batch_->Iterate(this); + // TODO(myabandeh): In WritePrepared txn, a commit marker should + // reference the log that contains the prepare marker. log_number_ref_ = 0; if (s.ok()) { @@ -1248,8 +1272,15 @@ public: } } } else { - // in non recovery we simply ignore this tag + // TODO(myabandeh): In WritePrepared txn, a commit marker should + // reference the log that contains the prepare marker. This is to be able + // to reconsutrct the prepared list after recovery. + // TODO(myabandeh): In WritePrepared txn, we do not reach here since + // disable_memtable is set for commit. + assert(log_number_ref_ > 0); } + const bool batch_boundry = true; + MaybeAdvanceSeq(batch_boundry); return s; } @@ -1288,16 +1319,15 @@ public: // 2) During Write(), in a single-threaded write thread // 3) During Write(), in a concurrent context where memtables has been cloned // The reason is that it calls memtables->Seek(), which has a stateful cache -Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group, - SequenceNumber sequence, - ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, - uint64_t recovery_log_number, DB* db, - bool concurrent_memtable_writes) { +Status WriteBatchInternal::InsertInto( + WriteThread::WriteGroup& write_group, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db, + bool concurrent_memtable_writes, bool seq_per_batch) { MemTableInserter inserter(sequence, memtables, flush_scheduler, ignore_missing_column_families, recovery_log_number, - db, concurrent_memtable_writes); + db, concurrent_memtable_writes, + nullptr /*has_valid_writes*/, seq_per_batch); for (auto w : write_group) { if (!w->ShouldWriteToMemtable()) { continue; @@ -1337,13 +1367,14 @@ Status WriteBatchInternal::InsertInto( const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, bool concurrent_memtable_writes, - SequenceNumber* last_seq_used, bool* has_valid_writes) { + SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch) { MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler, ignore_missing_column_families, log_number, db, - concurrent_memtable_writes, has_valid_writes); + concurrent_memtable_writes, has_valid_writes, + seq_per_batch); Status s = batch->Iterate(&inserter); - if (last_seq_used != nullptr) { - *last_seq_used = inserter.sequence(); + if (next_seq != nullptr) { + *next_seq = inserter.sequence(); } if (concurrent_memtable_writes) { inserter.PostProcess(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 48a417ce8..a74d8ea0e 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -157,18 +157,20 @@ class WriteBatchInternal { FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, - bool concurrent_memtable_writes = false); + bool concurrent_memtable_writes = false, + bool seq_per_batch = false); // Convenience form of InsertInto when you have only one batch - // last_seq_used returns the last sequnce number used in a MemTable insert + // next_seq returns the seq after last sequnce number used in MemTable insert static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, - SequenceNumber* last_seq_used = nullptr, - bool* has_valid_writes = nullptr); + SequenceNumber* next_seq = nullptr, + bool* has_valid_writes = nullptr, + bool seq_per_batch = false); static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 4584793ab..624d39598 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -299,6 +299,10 @@ namespace { seen += "MarkEndPrepare(" + xid.ToString() + ")"; return Status::OK(); } + virtual Status MarkNoop(bool first_tag) override { + seen += "MarkNoop(" + std::string(first_tag ? "true" : "false") + ")"; + return Status::OK(); + } virtual Status MarkCommit(const Slice& xid) override { seen += "MarkCommit(" + xid.ToString() + ")"; return Status::OK(); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 4d2f143a0..635a932f8 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -899,6 +899,16 @@ struct DBOptions { // relies on manual invocation of FlushWAL to write the WAL buffer to its // file. bool manual_wal_flush = false; + + // Increase the sequence number after writing each batch, whether memtable is + // disabled for that or not. Otherwise the sequence number is increased after + // writing each key into memtable. This implies that when memtable_disable is + // set, the seq is not increased at all. + // + // Default: false + // Note: This option is experimental and meant to be used only for internal + // projects. + bool seq_per_batch = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 8bd93d36c..2ca40bbac 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -244,6 +244,10 @@ class WriteBatch : public WriteBatchBase { return Status::InvalidArgument("MarkEndPrepare() handler not defined."); } + virtual Status MarkNoop(bool first_tag) { + return Status::InvalidArgument("MarkNoop() handler not defined."); + } + virtual Status MarkRollback(const Slice& xid) { return Status::InvalidArgument( "MarkRollbackPrepare() handler not defined."); @@ -303,7 +307,7 @@ class WriteBatch : public WriteBatchBase { explicit WriteBatch(const std::string& rep); WriteBatch(const WriteBatch& src); - WriteBatch(WriteBatch&& src); + WriteBatch(WriteBatch&& src) noexcept; WriteBatch& operator=(const WriteBatch& src); WriteBatch& operator=(WriteBatch&& src); diff --git a/options/db_options.cc b/options/db_options.cc index 6ca07372c..c8c55f43e 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -88,7 +88,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) avoid_flush_during_recovery(options.avoid_flush_during_recovery), allow_ingest_behind(options.allow_ingest_behind), concurrent_prepare(options.concurrent_prepare), - manual_wal_flush(options.manual_wal_flush) { + manual_wal_flush(options.manual_wal_flush), + seq_per_batch(options.seq_per_batch) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -229,6 +230,7 @@ void ImmutableDBOptions::Dump(Logger* log) const { concurrent_prepare); ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d", manual_wal_flush); + ROCKS_LOG_HEADER(log, " Options.seq_per_batch: %d", seq_per_batch); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index 18d1a5fb6..99ca7a595 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -81,6 +81,7 @@ struct ImmutableDBOptions { bool allow_ingest_behind; bool concurrent_prepare; bool manual_wal_flush; + bool seq_per_batch; }; struct MutableDBOptions { diff --git a/options/options_helper.h b/options/options_helper.h index cad57edf2..c59367779 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -354,7 +354,11 @@ static std::unordered_map db_options_type_info = { {"manual_wal_flush", {offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean, OptionVerificationType::kNormal, false, - offsetof(struct ImmutableDBOptions, manual_wal_flush)}}}; + offsetof(struct ImmutableDBOptions, manual_wal_flush)}}, + {"seq_per_batch", + {offsetof(struct DBOptions, seq_per_batch), OptionType::kBoolean, + OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, seq_per_batch)}}}; // offset_of is used to get the offset of a class data member // ex: offset_of(&ColumnFamilyOptions::num_levels) diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index eacfd9057..73008bbe4 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -283,7 +283,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "avoid_flush_during_shutdown=false;" "allow_ingest_behind=false;" "concurrent_prepare=false;" - "manual_wal_flush=false;", + "manual_wal_flush=false;" + "seq_per_batch=false;", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index db23e0b8e..3bda0f2d7 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -76,6 +76,8 @@ PessimisticTransactionDB::PessimisticTransactionDB( PessimisticTransactionDB::~PessimisticTransactionDB() { while (!transactions_.empty()) { delete transactions_.begin()->second; + // TODO(myabandeh): this seems to be an unsafe approach as it is not quite + // clear whether delete would also remove the entry from transactions_. } } @@ -196,6 +198,9 @@ Status TransactionDB::Open( std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; DBOptions db_options_2pc = db_options; + if (txn_db_options.write_policy == WRITE_PREPARED) { + db_options_2pc.seq_per_batch = true; + } PrepareWrap(&db_options_2pc, &column_families_copy, &compaction_enabled_cf_indices); s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index ab88da40b..cde5aec2c 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4640,6 +4640,90 @@ TEST_P(TransactionTest, MemoryLimitTest) { delete txn; } +// This test clarfies the existing expectation from the sequence number +// algorithm. It could detect mistakes in updating the code but it is not +// necessarily the one acceptable way. If the algorithm is legitimately changed, +// this unit test should be updated as well. +TEST_P(TransactionTest, SeqAdvanceTest) { + auto pdb = reinterpret_cast(db); + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + auto seq = db_impl->GetLatestSequenceNumber(); + auto exp_seq = seq; + + // Test DB's internal txn. It involves no prepare phase nor a commit marker. + WriteOptions wopts; + auto s = db->Put(wopts, "key", "value"); + // Consume one seq per key + exp_seq++; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Doing it twice might detect some bugs + s = db->Put(wopts, "key", "value"); + exp_seq++; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Testing directly writing a write batch. Functionality-wise it is equivalent + // to commit without prepare. + WriteBatch wb; + wb.Put("k1", "v1"); + wb.Put("k2", "v2"); + wb.Put("k3", "v3"); + s = db->Write(wopts, &wb); + // One seq per key. + exp_seq += 3; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // A full 2pc txn that also involves a commit marker. + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + s = txn->Put(Slice("foo"), Slice("bar")); + s = txn->Put(Slice("foo2"), Slice("bar2")); + s = txn->Put(Slice("foo3"), Slice("bar3")); + s = txn->Put(Slice("foo4"), Slice("bar4")); + s = txn->Put(Slice("foo5"), Slice("bar5")); + ASSERT_OK(s); + s = txn->Prepare(); + ASSERT_OK(s); + // Consume one seq per key + exp_seq += 5; + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Put(wopts, "key", "value"); + exp_seq++; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Commit without prepare. It shoudl write to DB without a commit marker. + txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid2"); + ASSERT_OK(s); + s = txn->Put(Slice("foo"), Slice("bar")); + s = txn->Put(Slice("foo2"), Slice("bar2")); + s = txn->Put(Slice("foo3"), Slice("bar3")); + s = txn->Put(Slice("foo4"), Slice("bar4")); + s = txn->Put(Slice("foo5"), Slice("bar5")); + ASSERT_OK(s); + s = txn->Commit(); + ASSERT_OK(s); + // One seq per key + exp_seq += 5; + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + pdb->UnregisterTransaction(txn); + delete txn; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 9e167722c..66ba5f6ce 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -552,6 +552,96 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) { } } +// This test clarfies the existing expectation from the sequence number +// algorithm. It could detect mistakes in updating the code but it is not +// necessarily the one acceptable way. If the algorithm is legitimately changed, +// this unit test should be updated as well. +TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) { + auto pdb = reinterpret_cast(db); + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + auto seq = db_impl->GetLatestSequenceNumber(); + auto exp_seq = seq; + + // Test DB's internal txn. It involves no prepare phase nor a commit marker. + WriteOptions wopts; + auto s = db->Put(wopts, "key", "value"); + // Consume one seq per batch + exp_seq++; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Doing it twice might detect some bugs + s = db->Put(wopts, "key", "value"); + exp_seq++; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Testing directly writing a write batch. Functionality-wise it is equivalent + // to commit without prepare. + WriteBatch wb; + wb.Put("k1", "v1"); + wb.Put("k2", "v2"); + wb.Put("k3", "v3"); + s = pdb->Write(wopts, &wb); + // Consume one seq per batch + exp_seq++; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // A full 2pc txn that also involves a commit marker. + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + s = txn->Put(Slice("foo"), Slice("bar")); + s = txn->Put(Slice("foo2"), Slice("bar2")); + s = txn->Put(Slice("foo3"), Slice("bar3")); + s = txn->Put(Slice("foo4"), Slice("bar4")); + s = txn->Put(Slice("foo5"), Slice("bar5")); + ASSERT_OK(s); + s = txn->Prepare(); + ASSERT_OK(s); + // Consume one seq per batch + exp_seq++; + s = txn->Commit(); + ASSERT_OK(s); + // Consume one seq per commit marker + exp_seq++; + // Since commit marker does not write to memtable, the last seq number is not + // updated immedaitely. But the advance should be visible after the next + // write. + + s = db->Put(wopts, "key", "value"); + // Consume one seq per batch + exp_seq++; + ASSERT_OK(s); + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + + // Commit without prepare. It shoudl write to DB without a commit marker. + txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid2"); + ASSERT_OK(s); + s = txn->Put(Slice("foo"), Slice("bar")); + s = txn->Put(Slice("foo2"), Slice("bar2")); + s = txn->Put(Slice("foo3"), Slice("bar3")); + s = txn->Put(Slice("foo4"), Slice("bar4")); + s = txn->Put(Slice("foo5"), Slice("bar5")); + ASSERT_OK(s); + s = txn->Commit(); + ASSERT_OK(s); + // Consume one seq per batch + exp_seq++; + seq = db_impl->GetLatestSequenceNumber(); + ASSERT_EQ(exp_seq, seq); + pdb->UnregisterTransaction(txn); + delete txn; +} + // Test WritePreparedTxnDB's IsInSnapshot against different ordering of // snapshot, max_committed_seq_, prepared, and commit entries. TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 263f5a99b..78671aad8 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -46,11 +46,12 @@ Status WritePreparedTxn::PrepareInternal() { write_options.disableWAL = false; WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); const bool disable_memtable = true; - uint64_t seq_used; + uint64_t seq_used = kMaxSequenceNumber; Status s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !disable_memtable, &seq_used); + assert(seq_used != kMaxSequenceNumber); prepare_seq_ = seq_used; wpt_db_->AddPrepared(prepare_seq_); return s; @@ -61,6 +62,8 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { } Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { + // In the absenese of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(batch); const bool disable_memtable = true; const uint64_t no_log_ref = 0; uint64_t seq_used; diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 385d16fe6..e311f4f42 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -69,6 +69,7 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, case kTypeLogData: *type = kLogDataRecord; break; + case kTypeNoop: case kTypeBeginPrepareXID: case kTypeEndPrepareXID: case kTypeCommitXID: