Check conflict at output level in CompactFiles (#3926)
Summary: CompactFiles checked whether the existing files conflicted with the chosen compaction. But it missed checking whether future files would conflict, i.e., when another compaction was simultaneously writing new files to the same range at the same output level. Closes https://github.com/facebook/rocksdb/pull/3926 Differential Revision: D8218996 Pulled By: ajkr fbshipit-source-id: 21cb00a6fed4c8c62d3ed2ff810962e6bdc2fdfb
This commit is contained in:
parent
f1592a06c2
commit
4420df4b0e
@ -853,6 +853,11 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
|
||||
}
|
||||
}
|
||||
}
|
||||
if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
|
||||
return Status::Aborted(
|
||||
"A running compaction is writing to the same output level in an "
|
||||
"overlapping key range");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -3704,6 +3704,68 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
|
||||
VerifyCompactionStats(*cfd, *collector);
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
|
||||
// LSM setup:
|
||||
// L1: [ba bz]
|
||||
// L2: [a b] [c d]
|
||||
// L3: [a b] [c d]
|
||||
//
|
||||
// Thread 1: Thread 2:
|
||||
// Begin compacting all L2->L3
|
||||
// Compact [ba bz] L1->L3
|
||||
// End compacting all L2->L3
|
||||
//
|
||||
// The compaction operation in thread 2 should be disallowed because the range
|
||||
// overlaps with the compaction in thread 1, which also covers that range in
|
||||
// L3.
|
||||
Options options = CurrentOptions();
|
||||
FlushedFileCollector* collector = new FlushedFileCollector();
|
||||
options.listeners.emplace_back(collector);
|
||||
Reopen(options);
|
||||
|
||||
for (int level = 3; level >= 2; --level) {
|
||||
ASSERT_OK(Put("a", "val"));
|
||||
ASSERT_OK(Put("b", "val"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put("c", "val"));
|
||||
ASSERT_OK(Put("d", "val"));
|
||||
ASSERT_OK(Flush());
|
||||
MoveFilesToLevel(level);
|
||||
}
|
||||
ASSERT_OK(Put("ba", "val"));
|
||||
ASSERT_OK(Put("bz", "val"));
|
||||
ASSERT_OK(Flush());
|
||||
MoveFilesToLevel(1);
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency({
|
||||
{"CompactFilesImpl:0",
|
||||
"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
|
||||
{"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
|
||||
"CompactFilesImpl:1"},
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
auto bg_thread = port::Thread([&]() {
|
||||
// Thread 1
|
||||
std::vector<std::string> filenames = collector->GetFlushedFiles();
|
||||
filenames.pop_back();
|
||||
ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
|
||||
3 /* output_level */));
|
||||
});
|
||||
|
||||
// Thread 2
|
||||
TEST_SYNC_POINT(
|
||||
"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
|
||||
std::string filename = collector->GetFlushedFiles().back();
|
||||
ASSERT_FALSE(
|
||||
db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
|
||||
.ok());
|
||||
TEST_SYNC_POINT(
|
||||
"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");
|
||||
|
||||
bg_thread.join();
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
|
||||
::testing::Values(std::make_tuple(1, true),
|
||||
std::make_tuple(1, false),
|
||||
|
Loading…
x
Reference in New Issue
Block a user