fix unstable unittest caused by #5958 (#6061)

Summary:
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

This PR is to fix unstable unit test added by  (https://github.com/facebook/rocksdb/pull/5958).
I set SYNC_POINT in PickCompaction before. If IntraL0Compaction was trigger,  the compact job which compact sst to base level would start instantly. If the compaction thread run faster than unittest main thread, we may observe the number of files in L0 reduce.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6061

Differential Revision: D18642301

fbshipit-source-id: 3e4da2ee963532b6e142336951ea3f47d46df148
This commit is contained in:
Little-Wallace 2019-11-21 15:22:38 -08:00 committed by Facebook Github Bot
parent 0ce0edbe12
commit e50b64bdba
2 changed files with 18 additions and 3 deletions

View File

@ -43,6 +43,7 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
SequenceNumber earliest_mem_seqno) { SequenceNumber earliest_mem_seqno) {
// Do not pick ingested file when there is at least one memtable not flushed // Do not pick ingested file when there is at least one memtable not flushed
// which of seqno is overlap with the sst. // which of seqno is overlap with the sst.
TEST_SYNC_POINT("FindIntraL0Compaction");
size_t start = 0; size_t start = 0;
for (; start < level_files.size(); start++) { for (; start < level_files.size(); start++) {
if (level_files[start]->being_compacted) { if (level_files[start]->being_compacted) {

View File

@ -4876,7 +4876,7 @@ void IngestOneKeyValue(DBImpl* db, const std::string& key,
} }
TEST_P(DBCompactionTestWithParam, TEST_P(DBCompactionTestWithParam,
FlushAfterL0IntraCompactionCheckConsistencyFail) { FlushAfterIntraL0CompactionCheckConsistencyFail) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.force_consistency_checks = true; options.force_consistency_checks = true;
options.compression = kNoCompression; options.compression = kNoCompression;
@ -4887,11 +4887,16 @@ TEST_P(DBCompactionTestWithParam,
const size_t kValueSize = 1 << 20; const size_t kValueSize = 1 << 20;
Random rnd(301); Random rnd(301);
std::atomic<int> pick_intra_l0_count(0);
std::string value(RandomString(&rnd, kValueSize)); std::string value(RandomString(&rnd, kValueSize));
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"LevelCompactionPicker::PickCompactionBySize:0", {{"DBCompactionTestWithParam::FlushAfterIntraL0:1",
"CompactionJob::Run():Start"}}); "CompactionJob::Run():Start"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"FindIntraL0Compaction",
[&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// prevents trivial move // prevents trivial move
@ -4921,6 +4926,7 @@ TEST_P(DBCompactionTestWithParam,
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
} }
TEST_SYNC_POINT("DBCompactionTestWithParam::FlushAfterIntraL0:1");
// Put one key, to make biggest log sequence number in this memtable is bigger // Put one key, to make biggest log sequence number in this memtable is bigger
// than sst which would be ingested in next step. // than sst which would be ingested in next step.
ASSERT_OK(Put(Key(2), "b")); ASSERT_OK(Put(Key(2), "b"));
@ -4931,6 +4937,7 @@ TEST_P(DBCompactionTestWithParam,
dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(), dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
&level_to_files); &level_to_files);
ASSERT_GT(level_to_files[0].size(), 0); ASSERT_GT(level_to_files[0].size(), 0);
ASSERT_GT(pick_intra_l0_count.load(), 0);
ASSERT_OK(Flush()); ASSERT_OK(Flush());
} }
@ -4960,9 +4967,14 @@ TEST_P(DBCompactionTestWithParam,
ASSERT_OK(Flush()); ASSERT_OK(Flush());
Compact("", Key(99)); Compact("", Key(99));
ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_EQ(0, NumTableFilesAtLevel(0));
std::atomic<int> pick_intra_l0_count(0);
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"LevelCompactionPicker::PickCompactionBySize:0", {{"DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1",
"CompactionJob::Run():Start"}}); "CompactionJob::Run():Start"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"FindIntraL0Compaction",
[&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Make 6 L0 sst. // Make 6 L0 sst.
for (int i = 0; i < 6; ++i) { for (int i = 0; i < 6; ++i) {
@ -4999,12 +5011,14 @@ TEST_P(DBCompactionTestWithParam,
// Wake up flush job // Wake up flush job
sleeping_tasks.WakeUp(); sleeping_tasks.WakeUp();
sleeping_tasks.WaitUntilDone(); sleeping_tasks.WaitUntilDone();
TEST_SYNC_POINT("DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1");
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
uint64_t error_count = 0; uint64_t error_count = 0;
db_->GetIntProperty("rocksdb.background-errors", &error_count); db_->GetIntProperty("rocksdb.background-errors", &error_count);
ASSERT_EQ(error_count, 0); ASSERT_EQ(error_count, 0);
ASSERT_GT(pick_intra_l0_count.load(), 0);
for (int i = 0; i < 6; ++i) { for (int i = 0; i < 6; ++i) {
ASSERT_EQ(bigvalue, Get(Key(i))); ASSERT_EQ(bigvalue, Get(Key(i)));
} }