From eef63ef807eaf13b55d07d044ac35152371963ff Mon Sep 17 00:00:00 2001 From: Mike Kolupaev Date: Mon, 22 Feb 2016 13:54:58 -0800 Subject: [PATCH] Fixed CompactFiles() spuriously failing or corrupting DB Summary: We started getting two kinds of crashes since we started using `DB::CompactFiles()`: (1) `CompactFiles()` fails saying something like "/data/logdevice/4440/shard12/012302.sst: No such file or directory", and presumably makes DB read-only, (2) DB fails to open saying "Corruption: Can't access /267000.sst: IO error: /data/logdevice/4440/shard1/267000.sst: No such file or directory". AFAICT, both can be explained by background thread deleting compaction output as "obsolete" while it's being written, before it's committed to manifest. If it ends up committed to the manifest, we get (2); if compaction notices the disappearance and fails, we get (1). The internal tasks t10068021 and t10134177 have some details about the investigation that led to this. Test Plan: `make -j check`; the new test fails to reopen the DB without the fix Reviewers: yhchiang Reviewed By: yhchiang Subscribers: dhruba, sdong Differential Revision: https://reviews.facebook.net/D54561 --- db/compact_files_test.cc | 60 ++++++++++++++++++++++++++++++++++++++-- db/db_impl.cc | 7 +++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 1b8c5b942..794defb11 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -7,6 +7,7 @@ #include #include +#include #include #include "rocksdb/db.h" @@ -142,9 +143,6 @@ TEST_F(CompactFilesTest, ObsoleteFiles) { } auto l0_files = collector->GetFlushedFiles(); - CompactionOptions compact_opt; - compact_opt.compression = kNoCompression; - compact_opt.output_file_size_limit = kWriteBufferSize * 5; ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); // verify all compaction input files are deleted @@ -154,6 +152,62 @@ TEST_F(CompactFilesTest, ObsoleteFiles) { delete db; } +TEST_F(CompactFilesTest, CapturingPendingFiles) { + Options options; + options.create_if_missing = true; + // Disable RocksDB background compaction. + options.compaction_style = kCompactionStyleNone; + // Always do full scans for obsolete files (needed to reproduce the issue). + options.delete_obsolete_files_period_micros = 0; + + // Add listener. + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DB* db = nullptr; + DestroyDB(db_name_, options); + Status s = DB::Open(options, db_name_, &db); + assert(s.ok()); + assert(db); + + // Create 5 files. + for (int i = 0; i < 5; ++i) { + db->Put(WriteOptions(), "key" + ToString(i), "value"); + db->Flush(FlushOptions()); + } + + auto l0_files = collector->GetFlushedFiles(); + EXPECT_EQ(5, l0_files.size()); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"CompactFilesImpl:2", "CompactFilesTest.CapturingPendingFiles:0"}, + {"CompactFilesTest.CapturingPendingFiles:1", "CompactFilesImpl:3"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Start compacting files. + std::thread compaction_thread( + [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); }); + + // In the meantime flush another file. + TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0"); + db->Put(WriteOptions(), "key5", "value"); + db->Flush(FlushOptions()); + TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1"); + + compaction_thread.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + delete db; + + // Make sure we can reopen the DB. + s = DB::Open(options, db_name_, &db); + ASSERT_TRUE(s.ok()); + assert(db); + delete db; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index afe1a9c9d..d29df88c1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1819,6 +1819,9 @@ Status DBImpl::CompactFilesImpl( std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); + auto pending_outputs_inserted_elem = + CaptureCurrentFileNumberInPendingOutputs(); + assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), db_options_, env_options_, versions_.get(), @@ -1846,6 +1849,8 @@ Status DBImpl::CompactFilesImpl( TEST_SYNC_POINT("CompactFilesImpl:0"); TEST_SYNC_POINT("CompactFilesImpl:1"); compaction_job.Run(); + TEST_SYNC_POINT("CompactFilesImpl:2"); + TEST_SYNC_POINT("CompactFilesImpl:3"); mutex_.Lock(); Status status = compaction_job.Install(*c->mutable_cf_options()); @@ -1855,6 +1860,8 @@ Status DBImpl::CompactFilesImpl( } c->ReleaseCompactionFiles(s); + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); + if (status.ok()) { // Done } else if (status.IsShutdownInProgress()) {