From ccaadd8705ea7c5a6e2d0df4358791f47e6db296 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 14 Mar 2022 18:49:55 -0700 Subject: [PATCH] Fix a TSAN-reported bug caused by concurrent access to std::deque (#9686) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9686 According to https://www.cplusplus.com/reference/deque/deque/back/, " The container is accessed (neither the const nor the non-const versions modify the container). The last element is potentially accessed or modified by the caller. Concurrently accessing or modifying other elements is safe. " Also according to https://www.cplusplus.com/reference/deque/deque/pop_front/, " The container is modified. The first element is modified. Concurrently accessing or modifying other elements is safe (although see iterator validity above). " In RocksDB, we never pop the last element of `DBImpl::alive_log_files_`. We have been exploiting this fact and the above two properties when ensuring correctness when `DBImpl::alive_log_files_` may be accessed concurrently. Specifically, it can be accessed in the write path when db mutex is released. Sometimes, the log_write_mutex_ is held. It can also be accessed in `FindObsoleteFiles()` when db mutex is always held. It can also be accessed during recovery when db mutex is also held. Given the fact that we never pop the last element of alive_log_files_, we currently do not acquire additional locks when accessing it in `WriteToWAL()` as follows ``` alive_log_files_.back().AddSize(log_entry.size()); ``` This is problematic. Check source code of deque.h ``` back() _GLIBCXX_NOEXCEPT { __glibcxx_requires_nonempty(); ... } pop_front() _GLIBCXX_NOEXCEPT { ... if (this->_M_impl._M_start._M_cur != this->_M_impl._M_start._M_last - 1) { ... ++this->_M_impl._M_start._M_cur; } ... } ``` `back()` will actually call `__glibcxx_requires_nonempty()` first. 
If `__glibcxx_requires_nonempty()` is enabled and not an empty macro, it will call `empty()` ``` bool empty() { return this->_M_impl._M_finish == this->_M_impl._M_start; } ``` You can see that it will access `this->_M_impl._M_start`, racing with `pop_front()`. Therefore, TSAN will actually catch the bug in this case. To be able to use TSAN on our library and unit tests, we should always coordinate concurrent accesses to STL containers properly. We need to pass information about db mutex and log mutex into `WriteToWAL()`, otherwise it's impossible to know which mutex to acquire inside the function. To fix this, we can cache the tail of `alive_log_files_` (as the iterator `alive_log_files_tail_`), so that we do not have to call `back()` in `WriteToWAL()`. Reviewed By: pdillinger Differential Revision: D34780309 fbshipit-source-id: 1def9821f0c437f2736c6a26445d75890377889b --- HISTORY.md | 4 ++++ db/db_impl/db_impl.h | 12 ++++++++---- db/db_impl/db_impl_open.cc | 5 ++++- db/db_impl/db_impl_write.cc | 39 ++++++++++++++++++++++++++++++++----- 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 6695e76fb..f2a6c58b3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## Unreleased +### Bug Fixes +* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled. + ## 6.29.4 (03/22/2022) ### Bug Fixes * Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data. 
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 54aab499a..036b9ba72 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1760,7 +1760,8 @@ class DBImpl : public DB { WriteBatch** to_be_cached_state); IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, - uint64_t* log_used, uint64_t* log_size); + uint64_t* log_used, uint64_t* log_size, + bool with_db_mutex = false, bool with_log_mutex = false); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, @@ -2086,12 +2087,15 @@ class DBImpl : public DB { bool persistent_stats_cfd_exists_ = true; // Without two_write_queues, read and writes to alive_log_files_ are - // protected by mutex_. However since back() is never popped, and push_back() - // is done only from write_thread_, the same thread can access the item - // reffered by back() without mutex_. With two_write_queues_, writes + // protected by mutex_. With two_write_queues_, writes // are protected by locking both mutex_ and log_write_mutex_, and reads must // be under either mutex_ or log_write_mutex_. std::deque alive_log_files_; + // Caching the result of `alive_log_files_.back()` so that we do not have to + // call `alive_log_files_.back()` in the write thread (WriteToWAL()) which + // requires locking db mutex if log_mutex_ is not already held in + // two-write-queues mode. + std::deque::reverse_iterator alive_log_files_tail_; // Log files that aren't fully synced, and the current log file. 
// Synchronization: // - push_back() is done from write_thread_ with locked mutex_ and diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 2fe4000dd..98d53a7d8 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1362,6 +1362,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { total_log_size_ += log.size; alive_log_files_.push_back(log); } + alive_log_files_tail_ = alive_log_files_.rbegin(); if (two_write_queues_) { log_write_mutex_.Unlock(); } @@ -1705,6 +1706,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); + impl->alive_log_files_tail_ = impl->alive_log_files_.rbegin(); if (impl->two_write_queues_) { impl->log_write_mutex_.Unlock(); } @@ -1725,7 +1727,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, WriteOptions write_options; uint64_t log_used, log_size; log::Writer* log_writer = impl->logs_.back().writer; - s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size); + s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size, + /*with_db_mutex==*/true); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 72a47d83a..2cc4da472 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1085,8 +1085,22 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // write thread. Otherwise this must be called holding log_write_mutex_. IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, - uint64_t* log_size) { + uint64_t* log_size, + bool with_db_mutex, bool with_log_mutex) { assert(log_size != nullptr); + + // Assert mutex explicitly. 
+ if (with_db_mutex) { + mutex_.AssertHeld(); + } else if (two_write_queues_) { + log_write_mutex_.AssertHeld(); + assert(with_log_mutex); + } + +#ifdef NDEBUG + (void)with_log_mutex; +#endif + Slice log_entry = WriteBatchInternal::Contents(&merged_batch); *log_size = log_entry.size(); // When two_write_queues_ WriteToWAL has to be protected from concurretn calls @@ -1109,9 +1123,20 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, *log_used = logfile_number_; } total_log_size_ += log_entry.size(); - // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here - // since alive_log_files_ might be modified concurrently - alive_log_files_.back().AddSize(log_entry.size()); +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) + if (with_db_mutex || with_log_mutex) { +#endif // __has_feature(thread_sanitizer) +#endif // defined(__has_feature) + assert(alive_log_files_tail_ != alive_log_files_.rend()); + assert(alive_log_files_tail_ == alive_log_files_.rbegin()); +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) + } +#endif // __has_feature(thread_sanitizer) +#endif // defined(__has_feature) + LogFileNumberSize& last_alive_log = *alive_log_files_tail_; + last_alive_log.AddSize(*log_size); log_empty_ = false; return io_s; } @@ -1121,6 +1146,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, bool need_log_sync, bool need_log_dir_sync, SequenceNumber sequence) { IOStatus io_s; + assert(!two_write_queues_); assert(!write_group.leader->disable_wal); // Same holds for all in the batch group size_t write_with_wal = 0; @@ -1208,6 +1234,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( SequenceNumber* last_sequence, size_t seq_inc) { IOStatus io_s; + assert(two_write_queues_ || immutable_db_options_.unordered_write); assert(!write_group.leader->disable_wal); // Same holds for all in the batch group WriteBatch tmp_batch; @@ -1232,7 +1259,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL( log::Writer* log_writer 
= logs_.back().writer; uint64_t log_size; - io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, + /*with_db_mutex=*/false, /*with_log_mutex=*/true); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1886,6 +1914,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log_dir_synced_ = false; logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); + alive_log_files_tail_ = alive_log_files_.rbegin(); } log_write_mutex_.Unlock(); }