From 4420df4b0e15ae88911e960c4fbafbaf8450fcf7 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 5 Jun 2018 14:03:42 -0700 Subject: [PATCH] Check conflict at output level in CompactFiles (#3926) Summary: CompactFiles checked whether the existing files conflicted with the chosen compaction. But it missed checking whether future files would conflict, i.e., when another compaction was simultaneously writing new files to the same range at the same output level. Closes https://github.com/facebook/rocksdb/pull/3926 Differential Revision: D8218996 Pulled By: ajkr fbshipit-source-id: 21cb00a6fed4c8c62d3ed2ff810962e6bdc2fdfb --- db/compaction_picker.cc | 5 ++++ db/db_compaction_test.cc | 62 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 9ac749779..658487534 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -853,6 +853,11 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( } } } + if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) { + return Status::Aborted( + "A running compaction is writing to the same output level in an " + "overlapping key range"); + } return Status::OK(); } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index bd8c10ae6..20dd09541 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3704,6 +3704,68 @@ TEST_F(DBCompactionTest, CompactionStatsTest) { VerifyCompactionStats(*cfd, *collector); } +TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) { + // LSM setup: + // L1: [ba bz] + // L2: [a b] [c d] + // L3: [a b] [c d] + // + // Thread 1: Thread 2: + // Begin compacting all L2->L3 + // Compact [ba bz] L1->L3 + // End compacting all L2->L3 + // + // The compaction operation in thread 2 should be disallowed because the range + // overlaps with the compaction in thread 1, which also covers that range in + // L3. + Options options = CurrentOptions(); + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + Reopen(options); + + for (int level = 3; level >= 2; --level) { + ASSERT_OK(Put("a", "val")); + ASSERT_OK(Put("b", "val")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("c", "val")); + ASSERT_OK(Put("d", "val")); + ASSERT_OK(Flush()); + MoveFilesToLevel(level); + } + ASSERT_OK(Put("ba", "val")); + ASSERT_OK(Put("bz", "val")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + SyncPoint::GetInstance()->LoadDependency({ + {"CompactFilesImpl:0", + "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"}, + {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End", + "CompactFilesImpl:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + auto bg_thread = port::Thread([&]() { + // Thread 1 + std::vector filenames = collector->GetFlushedFiles(); + filenames.pop_back(); + ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames, + 3 /* output_level */)); + }); + + // Thread 2 + TEST_SYNC_POINT( + "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"); + std::string filename = collector->GetFlushedFiles().back(); + ASSERT_FALSE( + db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */) + .ok()); + TEST_SYNC_POINT( + "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End"); + + bg_thread.join(); +} + INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, ::testing::Values(std::make_tuple(1, true), std::make_tuple(1, false),