diff --git a/HISTORY.md b/HISTORY.md
index 64137894d..f24b33d15 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -15,6 +15,7 @@
 * Fixed a bug that DB flush uses `options.compression` even `options.compression_per_level` is set.
 * Fixed a bug that DisableManualCompaction may assert when disable an unscheduled manual compaction.
 * Fixed a potential timer crash when open close DB concurrently.
+* Fixed a race condition on `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in `WriteToWAL()` and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
 
 ### Public API changes
 * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 8e4b9d048..320c07463 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1789,7 +1789,8 @@ class DBImpl : public DB {
   // associated with this WriteToWAL
   IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
                       uint64_t* log_used, uint64_t* log_size,
-                      Env::IOPriority rate_limiter_priority);
+                      Env::IOPriority rate_limiter_priority,
+                      bool with_db_mutex = false, bool with_log_mutex = false);
 
   IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
                       log::Writer* log_writer, uint64_t* log_used,
@@ -2115,12 +2116,15 @@ class DBImpl : public DB {
   bool persistent_stats_cfd_exists_ = true;
 
   // Without two_write_queues, read and writes to alive_log_files_ are
-  // protected by mutex_. However since back() is never popped, and push_back()
-  // is done only from write_thread_, the same thread can access the item
-  // reffered by back() without mutex_. With two_write_queues_, writes
+  // protected by mutex_. With two_write_queues_, writes
   // are protected by locking both mutex_ and log_write_mutex_, and reads must
   // be under either mutex_ or log_write_mutex_.
   std::deque<LogFileNumberSize> alive_log_files_;
+  // Caches the result of `alive_log_files_.back()` so that the write thread
+  // (WriteToWAL()) does not have to call `alive_log_files_.back()`, which
+  // would require locking the db mutex when log_write_mutex_ is not already
+  // held in two-write-queues mode.
+  std::deque<LogFileNumberSize>::reverse_iterator alive_log_files_tail_;
   // Log files that aren't fully synced, and the current log file.
   // Synchronization:
   //  - push_back() is done from write_thread_ with locked mutex_ and
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index 101da5fa1..94ad8bf85 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1364,6 +1364,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
     total_log_size_ += log.size;
     alive_log_files_.push_back(log);
   }
+  alive_log_files_tail_ = alive_log_files_.rbegin();
   if (two_write_queues_) {
     log_write_mutex_.Unlock();
   }
@@ -1775,6 +1776,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
     }
     impl->alive_log_files_.push_back(
         DBImpl::LogFileNumberSize(impl->logfile_number_));
+    impl->alive_log_files_tail_ = impl->alive_log_files_.rbegin();
     if (impl->two_write_queues_) {
       impl->log_write_mutex_.Unlock();
     }
@@ -1796,7 +1798,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
       uint64_t log_used, log_size;
       log::Writer* log_writer = impl->logs_.back().writer;
       s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
-                           Env::IO_TOTAL);
+                           Env::IO_TOTAL, /*with_db_mutex=*/true);
       if (s.ok()) {
         // Need to fsync, otherwise it might get lost after a power reset.
         s = impl->FlushWAL(false);
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index bb9eae90f..f8200a26e 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1171,8 +1171,22 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
 IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                             log::Writer* log_writer, uint64_t* log_used,
                             uint64_t* log_size,
-                            Env::IOPriority rate_limiter_priority) {
+                            Env::IOPriority rate_limiter_priority,
+                            bool with_db_mutex, bool with_log_mutex) {
   assert(log_size != nullptr);
+
+  // Assert mutex explicitly.
+  if (with_db_mutex) {
+    mutex_.AssertHeld();
+  } else if (two_write_queues_) {
+    log_write_mutex_.AssertHeld();
+    assert(with_log_mutex);
+  }
+
+#ifdef NDEBUG
+  (void)with_log_mutex;
+#endif
+
   Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
   *log_size = log_entry.size();
   // When two_write_queues_ WriteToWAL has to be protected from concurretn calls
@@ -1195,9 +1209,20 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
     *log_used = logfile_number_;
   }
   total_log_size_ += log_entry.size();
-  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
-  // since alive_log_files_ might be modified concurrently
-  alive_log_files_.back().AddSize(log_entry.size());
+#if defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+  if (with_db_mutex || with_log_mutex) {
+#endif  // __has_feature(thread_sanitizer)
+#endif  // defined(__has_feature)
+    assert(alive_log_files_tail_ != alive_log_files_.rend());
+    assert(alive_log_files_tail_ == alive_log_files_.rbegin());
+#if defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+  }
+#endif  // __has_feature(thread_sanitizer)
+#endif  // defined(__has_feature)
+  LogFileNumberSize& last_alive_log = *alive_log_files_tail_;
+  last_alive_log.AddSize(*log_size);
   log_empty_ = false;
   return io_s;
 }
@@ -1207,6 +1232,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                             bool need_log_sync, bool need_log_dir_sync,
                             SequenceNumber sequence) {
   IOStatus io_s;
+  assert(!two_write_queues_);
   assert(!write_group.leader->disable_wal);
   // Same holds for all in the batch group
   size_t write_with_wal = 0;
@@ -1295,6 +1321,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
     SequenceNumber* last_sequence, size_t seq_inc) {
   IOStatus io_s;
+  assert(two_write_queues_ || immutable_db_options_.unordered_write);
   assert(!write_group.leader->disable_wal);
   // Same holds for all in the batch group
   WriteBatch tmp_batch;
@@ -1320,7 +1347,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
   log::Writer* log_writer = logs_.back().writer;
   uint64_t log_size;
   io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
-                    write_group.leader->rate_limiter_priority);
+                    write_group.leader->rate_limiter_priority,
+                    /*with_db_mutex=*/false, /*with_log_mutex=*/true);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
@@ -1974,6 +2002,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     log_dir_synced_ = false;
     logs_.emplace_back(logfile_number_, new_log);
     alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
+    alive_log_files_tail_ = alive_log_files_.rbegin();
   }
   log_write_mutex_.Unlock();
 }
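
The heart of the fix: the write path used to call `alive_log_files_.back()`, and under `_GLIBCXX_ASSERTIONS` `back()` first runs `__glibcxx_requires_nonempty`, a read of the deque's internal bookkeeping that can interleave with `FindObsoleteFiles()` calling `pop_front()` on fully flushed logs. Since the current log at the tail is never popped, the patch instead snapshots a reverse iterator at each `push_back()` site and has `WriteToWAL()` touch only that iterator. Below is a minimal sketch of the same pattern outside RocksDB; every name is a hypothetical stand-in, and it leans on the patch's practical reasoning (the tail element is never erased, so the iterator's cached pointers stay usable) rather than on a formal thread-safety guarantee from the C++ standard.

```cpp
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>

// Stand-in for RocksDB's LogFileNumberSize.
struct LogFileNumberSize {
  explicit LogFileNumberSize(uint64_t n) : number(n) {}
  void AddSize(uint64_t s) { size += s; }
  uint64_t number;
  uint64_t size = 0;
};

std::mutex mu;  // plays the role of mutex_ / log_write_mutex_
std::deque<LogFileNumberSize> alive_logs;
std::deque<LogFileNumberSize>::reverse_iterator alive_logs_tail;

// Write path: uses only the cached iterator, so it never reads the deque's
// own start/finish bookkeeping and cannot race with pop_front() below.
void AddToCurrentLog(uint64_t bytes) { alive_logs_tail->AddSize(bytes); }

// FindObsoleteFiles() analogue: structural changes stay under the mutex,
// and the tail element is never popped, so the cached iterator keeps
// referring to a live element.
void PurgeObsoleteLogs() {
  std::lock_guard<std::mutex> lock(mu);
  while (alive_logs.size() > 1) alive_logs.pop_front();
}

// push_back() invalidates deque iterators, so the cache is refreshed at
// every push site -- mirroring RestoreAliveLogFiles(), Open(), and
// SwitchMemtable(). In RocksDB the push happens from the write thread
// itself, so rolling and appending never run concurrently; this sketch
// only rolls before the threads start.
void RollToNewLog(uint64_t number) {
  std::lock_guard<std::mutex> lock(mu);
  alive_logs.emplace_back(number);
  alive_logs_tail = alive_logs.rbegin();
}

int main() {
  RollToNewLog(1);
  RollToNewLog(2);
  RollToNewLog(3);
  std::thread writer([] {
    for (int i = 0; i < 100000; ++i) AddToCurrentLog(42);
  });
  std::thread purger(PurgeObsoleteLogs);
  writer.join();
  purger.join();
  return 0;
}
```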
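
The three new `alive_log_files_tail_ = alive_log_files_.rbegin();` assignments (in `RestoreAliveLogFiles()`, `Open()`, and `SwitchMemtable()`) all follow a `push_back()` for a reason: inserting at either end of a `std::deque` invalidates every iterator into it, even though references to existing elements survive. A cached `rbegin()` therefore goes stale at each push and must be re-captured on the spot. A standalone illustration (not RocksDB code):

```cpp
#include <cstdio>
#include <deque>

int main() {
  std::deque<int> d{1, 2, 3};
  auto tail = d.rbegin();      // refers to the element 3
  d.push_back(4);              // invalidates `tail`; dereferencing it now is UB
  tail = d.rbegin();           // re-capture, as the patch does after each push_back
  std::printf("%d\n", *tail);  // prints 4
  return 0;
}
```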
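
Two smaller details in the `WriteToWAL()` hunk, alongside the new asserts that pin each overload to its queue mode (`assert(!two_write_queues_)` in the group variant, `assert(two_write_queues_ || immutable_db_options_.unordered_write)` in `ConcurrentWriteToWAL()`). First, `(void)with_log_mutex;` under `#ifdef NDEBUG` silences the unused-parameter warning in release builds, where `assert(with_log_mutex)` compiles away. Second, the nested preprocessor guards are the portable way to probe for ThreadSanitizer: `__has_feature` is a Clang extension, so it must itself be wrapped in `defined(__has_feature)` or GCC/MSVC would fail to preprocess the line. Under TSan the consistency asserts run only while a mutex is held, presumably because `alive_log_files_.rbegin()`/`rend()` read the container's bookkeeping and an unlocked call would be reported as exactly the kind of race this patch removes. A reusable sketch of the probe, with a hypothetical macro name (RocksDB inlines the same `#if` nesting at the use site instead):

```cpp
#include <cstdio>

// Portable ThreadSanitizer probe. SKETCH_TSAN_ENABLED is a made-up name.
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define SKETCH_TSAN_ENABLED 1
#endif
#endif
#ifndef SKETCH_TSAN_ENABLED
#define SKETCH_TSAN_ENABLED 0
#endif

int main() {
  // Clang with -fsanitize=thread prints 1; GCC, MSVC, and non-TSan Clang
  // builds print 0. (GCC defines __SANITIZE_THREAD__ instead, which this
  // minimal probe deliberately ignores.)
  std::printf("thread sanitizer: %d\n", SKETCH_TSAN_ENABLED);
  return 0;
}
```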