diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index d0a22321a..6d16b8331 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -252,7 +252,10 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } job_context->size_log_to_delete += earliest.size; total_log_size_ -= earliest.size; - alive_log_files_.pop_front(); + { + InstrumentedMutexLock wl(&log_write_mutex_); + alive_log_files_.pop_front(); + } // Current log should always stay alive since it can't have // number < MinLogNumber(). assert(alive_log_files_.size()); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 55b19d8e4..74f0fea5e 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -656,7 +656,6 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // contains one batch, that batch should be written to the WAL, // and the batch is not wanting to be truncated merged_batch = leader->batch; - leader->log_used = logfile_number_; *write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. @@ -669,7 +668,6 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, /*WAL_only*/ true); (*write_with_wal)++; } - writer->log_used = logfile_number_; } } return merged_batch; @@ -700,6 +698,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, size_t write_with_wal = 0; WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_, &write_with_wal); + if (merged_batch == write_group.leader->batch) { + write_group.leader->log_used = logfile_number_; + } else if (write_with_wal > 1) { + for (auto writer : write_group) { + writer->log_used = logfile_number_; + } + } WriteBatchInternal::SetSequence(merged_batch, sequence); @@ -760,6 +765,13 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, // We need to lock log_write_mutex_ since logs_ and alive_log_files might be // pushed back concurrently log_write_mutex_.Lock(); + if (merged_batch == write_group.leader->batch) { + write_group.leader->log_used = logfile_number_; + } else if (write_with_wal > 1) { + for (auto writer : write_group) { + writer->log_used = logfile_number_; + } + } *last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count); auto sequence = *last_sequence + 1; WriteBatchInternal::SetSequence(merged_batch, sequence); @@ -1008,6 +1020,11 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd, // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); + WriteThread::Writer nonmem_w; + if (concurrent_prepare_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + unique_ptr lfile; log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; @@ -1021,7 +1038,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. assert(versions_->prev_log_number() == 0); + //log_write_mutex_.Lock(); bool creating_new_log = !log_empty_; + //log_write_mutex_.Unlock(); uint64_t recycle_log_number = 0; if (creating_new_log && immutable_db_options_.recycle_log_file_num && !log_recycle_files.empty()) { @@ -1106,14 +1125,17 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { assert(creating_new_log); assert(!new_mem); assert(!new_log); + if (concurrent_prepare_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } return s; } if (creating_new_log) { + log_write_mutex_.Lock(); logfile_number_ = new_log_number; assert(new_log != nullptr); log_empty_ = true; log_dir_synced_ = false; - log_write_mutex_.Lock(); if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one log::Writer* cur_log_writer = logs_.back().writer; @@ -1143,6 +1165,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { cfd->SetMemtable(new_mem); context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork( cfd, new_superversion, mutable_cf_options)); + if (concurrent_prepare_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } return s; }