Fix race condition in SwitchMemtable

Summary:
MemTableList::current_ could be written by background flush thread and
simultaneously read in the user thread (NumNotFlushed() is used in
SwitchMemtable()). Use the lock to prevent this case. Found the error from tsan.

Related: D58833

Test Plan:
  $ OPT=-g COMPILE_WITH_TSAN=1 make -j64 db_test
  $ TEST_TMPDIR=/dev/shm/rocksdb ./db_test --gtest_filter=DBTest.RepeatedWritesToSameKey

Reviewers: lightmark, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D59139
This commit is contained in:
Andrew Kryczka 2016-06-02 17:11:45 -07:00
parent 88acd932f6
commit 842958651f

View File

@ -4874,7 +4874,7 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
for (auto listener : db_options_.listeners) { for (auto listener : db_options_.listeners) {
listener->OnMemTableSealed(mem_table_info); listener->OnMemTableSealed(mem_table_info);
} }
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
@ -4910,7 +4910,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
memtable_info.num_entries = cfd->mem()->num_entries(); memtable_info.num_entries = cfd->mem()->num_entries();
memtable_info.num_deletes = cfd->mem()->num_deletes(); memtable_info.num_deletes = cfd->mem()->num_deletes();
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Log this later after lock release. It may be outdated, e.g., if background
// flush happens before logging, but that should be ok.
int num_imm_unflushed = cfd->imm()->NumNotFlushed();
mutex_.Unlock(); mutex_.Unlock();
Status s; Status s;
{ {
@ -4958,7 +4960,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64 "[%s] New memtable created with log file: #%" PRIu64
". Immutable memtables: %d.\n", ". Immutable memtables: %d.\n",
cfd->GetName().c_str(), new_log_number, cfd->imm()->NumNotFlushed()); cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
mutex_.Lock(); mutex_.Lock();
if (!s.ok()) { if (!s.ok()) {
// how do we fail if we're not creating new log? // how do we fail if we're not creating new log?