fix DBTest.AutomaticConflictsWithManualCompaction

Summary:
After af92d4ad112f192693f6017f24f9ae1b00e1f053, only exclusive manual compaction can have conflict. dc360df81ec48e56a5d9cee4adb7f11ef0ca82ac updated the conflict-checking test case accordingly. But we missed the point that exclusive manual compaction can only conflict with automatic compactions scheduled after it, since it waits on pending automatic compactions before it begins running.

This PR updates the test case to ensure the automatic compactions are scheduled after the manual compaction starts but before it finishes, thus ensuring a conflict. I also cleaned up the test case to use less space as I saw it cause out-of-space error on travis.
Closes https://github.com/facebook/rocksdb/pull/3375

Differential Revision: D6735162

Pulled By: ajkr

fbshipit-source-id: 020530a4e150a4786792dce7cec5d66b420cb884
This commit is contained in:
Andrew Kryczka 2018-01-16 22:56:47 -08:00 committed by Facebook Github Bot
parent dc360df81e
commit 266d85fbec
3 changed files with 43 additions and 136 deletions

View File

@ -1868,126 +1868,6 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
}
}
// This test checks for automatic getting a conflict if there is a
// manual which has not yet been scheduled.
// The manual compaction waits in NotScheduled
// We generate more files and then trigger an automatic compaction
// This will wait because there is an unscheduled manual compaction.
// Once the conflict is hit, the manual compaction starts and ends
// Then another automatic will start and end.
TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) {
Open();
CreateColumnFamilies({"one"});
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// make sure all background compaction jobs can be scheduled
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
bool cf_1_1 = true;
bool cf_1_2 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BackgroundCompaction()::Conflict",
"ColumnFamilyTest::ManualAutoCon:7"},
{"ColumnFamilyTest::ManualAutoCon:9",
"ColumnFamilyTest::ManualAutoCon:8"},
{"ColumnFamilyTest::ManualAutoCon:2",
"ColumnFamilyTest::ManualAutoCon:6"},
{"ColumnFamilyTest::ManualAutoCon:4",
"ColumnFamilyTest::ManualAutoCon:5"},
{"ColumnFamilyTest::ManualAutoCon:1",
"ColumnFamilyTest::ManualAutoCon:2"},
{"ColumnFamilyTest::ManualAutoCon:1",
"ColumnFamilyTest::ManualAutoCon:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:3");
} else if (cf_1_2) {
cf_1_2 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:2");
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::RunManualCompaction:NotScheduled", [&](void* arg) {
InstrumentedMutex* mutex = static_cast<InstrumentedMutex*>(arg);
mutex->Unlock();
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:9");
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:7");
mutex->Lock();
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::port::Thread threads([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:6");
});
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:8");
WaitForFlush(1);
// Add more L0 files and force automatic compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1);
}
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:5");
// Add more L0 files and force automatic compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
}
TEST_SYNC_POINT("ColumnFamilyTest::ManualAutoCon:1");
threads.join();
WaitForCompaction();
// VERIFY compaction "one"
ASSERT_LE(NumTableFilesAtLevel(0, 1), 3);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
}
// In this test, we generate enough files to trigger automatic compactions.
// The automatic compaction waits in NonTrivial:AfterRun
// We generate more files and then trigger an automatic compaction

View File

@ -1060,6 +1060,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
if (HasExclusiveManualCompaction()) {
// only manual compactions are allowed to run. don't schedule automatic
// compactions
TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
return;
}

View File

@ -5083,33 +5083,59 @@ TEST_F(DBTest, CompactRangeWithEmptyBottomLevel) {
#endif // ROCKSDB_LITE
TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
const int kNumL0Files = 50;
Options options = CurrentOptions();
options.write_buffer_size = 2 * 1024 * 1024; // 2MB
options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB
options.num_levels = 12;
options.level0_file_num_compaction_trigger = 4;
// never slowdown / stop
options.level0_slowdown_writes_trigger = 999999;
options.level0_stop_writes_trigger = 999999;
options.max_background_compactions = 10;
options.max_bytes_for_level_multiplier = 2;
options.level_compaction_dynamic_level_bytes = true;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 300000; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
}
// schedule automatic compactions after the manual one starts, but before it
// finishes to ensure conflict.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BackgroundCompaction:Start",
"DBTest::AutomaticConflictsWithManualCompaction:PrePuts"},
{"DBTest::AutomaticConflictsWithManualCompaction:PostPuts",
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
std::atomic<int> callback_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction()::Conflict",
"DBImpl::MaybeScheduleFlushOrCompaction:Conflict",
[&](void* arg) { callback_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
CompactRangeOptions croptions;
croptions.exclusive_manual_compaction = true;
ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
Random rnd(301);
for (int i = 0; i < 2; ++i) {
// put two keys to ensure no trivial move
for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
}
ASSERT_OK(Flush());
}
std::thread manual_compaction_thread([this]() {
CompactRangeOptions croptions;
croptions.exclusive_manual_compaction = true;
ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
});
TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts");
for (int i = 0; i < kNumL0Files; ++i) {
// put two keys to ensure no trivial move
for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
}
ASSERT_OK(Flush());
}
TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PostPuts");
ASSERT_GE(callback_count.load(), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
for (int i = 0; i < 300000; ++i) {
for (int i = 0; i < 2; ++i) {
ASSERT_NE("NOT_FOUND", Get(Key(i)));
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
manual_compaction_thread.join();
dbfull()->TEST_WaitForCompact();
}
// Github issue #595