From 54b43563be4add99fcc49329769fcb715c26b8b9 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 15 Nov 2017 08:22:54 -0800 Subject: [PATCH] WritePrepared Txn: Refactoring WriteCallback Summary: Refactor the logic around WriteCallback in the write path to clarify when and how exactly we advance the sequence number and making sure it is consistent across the code. Closes https://github.com/facebook/rocksdb/pull/3168 Differential Revision: D6324312 Pulled By: maysamyabandeh fbshipit-source-id: 9a34f479561fdb2a5d01ef6d37a28908d03bbe33 --- db/db_impl.h | 6 ++++++ db/db_impl_write.cc | 35 ++++++++++++++++++++++++++++------- db/write_batch.cc | 7 +++++-- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index a7eddba24..4519b7169 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -662,6 +662,12 @@ class DBImpl : public DB { void EraseThreadStatusDbInfo() const; + // If disable_memtable is set the application logic must guarantee that the + // batch will still be skipped from memtable during the recovery. In + // WriteCommitted it is guarnateed since disable_memtable is used for prepare + // batch which will be written to memtable later during the commit, and in + // WritePrepared it is guaranteed since it will be used only for WAL markers + // which will never be written to memtable. Status WriteImpl(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 51434ba8c..a569ddbae 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -195,9 +195,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool parallel = immutable_db_options_.allow_concurrent_memtable_write && write_group.size > 1; size_t total_count = 0; + size_t valid_batches = 0; uint64_t total_byte_size = 0; for (auto* writer : write_group) { if (writer->CheckCallback(this)) { + valid_batches++; if (writer->ShouldWriteToMemtable()) { total_count += WriteBatchInternal::Count(writer->batch); parallel = parallel && !writer->batch->HasMerge(); @@ -207,7 +209,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); } } - size_t seq_inc = seq_per_batch_ ? write_group.size : total_count; + // Note about seq_per_batch_: either disableWAL is set for the entire write + // group or not. In either case we inc seq for each write batch with no + // failed callback. This means that there could be a batch with + // disalbe_memtable in between; although we do not write this batch to + // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc + // the seq per valid written key to mem. + size_t seq_inc = seq_per_batch_ ? valid_batches : total_count; const bool concurrent_update = two_write_queues_; // Update stats while we are an exclusive group leader, so we know @@ -263,13 +271,21 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_GUARD(write_memtable_time); if (!parallel) { + // w.sequence will be set inside InsertInto w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this, parallel, seq_per_batch_); } else { SequenceNumber next_sequence = current_sequence; + // Note: the logic for advancing seq here must be consistent with the + // logic in WriteBatchInternal::InsertInto(write_group...) as well as + // with WriteBatchInternal::InsertInto(write_batch...) that is called on + // the merged batch during recovery from the WAL. for (auto* writer : write_group) { + if (writer->CallbackFailed()) { + continue; + } writer->sequence = next_sequence; if (seq_per_batch_) { next_sequence++; @@ -538,14 +554,14 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); auto curr_seq = last_sequence + 1; for (auto* writer : write_group) { - if (writer->CheckCallback(this)) { - writer->sequence = curr_seq; + if (writer->CallbackFailed()) { + continue; } + writer->sequence = curr_seq; if (seq_per_batch_) { curr_seq++; - } else if (writer->CheckCallback(this)) { - curr_seq += WriteBatchInternal::Count(writer->batch); } + // else seq advances only by memtable writes } if (status.ok() && write_options.sync) { // Requesting sync with two_write_queues_ is expected to be very rare. We @@ -689,7 +705,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, WriteBatch* merged_batch = nullptr; *write_with_wal = 0; auto* leader = write_group.leader; - if (write_group.size == 1 && leader->ShouldWriteToWAL() && + assert(!leader->disable_wal); // Same holds for all in the batch group + if (write_group.size == 1 && !leader->CallbackFailed() && leader->batch->GetWalTerminationPoint().is_cleared()) { // we simply write the first WriteBatch to WAL if the group only // contains one batch, that batch should be written to the WAL, @@ -705,7 +722,7 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // interface merged_batch = tmp_batch; for (auto writer : write_group) { - if (writer->ShouldWriteToWAL()) { + if (!writer->CallbackFailed()) { WriteBatchInternal::Append(merged_batch, writer->batch, /*WAL_only*/ true); if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { @@ -745,6 +762,8 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, SequenceNumber sequence) { Status status; + assert(!write_group.leader->disable_wal); + // Same holds for all in the batch group size_t write_with_wal = 0; WriteBatch* to_be_cached_state = nullptr; WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_, @@ -812,6 +831,8 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, size_t seq_inc) { Status status; + assert(!write_group.leader->disable_wal); + // Same holds for all in the batch group WriteBatch tmp_batch; size_t write_with_wal = 0; WriteBatch* to_be_cached_state = nullptr; diff --git a/db/write_batch.cc b/db/write_batch.cc index 4bc4bae92..f87fd6f43 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1471,13 +1471,16 @@ Status WriteBatchInternal::InsertInto( db, concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch); for (auto w : write_group) { + if (w->CallbackFailed()) { + continue; + } + w->sequence = inserter.sequence(); if (!w->ShouldWriteToMemtable()) { - w->sequence = inserter.sequence(); + // In seq_per_batch_ mode this advances the seq by one. inserter.MaybeAdvanceSeq(true); continue; } SetSequence(w->batch, inserter.sequence()); - w->sequence = inserter.sequence(); inserter.set_log_number_ref(w->log_ref); w->status = w->batch->Iterate(&inserter); if (!w->status.ok()) {