Fix and Improve DBTest.DynamicLevelCompressionPerLevel2
Summary: A recent change to DBTest.DynamicLevelCompressionPerLevel2 has a bug: the second sync point is never enabled. Fix it and add an assert for it. Also, the test does not track flush compression; add that.

Test Plan: Build everything

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D37101
commit 12d7d3d28d
parent a1271c6c6f
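For context, the assertions in the diff below are driven by RocksDB's SyncPoint test hooks: a callback registered with SetCallBack() only fires after EnableProcessing() is called, which is the step the test's second phase was missing. The following is a minimal sketch of that pattern, not code from this commit; the helper names, the counter, and the header path are illustrative assumptions, and only the sync-point string comes from the diff.

// Sketch only -- illustrates the SyncPoint usage the fix relies on.
// The header path is assumed for this era of the codebase;
// CountPickedCompactions and num_picked are hypothetical names.
#include <atomic>

#include "util/sync_point.h"

void CountPickedCompactions(std::atomic<int>* num_picked) {
  // Registering the callback alone does nothing...
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return",
      [num_picked](void* /*arg*/) { num_picked->fetch_add(1); });
  // ...it only fires once processing is enabled. Forgetting this call is
  // why the second batch of assertions never observed any events.
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
}

void StopCountingCompactions() {
  // Standard teardown so later tests are not affected.
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}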
@@ -11243,13 +11243,23 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   DestroyAndReopen(options);
   // When base level is L4, L4 is LZ4.
+  std::atomic<int> num_zlib(0);
+  std::atomic<int> num_lz4(0);
+  std::atomic<int> num_no(0);
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
         if (compaction->output_level() == 4) {
           ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
+          num_lz4.fetch_add(1);
         }
       });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
+        auto* compression = reinterpret_cast<CompressionType*>(arg);
+        ASSERT_TRUE(*compression == kNoCompression);
+        num_no.fetch_add(1);
+      });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();

   for (int i = 0; i < 100; i++) {
@@ -11264,18 +11274,31 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   ASSERT_EQ(NumTableFilesAtLevel(2), 0);
   ASSERT_EQ(NumTableFilesAtLevel(3), 0);
   ASSERT_GT(NumTableFilesAtLevel(4), 0);
+  ASSERT_GT(num_no.load(), 2);
+  ASSERT_GT(num_lz4.load(), 0);
   int prev_num_files_l4 = NumTableFilesAtLevel(4);

   // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
+  num_lz4.store(0);
+  num_no.store(0);
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
         if (compaction->output_level() == 4 && compaction->start_level() == 3) {
           ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression);
+          num_zlib.fetch_add(1);
         } else {
           ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
+          num_lz4.fetch_add(1);
         }
       });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) {
+        auto* compression = reinterpret_cast<CompressionType*>(arg);
+        ASSERT_TRUE(*compression == kNoCompression);
+        num_no.fetch_add(1);
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

   for (int i = 101; i < 500; i++) {
     ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
@@ -11291,6 +11314,9 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   ASSERT_EQ(NumTableFilesAtLevel(2), 0);
   ASSERT_GT(NumTableFilesAtLevel(3), 0);
   ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
+  ASSERT_GT(num_no.load(), 2);
+  ASSERT_GT(num_lz4.load(), 0);
+  ASSERT_GT(num_zlib.load(), 0);
 }

 TEST_F(DBTest, DynamicCompactionOptions) {
@@ -180,6 +180,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
         "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
         cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber());

+    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
+                             &output_compression_);
     s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
                    cfd_->table_cache(), iter.get(), &meta,
                    cfd_->internal_comparator(),