diff --git a/db/db_impl.cc b/db/db_impl.cc index 9a1d40d57..98adb8ee9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -6405,16 +6405,18 @@ Status DBImpl::IngestExternalFile( return status; } + TEST_SYNC_POINT("DBImpl::AddFile:Start"); { // Lock db mutex InstrumentedMutexLock l(&mutex_); TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); - num_running_ingest_file_++; // Stop writes to the DB WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); + num_running_ingest_file_++; + // Figure out if we need to flush the memtable first bool need_flush = false; status = ingestion_job.NeedsFlush(&need_flush); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 2762249c9..04681275e 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1685,6 +1685,82 @@ TEST_F(ExternalSSTFileTest, L0SortingIssue) { ASSERT_EQ(Get(Key(7)), "memtable"); ASSERT_EQ(Get(Key(8)), "memtable"); } + +TEST_F(ExternalSSTFileTest, CompactionDeadlock) { + Options options = CurrentOptions(); + options.num_levels = 2; + options.level0_file_num_compaction_trigger = 4; + options.level0_slowdown_writes_trigger = 4; + options.level0_stop_writes_trigger = 4; + DestroyAndReopen(options); + + // atomic conter of currently running bg threads + std::atomic running_threads(0); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::DelayWrite:Wait", "ExternalSSTFileTest::DeadLock:0"}, + {"ExternalSSTFileTest::DeadLock:1", "DBImpl::AddFile:Start"}, + {"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::DeadLock:2"}, + {"ExternalSSTFileTest::DeadLock:3", "BackgroundCallCompaction:0"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Start ingesting and extrnal file in the background + std::thread bg_ingest_file([&]() { + running_threads += 1; + ASSERT_OK(GenerateAndAddExternalFile(options, {5, 6})); + running_threads -= 1; + }); + + ASSERT_OK(Put(Key(1), "memtable")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(2), "memtable")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(3), "memtable")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(4), "memtable")); + ASSERT_OK(Flush()); + + // This thread will try to insert into the memtable but since we have 4 L0 + // files this thread will be blocked and hold the writer thread + std::thread bg_block_put([&]() { + running_threads += 1; + ASSERT_OK(Put(Key(10), "memtable")); + running_threads -= 1; + }); + + // Make sure DelayWrite is called first + TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:0"); + + // `DBImpl::AddFile:Start` will wait until we be here + TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:1"); + + ASSERT_EQ(running_threads.load(), 2); + + // Wait for IngestExternalFile() to start and aquire mutex + TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:2"); + + // Now let compaction start + TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:3"); + + // Wait for max 5 seconds, if we did not finish all bg threads + // then we hit the deadlock bug + for (int i = 0; i < 10; i++) { + if (running_threads.load() == 0) { + break; + } + env_->SleepForMicroseconds(500000); + } + + ASSERT_EQ(running_threads.load(), 0); + + bg_ingest_file.join(); + bg_block_put.join(); +} + #endif // ROCKSDB_LITE } // namespace rocksdb