Fix race condition via concurrent FlushWAL

Summary:
Currently log_writer->AddRecord in WriteImpl is protected from concurrent calls via FlushWAL only if two_write_queues_ option is set. The patch fixes the problem by i) skip log_writer->AddRecord in FlushWAL if manual_wal_flush is not set, ii) protects log_writer->AddRecord in WriteImpl via log_write_mutex_ if manual_wal_flush_ is set but two_write_queues_ is not.

Fixes #3599
Closes https://github.com/facebook/rocksdb/pull/3656

Differential Revision: D7405608

Pulled By: maysamyabandeh

fbshipit-source-id: d6cc265051c77ae49c7c6df4f427350baaf46934
This commit is contained in:
Maysam Yabandeh 2018-03-26 16:16:59 -07:00 committed by Facebook Github Bot
parent 23f9d93f47
commit 35a4469bbf
6 changed files with 71 additions and 4 deletions

View File

@ -6,6 +6,7 @@
### Bug Fixes
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
* Fix WAL corruption caused by race condition between user write thread and FlushWAL when two_write_queue is not set.
### Java API Changes
* Add `BlockBasedTableConfig.setBlockCache` to allow sharing a block cache across DB instances.

View File

@ -693,7 +693,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(
}
Status DBImpl::FlushWAL(bool sync) {
{
if (manual_wal_flush_) {
// We need to lock log_write_mutex_ since logs_ might change concurrently
InstrumentedMutexLock wl(&log_write_mutex_);
log::Writer* cur_log_writer = logs_.back().writer;
@ -707,6 +707,9 @@ Status DBImpl::FlushWAL(bool sync) {
return s;
}
}
if (!sync) {
return Status::OK();
}
// sync = true
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
return SyncWAL();

View File

@ -737,6 +737,7 @@ class DBImpl : public DB {
#endif
friend struct SuperVersion;
friend class CompactedDBImpl;
friend class DBTest_ConcurrentFlushWAL_Test;
#ifndef NDEBUG
friend class DBTest2_ReadCallbackTest_Test;
friend class WriteCallbackTest_WriteWithCallbackTest_Test;

View File

@ -808,7 +808,21 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
assert(log_size != nullptr);
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
*log_size = log_entry.size();
// When two_write_queues_ WriteToWAL has to be protected from concurretn calls
// from the two queues anyway and log_write_mutex_ is already held. Otherwise
// if manual_wal_flush_ is enabled we need to protect log_writer->AddRecord
// from possible concurrent calls via the FlushWAL by the application.
const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
// Due to performance cocerns of missed branch prediction penalize the new
// manual_wal_flush_ feature (by UNLIKELY) instead of the more common case
// when we do not need any locking.
if (UNLIKELY(needs_locking)) {
log_write_mutex_.Lock();
}
Status status = log_writer->AddRecord(log_entry);
if (UNLIKELY(needs_locking)) {
log_write_mutex_.Unlock();
}
if (log_used != nullptr) {
*log_used = logfile_number_;
}

View File

@ -3348,6 +3348,56 @@ TEST_F(DBTest, WriteSingleThreadEntry) {
}
}
TEST_F(DBTest, ConcurrentFlushWAL) {
const size_t cnt = 100;
Options options;
WriteOptions wopt;
ReadOptions ropt;
for (bool two_write_queues : {false, true}) {
for (bool manual_wal_flush : {false, true}) {
options.two_write_queues = two_write_queues;
options.manual_wal_flush = manual_wal_flush;
options.create_if_missing = true;
DestroyAndReopen(options);
std::vector<port::Thread> threads;
threads.emplace_back([&] {
for (size_t i = 0; i < cnt; i++) {
auto istr = ToString(i);
db_->Put(wopt, db_->DefaultColumnFamily(), "a" + istr, "b" + istr);
}
});
if (two_write_queues) {
threads.emplace_back([&] {
for (size_t i = cnt; i < 2 * cnt; i++) {
auto istr = ToString(i);
WriteBatch batch;
batch.Put("a" + istr, "b" + istr);
dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true);
}
});
}
threads.emplace_back([&] {
for (size_t i = 0; i < cnt * 100; i++) { // FlushWAL is faster than Put
db_->FlushWAL(false);
}
});
for (auto& t : threads) {
t.join();
}
options.create_if_missing = false;
// Recover from the wal and make sure that it is not corrupted
Reopen(options);
for (size_t i = 0; i < cnt; i++) {
PinnableSlice pval;
auto istr = ToString(i);
ASSERT_OK(
db_->Get(ropt, db_->DefaultColumnFamily(), "a" + istr, &pval));
ASSERT_TRUE(pval == ("b" + istr));
}
}
}
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, DynamicMemtableOptions) {
const uint64_t k64KB = 1 << 16;

View File

@ -222,9 +222,7 @@ Status CheckpointImpl::CreateCustomCheckpoint(
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
if (db_options.manual_wal_flush) {
db_->FlushWAL(false /* sync */);
}
db_->FlushWAL(false /* sync */);
}
// if we have more than one column family, we need to also get WAL files
if (s.ok()) {