diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f74232244..e5fd30e86 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1532,8 +1532,12 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, InstrumentedMutexLock guard_lock(&mutex_); WriteThread::Writer w; + WriteThread::Writer nonmem_w; if (!writes_stopped) { write_thread_.EnterUnbatched(&w, &mutex_); + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } } if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { @@ -1596,6 +1600,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (!writes_stopped) { write_thread_.ExitUnbatched(&w); + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } } } TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush"); @@ -1650,8 +1657,12 @@ Status DBImpl::AtomicFlushMemTables( InstrumentedMutexLock guard_lock(&mutex_); WriteThread::Writer w; + WriteThread::Writer nonmem_w; if (!writes_stopped) { write_thread_.EnterUnbatched(&w, &mutex_); + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } } for (auto cfd : column_family_datas) { @@ -1695,6 +1706,9 @@ Status DBImpl::AtomicFlushMemTables( if (!writes_stopped) { write_thread_.ExitUnbatched(&w); + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } } } TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush"); diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index d783355ce..566c17573 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -105,7 +105,16 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) { if (cfd == nullptr) { cfd = default_cf_handle_->cfd(); } - return SwitchMemtable(cfd, &write_context); + + if (two_write_queues_) { + WriteThread::Writer nonmem_w; + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + Status s = SwitchMemtable(cfd, &write_context); + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + return s; + } else { + return SwitchMemtable(cfd, &write_context); + } } Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 4ab9de8c4..34193cabc 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1246,6 +1246,11 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { } MaybeFlushStatsCF(&cfds); } + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + for (const auto cfd : cfds) { cfd->Ref(); status = SwitchMemtable(cfd, write_context); @@ -1254,6 +1259,10 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { break; } } + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + if (status.ok()) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); @@ -1314,6 +1323,10 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { MaybeFlushStatsCF(&cfds); } + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } for (const auto cfd : cfds) { if (cfd->mem()->IsEmpty()) { continue; @@ -1325,6 +1338,10 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { break; } } + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + if (status.ok()) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); @@ -1530,6 +1547,11 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { MaybeFlushStatsCF(&cfds); } Status status; + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + for (auto& cfd : cfds) { if (!cfd->mem()->IsEmpty()) { status = SwitchMemtable(cfd, context); @@ -1542,6 +1564,11 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { break; } } + + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + if (status.ok()) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); @@ -1572,15 +1599,11 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue +// REQUIRES: this thread is currently at the front of the 2nd writer queue if +// two_write_queues_ is true (This is to simplify the reasoning.) Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); WriteThread::Writer nonmem_w; - if (two_write_queues_) { - // SwitchMemtable is a rare event. To simply the reasoning, we make sure - // that there is no concurrent thread writing to WAL. - nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); - } - std::unique_ptr lfile; log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; @@ -1687,10 +1710,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable); // Read back bg_error in order to get the right severity s = error_handler_.GetBGError(); - - if (two_write_queues_) { - nonmem_write_thread_.ExitUnbatched(&nonmem_w); - } return s; } @@ -1721,9 +1740,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { NotifyOnMemTableSealed(cfd, memtable_info); mutex_.Lock(); #endif // ROCKSDB_LITE - if (two_write_queues_) { - nonmem_write_thread_.ExitUnbatched(&nonmem_w); - } return s; } diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 7fe325e3c..3a059773f 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -2750,6 +2750,26 @@ TEST_P(ExternalSSTFileTest, Destroy(options, true /* delete_cf_paths */); } +TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) { + Options options = CurrentOptions(); + // Use large buffer to avoid memtable flush + options.write_buffer_size = 1024 * 1024; + options.two_write_queues = true; + DestroyAndReopen(options); + + ASSERT_OK(dbfull()->Put(WriteOptions(), "1000", "v1")); + ASSERT_OK(dbfull()->Put(WriteOptions(), "1001", "v1")); + ASSERT_OK(dbfull()->Put(WriteOptions(), "9999", "v1")); + + // Put one key which is overlap with keys in memtable. + // It will trigger flushing memtable and require this thread is + // currently at the front of the 2nd writer queue. We must make + // sure that it won't enter the 2nd writer queue for the second time. + std::vector> data; + data.push_back(std::make_pair("1001", "v2")); + GenerateAndAddExternalFile(options, data); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest, testing::Values(std::make_tuple(false, false), std::make_tuple(false, true),