diff --git a/db/db_impl.h b/db/db_impl.h
index a7eddba24..4519b7169 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -662,6 +662,12 @@ class DBImpl : public DB {
   void EraseThreadStatusDbInfo() const;
 
+  // If disable_memtable is set, the application logic must guarantee that
+  // the batch will still be skipped from the memtable during recovery. In
+  // WriteCommitted this is guaranteed since disable_memtable is used only
+  // for the prepare batch, which is written to the memtable later during
+  // commit, and in WritePrepared it is guaranteed since disable_memtable is
+  // used only for WAL markers, which are never written to the memtable.
   Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
                    WriteCallback* callback = nullptr,
                    uint64_t* log_used = nullptr, uint64_t log_ref = 0,
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index 51434ba8c..a569ddbae 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -195,9 +195,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                   write_group.size > 1;
   size_t total_count = 0;
+  size_t valid_batches = 0;
   uint64_t total_byte_size = 0;
   for (auto* writer : write_group) {
     if (writer->CheckCallback(this)) {
+      valid_batches++;
       if (writer->ShouldWriteToMemtable()) {
         total_count += WriteBatchInternal::Count(writer->batch);
         parallel = parallel && !writer->batch->HasMerge();
@@ -207,7 +209,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
           total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
     }
   }
-  size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;
+  // Note about seq_per_batch_: either disableWAL is set for the entire
+  // write group or not. In either case we inc the seq for each write batch
+  // with no failed callback. This means that there could be a batch with
+  // disable_memtable in between; although we do not write this batch to
+  // the memtable, it still consumes a seq. Otherwise (!seq_per_batch_), we
+  // inc the seq once per valid key written to the memtable.
+  size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;
 
   const bool concurrent_update = two_write_queues_;
   // Update stats while we are an exclusive group leader, so we know
@@ -263,13 +271,21 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       PERF_TIMER_GUARD(write_memtable_time);
 
       if (!parallel) {
+        // w.sequence will be set inside InsertInto
         w.status = WriteBatchInternal::InsertInto(
             write_group, current_sequence, column_family_memtables_.get(),
             &flush_scheduler_, write_options.ignore_missing_column_families,
             0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
       } else {
         SequenceNumber next_sequence = current_sequence;
+        // Note: the logic for advancing seq here must be consistent with
+        // the logic in WriteBatchInternal::InsertInto(write_group...) as
+        // well as with WriteBatchInternal::InsertInto(write_batch...) that
+        // is called on the merged batch during recovery from the WAL.
         for (auto* writer : write_group) {
+          if (writer->CallbackFailed()) {
+            continue;
+          }
           writer->sequence = next_sequence;
           if (seq_per_batch_) {
             next_sequence++;
@@ -538,14 +554,14 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
     status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
   auto curr_seq = last_sequence + 1;
   for (auto* writer : write_group) {
-    if (writer->CheckCallback(this)) {
-      writer->sequence = curr_seq;
+    if (writer->CallbackFailed()) {
+      continue;
     }
+    writer->sequence = curr_seq;
     if (seq_per_batch_) {
       curr_seq++;
-    } else if (writer->CheckCallback(this)) {
-      curr_seq += WriteBatchInternal::Count(writer->batch);
     }
+    // else seq advances only by memtable writes
   }
   if (status.ok() && write_options.sync) {
     // Requesting sync with two_write_queues_ is expected to be very rare. We
@@ -689,7 +705,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
   WriteBatch* merged_batch = nullptr;
   *write_with_wal = 0;
   auto* leader = write_group.leader;
-  if (write_group.size == 1 && leader->ShouldWriteToWAL() &&
+  assert(!leader->disable_wal);  // Same holds for all in the batch group
+  if (write_group.size == 1 && !leader->CallbackFailed() &&
       leader->batch->GetWalTerminationPoint().is_cleared()) {
     // we simply write the first WriteBatch to WAL if the group only
     // contains one batch, that batch should be written to the WAL,
@@ -705,7 +722,7 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
     // interface
     merged_batch = tmp_batch;
     for (auto writer : write_group) {
-      if (writer->ShouldWriteToWAL()) {
+      if (!writer->CallbackFailed()) {
         WriteBatchInternal::Append(merged_batch, writer->batch,
                                    /*WAL_only*/ true);
         if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
@@ -745,6 +762,8 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                           SequenceNumber sequence) {
   Status status;
 
+  assert(!write_group.leader->disable_wal);
+  // Same holds for all in the batch group
   size_t write_with_wal = 0;
   WriteBatch* to_be_cached_state = nullptr;
   WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
@@ -812,6 +831,8 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
                                     size_t seq_inc) {
   Status status;
 
+  assert(!write_group.leader->disable_wal);
+  // Same holds for all in the batch group
   WriteBatch tmp_batch;
   size_t write_with_wal = 0;
   WriteBatch* to_be_cached_state = nullptr;
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 4bc4bae92..f87fd6f43 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -1471,13 +1471,16 @@ Status WriteBatchInternal::InsertInto(
       db, concurrent_memtable_writes, nullptr /*has_valid_writes*/,
       seq_per_batch);
   for (auto w : write_group) {
+    if (w->CallbackFailed()) {
+      continue;
+    }
+    w->sequence = inserter.sequence();
     if (!w->ShouldWriteToMemtable()) {
-      w->sequence = inserter.sequence();
+      // In seq_per_batch_ mode this advances the seq by one.
       inserter.MaybeAdvanceSeq(true);
       continue;
     }
     SetSequence(w->batch, inserter.sequence());
-    w->sequence = inserter.sequence();
     inserter.set_log_number_ref(w->log_ref);
     w->status = w->batch->Iterate(&inserter);
     if (!w->status.ok()) {
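
To make the new accounting concrete, here is a minimal standalone sketch of the
sequence-number bookkeeping this patch establishes. WriterModel, AssignSequences,
and the field names are hypothetical stand-ins for WriteThread::Writer,
CallbackFailed(), ShouldWriteToMemtable(), and WriteBatchInternal::Count(); the
logic mirrors the patched loop in WriteImpl, not RocksDB's actual API.

    #include <cassert>
    #include <cstdint>
    #include <vector>

    struct WriterModel {
      bool callback_failed;    // models CallbackFailed()
      bool write_to_memtable;  // false when disable_memtable is set
      uint64_t key_count;      // models WriteBatchInternal::Count(batch)
      uint64_t sequence;       // assigned by AssignSequences below
    };

    // Mirrors the patched assignment: a batch with a failed callback neither
    // gets a seq nor consumes one; in seq_per_batch mode every surviving
    // batch consumes exactly one seq, even if disable_memtable keeps it out
    // of the memtable; otherwise seq advances per key written to the
    // memtable. Returns the total seq consumption (the patched seq_inc).
    uint64_t AssignSequences(std::vector<WriterModel>& group,
                             uint64_t current_sequence, bool seq_per_batch) {
      uint64_t next_sequence = current_sequence;
      for (auto& w : group) {
        if (w.callback_failed) {
          continue;
        }
        w.sequence = next_sequence;
        if (seq_per_batch) {
          next_sequence++;  // one seq per valid batch, memtable or not
        } else if (w.write_to_memtable) {
          next_sequence += w.key_count;  // one seq per key written to mem
        }
      }
      return next_sequence - current_sequence;
    }

    int main() {
      // A 3-key prepare batch (disable_memtable), a failed callback,
      // and a normal 2-key batch.
      std::vector<WriterModel> group = {{false, false, 3, 0},
                                        {true, true, 2, 0},
                                        {false, true, 2, 0}};
      // seq_per_batch (WritePrepared-style): two valid batches, seq_inc == 2.
      uint64_t inc = AssignSequences(group, 100, /*seq_per_batch=*/true);
      assert(inc == 2 && group[0].sequence == 100 && group[2].sequence == 101);
      // !seq_per_batch (WriteCommitted-style): only keys written to the
      // memtable advance the seq, so again seq_inc == 2 here.
      inc = AssignSequences(group, 100, /*seq_per_batch=*/false);
      assert(inc == 2 && group[2].sequence == 100);
      return 0;
    }

This is exactly why seq_inc switches from write_group.size to valid_batches: a
disable_memtable batch still consumes a seq, but a batch with a failed callback
must consume nothing, at write time and at recovery alike.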
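
The MergeBatch changes apply the same rule on the WAL side. Below is a sketch of
the selection logic under that reading, again with hypothetical names
(GroupWriter, BatchesWrittenToWal) in place of the real WriteThread::Writer and
WriteBatch plumbing, and with the GetWalTerminationPoint() fast-path condition
reduced to the group-size check: the old ShouldWriteToWAL() test is replaced by
!CallbackFailed() because disable_wal is now asserted to be uniform across the
group.

    #include <cassert>
    #include <cstddef>
    #include <vector>

    struct GroupWriter {
      bool callback_failed;  // models CallbackFailed()
      bool disable_wal;      // must be false for every writer in the group
    };

    // Returns how many batches reach the WAL write: the leader's batch alone
    // on the single-writer fast path, otherwise every batch whose callback
    // passed, including disable_memtable ones.
    size_t BatchesWrittenToWal(const std::vector<GroupWriter>& group) {
      assert(!group.front().disable_wal);  // same holds for all in the group
      if (group.size() == 1 && !group.front().callback_failed) {
        return 1;  // fast path: the leader's batch goes to the WAL as-is
      }
      size_t appended = 0;
      for (const auto& w : group) {
        if (!w.callback_failed) {
          appended++;  // appended to the merged batch
        }
      }
      return appended;
    }

    int main() {
      std::vector<GroupWriter> group = {{false, false},
                                        {true, false},
                                        {false, false}};
      // The failed-callback batch is skipped, so two batches reach the WAL.
      assert(BatchesWrittenToWal(group) == 2);
      return 0;
    }

Keeping this count equal to valid_batches is what the "must be consistent" note
above is about: recovery replays the merged batch and, in seq_per_batch_ mode,
advances the seq once per surviving sub-batch, so the write path and WAL replay
assign identical sequence numbers.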