diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 1b8c5b942..794defb11 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -7,6 +7,7 @@ #include #include +#include #include #include "rocksdb/db.h" @@ -142,9 +143,6 @@ TEST_F(CompactFilesTest, ObsoleteFiles) { } auto l0_files = collector->GetFlushedFiles(); - CompactionOptions compact_opt; - compact_opt.compression = kNoCompression; - compact_opt.output_file_size_limit = kWriteBufferSize * 5; ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); // verify all compaction input files are deleted @@ -154,6 +152,62 @@ TEST_F(CompactFilesTest, ObsoleteFiles) { delete db; } +TEST_F(CompactFilesTest, CapturingPendingFiles) { + Options options; + options.create_if_missing = true; + // Disable RocksDB background compaction. + options.compaction_style = kCompactionStyleNone; + // Always do full scans for obsolete files (needed to reproduce the issue). + options.delete_obsolete_files_period_micros = 0; + + // Add listener. + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + DestroyDB(db_name_, options); + Status s = DB::Open(options, db_name_, &db); + assert(s.ok()); + assert(db); + + // Create 5 files. + for (int i = 0; i < 5; ++i) { + db->Put(WriteOptions(), "key" + ToString(i), "value"); + db->Flush(FlushOptions()); + } + + auto l0_files = collector->GetFlushedFiles(); + EXPECT_EQ(5, l0_files.size()); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"}, + {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Start compacting files. + std::thread compaction_thread( + [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); }); + + // In the meantime flush another file. + TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0"); + db->Put(WriteOptions(), "key5", "value"); + db->Flush(FlushOptions()); + TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1"); + + compaction_thread.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + delete db; + + // Make sure we can reopen the DB. + s = DB::Open(options, db_name_, &db); + ASSERT_TRUE(s.ok()); + assert(db); + delete db; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index afe1a9c9d..d29df88c1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1819,6 +1819,9 @@ Status DBImpl::CompactFilesImpl( std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); + auto pending_outputs_inserted_elem = + CaptureCurrentFileNumberInPendingOutputs(); + assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), db_options_, env_options_, versions_.get(), @@ -1846,6 +1849,8 @@ Status DBImpl::CompactFilesImpl( TEST_SYNC_POINT("CompactFilesImpl:0"); TEST_SYNC_POINT("CompactFilesImpl:1"); compaction_job.Run(); + TEST_SYNC_POINT("CompactFilesImpl:2"); + TEST_SYNC_POINT("CompactFilesImpl:3"); mutex_.Lock(); Status status = compaction_job.Install(*c->mutable_cf_options()); @@ -1855,6 +1860,8 @@ Status DBImpl::CompactFilesImpl( } c->ReleaseCompactionFiles(s); + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); + if (status.ok()) { // Done } else if (status.IsShutdownInProgress()) {