Fix a TSAN-reported bug caused by concurrent access to std::deque (#9686)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9686

According to https://www.cplusplus.com/reference/deque/deque/back/,
"
The container is accessed (neither the const nor the non-const versions modify the container).
The last element is potentially accessed or modified by the caller. Concurrently accessing or modifying other elements is safe.
"

Also according to https://www.cplusplus.com/reference/deque/deque/pop_front/,
"
The container is modified.
The first element is modified. Concurrently accessing or modifying other elements is safe (although see iterator validity above).
"
In RocksDB, we never pop the last element of `DBImpl::alive_log_files_`. We have been
relying on this fact and the above two properties to ensure correctness when
`DBImpl::alive_log_files_` may be accessed concurrently. Specifically, it can be accessed
in the write path when the db mutex is released (sometimes with `log_write_mutex_` held). It can also be accessed in `FindObsoleteFiles()`,
where the db mutex is always held, and during recovery, where the db mutex is also held.
Given that we never pop the last element of `alive_log_files_`, we currently do not
acquire additional locks when accessing it in `WriteToWAL()` as follows
```
alive_log_files_.back().AddSize(log_entry.size());
```

This is problematic.

Consider the source code of libstdc++'s `deque.h`:
```
  back() _GLIBCXX_NOEXCEPT
  {
    __glibcxx_requires_nonempty();
    ...
  }

  pop_front() _GLIBCXX_NOEXCEPT
  {
    ...
    if (this->_M_impl._M_start._M_cur
        != this->_M_impl._M_start._M_last - 1)
      {
        ...
        ++this->_M_impl._M_start._M_cur;
      }
    ...
  }
```

`back()` will actually call `__glibcxx_requires_nonempty()` first.
If `__glibcxx_requires_nonempty()` is enabled (i.e. not an empty macro),
it will call `empty()`:
```
bool empty() {
  return this->_M_impl._M_finish == this->_M_impl._M_start;
}
```
You can see that `empty()` accesses `this->_M_impl._M_start`, racing with the update performed by `pop_front()`.
Therefore, TSAN will actually catch the bug in this case.
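
For illustration, the following standalone program (a minimal sketch, not RocksDB code; the container contents, thread structure, and build flags are assumptions for demonstration) exhibits the same kind of race when built with `-fsanitize=thread` and with `__glibcxx_requires_nonempty()` enabled (e.g. `-D_GLIBCXX_ASSERTIONS` on libstdc++):
```
#include <deque>
#include <thread>

// Minimal sketch, not RocksDB code. One thread repeatedly reads back() while
// another pops from the front with no synchronization. With
// __glibcxx_requires_nonempty() compiled in, back() calls empty(), which reads
// _M_impl._M_start while pop_front() is writing to it, so TSAN reports a data
// race (the threads need to overlap, so it may take a few runs to trigger).
int main() {
  std::deque<int> alive{1, 2, 3, 4};  // the last element is never popped

  std::thread writer([&alive] {
    long total = 0;
    for (int i = 0; i < 100000; ++i) {
      total += alive.back();  // analogous to alive_log_files_.back().AddSize()
    }
    (void)total;
  });

  std::thread purger([&alive] {
    alive.pop_front();  // analogous to another thread trimming old WAL entries
    alive.pop_front();
  });

  writer.join();
  purger.join();
  return 0;
}
```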

To be able to use TSAN on our library and unit tests, we should always coordinate
concurrent accesses to STL containers properly.

We need to pass information about db mutex and log mutex into `WriteToWAL()`, otherwise
it's impossible to know which mutex to acquire inside the function.

To fix this, we cache the tail of `alive_log_files_` (as a reverse iterator, `alive_log_files_tail_`), refreshing it after every `push_back()`, so that we do not have to call `back()` in `WriteToWAL()`.
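
A simplified sketch of that pattern (illustrative only; the `AliveLogs` wrapper below is a made-up stand-in for `DBImpl`, and the actual change is in the diff that follows):
```
#include <cassert>
#include <cstdint>
#include <deque>

// Illustrative stand-in for the real LogFileNumberSize, not RocksDB code.
struct LogFileNumberSize {
  explicit LogFileNumberSize(uint64_t num) : number(num) {}
  void AddSize(uint64_t sz) { size += sz; }
  uint64_t number;
  uint64_t size = 0;
};

// Hypothetical wrapper showing the caching pattern used by the fix.
struct AliveLogs {
  std::deque<LogFileNumberSize> files;
  // Cached tail. push_back() invalidates deque iterators, so the cache is
  // refreshed after every push; pop_front() never erases the last element,
  // so the cached reverse iterator stays valid across front pops.
  std::deque<LogFileNumberSize>::reverse_iterator tail;

  // Called under the appropriate mutex whenever a new WAL is created.
  void PushNewLog(uint64_t log_number) {
    files.push_back(LogFileNumberSize(log_number));
    tail = files.rbegin();
  }

  // Called from the write path; never calls files.back(), so it does not read
  // the front bookkeeping that a concurrent pop_front() modifies.
  void AddSizeToCurrentLog(uint64_t entry_size) {
    assert(tail != files.rend());
    tail->AddSize(entry_size);
  }
};
```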

Reviewed By: pdillinger

Differential Revision: D34780309

fbshipit-source-id: 1def9821f0c437f2736c6a26445d75890377889b
Yanqin Jin 2022-03-14 18:49:55 -07:00 committed by Facebook GitHub Bot
parent 9e05c5e251
commit bbdaf63d0f
4 changed files with 46 additions and 10 deletions

View File

@@ -15,6 +15,7 @@
 * Fixed a bug that DB flush uses `options.compression` even `options.compression_per_level` is set.
 * Fixed a bug that DisableManualCompaction may assert when disable an unscheduled manual compaction.
 * Fixed a potential timer crash when open close DB concurrently.
+* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
 ### Public API changes
 * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.

View File

@@ -1789,7 +1789,8 @@ class DBImpl : public DB {
   // associated with this WriteToWAL
   IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
                       uint64_t* log_used, uint64_t* log_size,
-                      Env::IOPriority rate_limiter_priority);
+                      Env::IOPriority rate_limiter_priority,
+                      bool with_db_mutex = false, bool with_log_mutex = false);
 
   IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
                       log::Writer* log_writer, uint64_t* log_used,
@@ -2115,12 +2116,15 @@ class DBImpl : public DB {
   bool persistent_stats_cfd_exists_ = true;
   // Without two_write_queues, read and writes to alive_log_files_ are
-  // protected by mutex_. However since back() is never popped, and push_back()
-  // is done only from write_thread_, the same thread can access the item
-  // reffered by back() without mutex_. With two_write_queues_, writes
+  // protected by mutex_. With two_write_queues_, writes
   // are protected by locking both mutex_ and log_write_mutex_, and reads must
   // be under either mutex_ or log_write_mutex_.
   std::deque<LogFileNumberSize> alive_log_files_;
+  // Caching the result of `alive_log_files_.back()` so that we do not have to
+  // call `alive_log_files_.back()` in the write thread (WriteToWAL()) which
+  // requires locking db mutex if log_mutex_ is not already held in
+  // two-write-queues mode.
+  std::deque<LogFileNumberSize>::reverse_iterator alive_log_files_tail_;
   // Log files that aren't fully synced, and the current log file.
   // Synchronization:
   // - push_back() is done from write_thread_ with locked mutex_ and

View File

@@ -1364,6 +1364,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
     total_log_size_ += log.size;
     alive_log_files_.push_back(log);
   }
+  alive_log_files_tail_ = alive_log_files_.rbegin();
   if (two_write_queues_) {
     log_write_mutex_.Unlock();
   }
@@ -1775,6 +1776,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   }
   impl->alive_log_files_.push_back(
       DBImpl::LogFileNumberSize(impl->logfile_number_));
+  impl->alive_log_files_tail_ = impl->alive_log_files_.rbegin();
   if (impl->two_write_queues_) {
     impl->log_write_mutex_.Unlock();
   }
@@ -1796,7 +1798,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
       uint64_t log_used, log_size;
       log::Writer* log_writer = impl->logs_.back().writer;
       s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
-                           Env::IO_TOTAL);
+                           Env::IO_TOTAL, /*with_db_mutex==*/true);
       if (s.ok()) {
         // Need to fsync, otherwise it might get lost after a power reset.
         s = impl->FlushWAL(false);

View File

@@ -1171,8 +1171,22 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
 IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
                             log::Writer* log_writer, uint64_t* log_used,
                             uint64_t* log_size,
-                            Env::IOPriority rate_limiter_priority) {
+                            Env::IOPriority rate_limiter_priority,
+                            bool with_db_mutex, bool with_log_mutex) {
   assert(log_size != nullptr);
+  // Assert mutex explicitly.
+  if (with_db_mutex) {
+    mutex_.AssertHeld();
+  } else if (two_write_queues_) {
+    log_write_mutex_.AssertHeld();
+    assert(with_log_mutex);
+  }
+#ifdef NDEBUG
+  (void)with_log_mutex;
+#endif
   Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
   *log_size = log_entry.size();
   // When two_write_queues_ WriteToWAL has to be protected from concurretn calls
@@ -1195,9 +1209,20 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
     *log_used = logfile_number_;
   }
   total_log_size_ += log_entry.size();
-  // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
-  // since alive_log_files_ might be modified concurrently
-  alive_log_files_.back().AddSize(log_entry.size());
+#if defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+  if (with_db_mutex || with_log_mutex) {
+#endif  // __has_feature(thread_sanitizer)
+#endif  // defined(__has_feature)
+    assert(alive_log_files_tail_ != alive_log_files_.rend());
+    assert(alive_log_files_tail_ == alive_log_files_.rbegin());
+#if defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+  }
+#endif  // __has_feature(thread_sanitizer)
+#endif  // defined(__has_feature)
+  LogFileNumberSize& last_alive_log = *alive_log_files_tail_;
+  last_alive_log.AddSize(*log_size);
   log_empty_ = false;
   return io_s;
 }
@@ -1207,6 +1232,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                             bool need_log_sync, bool need_log_dir_sync,
                             SequenceNumber sequence) {
   IOStatus io_s;
+  assert(!two_write_queues_);
   assert(!write_group.leader->disable_wal);
   // Same holds for all in the batch group
   size_t write_with_wal = 0;
@@ -1295,6 +1321,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
     SequenceNumber* last_sequence, size_t seq_inc) {
   IOStatus io_s;
+  assert(two_write_queues_ || immutable_db_options_.unordered_write);
   assert(!write_group.leader->disable_wal);
   // Same holds for all in the batch group
   WriteBatch tmp_batch;
@@ -1320,7 +1347,8 @@
   log::Writer* log_writer = logs_.back().writer;
   uint64_t log_size;
   io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
-                    write_group.leader->rate_limiter_priority);
+                    write_group.leader->rate_limiter_priority,
+                    /*with_db_mutex=*/false, /*with_log_mutex=*/true);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
@@ -1974,6 +2002,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     log_dir_synced_ = false;
     logs_.emplace_back(logfile_number_, new_log);
     alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
+    alive_log_files_tail_ = alive_log_files_.rbegin();
   }
   log_write_mutex_.Unlock();
 }