diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 2ca1659cb..f8e6a1529 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -705,7 +705,7 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, immutable_db_options_.info_log.get()); // Perform CompactFiles - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2"); { InstrumentedMutexLock l(&mutex_); @@ -713,15 +713,16 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, // IngestExternalFile() calls to finish. WaitForIngestFile(); - s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names, + // We need to get current after `WaitForIngestFile`, because + // `IngestExternalFile` may add files that overlap with `input_file_names` + auto* current = cfd->current(); + current->Ref(); + + s = CompactFilesImpl(compact_options, cfd, current, input_file_names, output_file_names, output_level, output_path_id, &job_context, &log_buffer); - } - if (sv->Unref()) { - mutex_.Lock(); - sv->Cleanup(); - mutex_.Unlock(); - delete sv; + + current->Unref(); } // Find and delete obsolete files diff --git a/db/db_test2.cc b/db/db_test2.cc index b60ad3afc..09d7f220e 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2888,6 +2888,65 @@ TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) { rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_F(DBTest2, TestCompactFiles) { + // Setup sync point dependency to reproduce the race condition of + // DBImpl::GetColumnFamilyHandleUnlocked + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"TestCompactFiles::IngestExternalFile1", + "TestCompactFiles::IngestExternalFile2"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Options options; + options.num_levels = 2; + options.disable_auto_compactions = true; + Reopen(options); + auto* handle = db_->DefaultColumnFamily(); + ASSERT_EQ(db_->NumberLevels(handle), 2); + + rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options}; + std::string external_file1 = dbname_ + "/test_compact_files1.sst_t"; + std::string external_file2 = dbname_ + "/test_compact_files2.sst_t"; + std::string external_file3 = dbname_ + "/test_compact_files3.sst_t"; + + ASSERT_OK(sst_file_writer.Open(external_file1)); + ASSERT_OK(sst_file_writer.Put("1", "1")); + ASSERT_OK(sst_file_writer.Put("2", "2")); + ASSERT_OK(sst_file_writer.Finish()); + + ASSERT_OK(sst_file_writer.Open(external_file2)); + ASSERT_OK(sst_file_writer.Put("3", "3")); + ASSERT_OK(sst_file_writer.Put("4", "4")); + ASSERT_OK(sst_file_writer.Finish()); + + ASSERT_OK(sst_file_writer.Open(external_file3)); + ASSERT_OK(sst_file_writer.Put("5", "5")); + ASSERT_OK(sst_file_writer.Put("6", "6")); + ASSERT_OK(sst_file_writer.Finish()); + + ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3}, + IngestExternalFileOptions())); + ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2); + std::vector files; + GetSstFiles(env_, dbname_, &files); + ASSERT_EQ(files.size(), 2); + + port::Thread user_thread1( + [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); }); + + port::Thread user_thread2([&]() { + ASSERT_OK(db_->IngestExternalFile(handle, {external_file2}, + IngestExternalFileOptions())); + TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1"); + }); + + user_thread1.join(); + user_thread2.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + } // namespace rocksdb int main(int argc, char** argv) {