From ccecf3f4fb8e6eeaa06504b9d477b6db4137831a Mon Sep 17 00:00:00 2001
From: Islam AbdelRahman
Date: Wed, 10 Aug 2016 12:37:43 -0700
Subject: [PATCH] UniversalCompaction should ignore sorted runs being compacted
 (when compacting for file num)

Summary:
If the total number of sorted runs is greater than
level0_file_num_compaction_trigger, universal compaction will always issue a
compaction, even if the number of sorted runs that are not being compacted is
less than level0_file_num_compaction_trigger.

This diff changes that behaviour to rely on the `number of sorted runs not
being compacted` instead of the `total number of sorted runs`.
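As a rough illustration of the new trigger condition, here is a minimal,
self-contained sketch (not RocksDB code; SortedRun and
FilesToCompactForFileNum are made-up names used only for this example):

#include <iostream>
#include <vector>

// Simplified stand-in for a sorted run; only the flag relevant here.
struct SortedRun {
  bool being_compacted;
};

// Returns how many input runs a "compact for file num" pass would take,
// or 0 if no compaction should be scheduled. Only runs that are NOT
// already being compacted count toward the trigger.
unsigned int FilesToCompactForFileNum(const std::vector<SortedRun>& runs,
                                      int file_num_compaction_trigger) {
  int num_sr_not_compacted = 0;
  for (const SortedRun& r : runs) {
    if (!r.being_compacted) {
      num_sr_not_compacted++;
    }
  }
  if (num_sr_not_compacted <= file_num_compaction_trigger) {
    return 0;  // previously, runs.size() was compared instead
  }
  return static_cast<unsigned int>(num_sr_not_compacted -
                                   file_num_compaction_trigger + 1);
}

int main() {
  // 10 sorted runs in total, but 7 are already inputs of a running
  // compaction, so only 3 are available; with a trigger of 7 nothing
  // should be picked.
  std::vector<SortedRun> runs(10, SortedRun{false});
  for (int i = 0; i < 7; i++) {
    runs[i].being_compacted = true;
  }
  std::cout << FilesToCompactForFileNum(runs, 7) << "\n";  // prints 0
  return 0;
}

With 10 sorted runs of which 7 are already being compacted and a trigger of
7, the sketch schedules nothing; this is the behaviour the compaction picker
change below implements.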
Test Plan: New unit test

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61533
---
 db/compaction_job_stats_test.cc    |   5 +-
 db/compaction_picker.cc            |  37 +++++--
 db/db_test_util.cc                 |   2 +-
 db/db_universal_compaction_test.cc | 164 +++++++++++++++++++++++++----
 4 files changed, 175 insertions(+), 33 deletions(-)

diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc
index a05b0ba64..1130709dd 100644
--- a/db/compaction_job_stats_test.cc
+++ b/db/compaction_job_stats_test.cc
@@ -957,7 +957,7 @@ TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
   uint64_t key_base = 100000000l;
   // Note: key_base must be multiple of num_keys_per_L0_file
   int num_keys_per_table = 100;
-  const uint32_t kTestScale = 8;
+  const uint32_t kTestScale = 6;
   const int kKeySize = 10;
   const int kValueSize = 900;
   double compression_ratio = 1.0;
@@ -1008,8 +1008,9 @@ TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
         num_input_units,
         num_keys_per_table * num_input_units,
         1.0, 0, false));
+    dbfull()->TEST_WaitForCompact();
   }
-  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 4U);
+  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

   for (uint64_t start_key = key_base;
        start_key <= key_base * kTestScale;

diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index 3d3e3e668..42cffbdd8 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -1313,6 +1313,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(
       sorted_runs.size() <
           (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
     LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", cf_name.c_str());
+    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
+                             nullptr);
     return nullptr;
   }
   VersionStorageInfo::LevelSummaryStorage tmp;
@@ -1347,19 +1349,34 @@ Compaction* UniversalCompactionPicker::PickCompaction(
       assert(sorted_runs.size() >=
              static_cast<size_t>(
                  mutable_cf_options.level0_file_num_compaction_trigger));
-      unsigned int num_files =
-          static_cast<unsigned int>(sorted_runs.size()) -
-          mutable_cf_options.level0_file_num_compaction_trigger;
-      if ((c = PickCompactionUniversalReadAmp(
-               cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
-               num_files, sorted_runs, log_buffer)) != nullptr) {
-        LogToBuffer(log_buffer,
-                    "[%s] Universal: compacting for file num -- %u\n",
-                    cf_name.c_str(), num_files);
+      // Get the total number of sorted runs that are not being compacted
+      int num_sr_not_compacted = 0;
+      for (size_t i = 0; i < sorted_runs.size(); i++) {
+        if (sorted_runs[i].being_compacted == false) {
+          num_sr_not_compacted++;
+        }
+      }
+
+      // The number of sorted runs that are not being compacted is greater than
+      // the maximum allowed number of sorted runs
+      if (num_sr_not_compacted >
+          mutable_cf_options.level0_file_num_compaction_trigger) {
+        unsigned int num_files =
+            num_sr_not_compacted -
+            mutable_cf_options.level0_file_num_compaction_trigger + 1;
+        if ((c = PickCompactionUniversalReadAmp(
+                 cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
+                 num_files, sorted_runs, log_buffer)) != nullptr) {
+          LogToBuffer(log_buffer,
+                      "[%s] Universal: compacting for file num -- %u\n",
+                      cf_name.c_str(), num_files);
+        }
       }
     }
   }

   if (c == nullptr) {
+    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
+                             nullptr);
     return nullptr;
   }
@@ -1412,6 +1429,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(

   level0_compactions_in_progress_.insert(c);

+  TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
+                           c);
   return c;
 }

diff --git a/db/db_test_util.cc b/db/db_test_util.cc
index 0ce19eabe..22f882ab6 100644
--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -730,7 +730,7 @@ double DBTestBase::CompressionRatioAtLevel(int level, int cf) {

 int DBTestBase::TotalTableFiles(int cf, int levels) {
   if (levels == -1) {
-    levels = CurrentOptions().num_levels;
+    levels = (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
   }
   int result = 0;
   for (int level = 0; level < levels; level++) {

diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc
index 7eab802d4..2cf8ed066 100644
--- a/db/db_universal_compaction_test.cc
+++ b/db/db_universal_compaction_test.cc
@@ -637,10 +637,132 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
   }
 }

+TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.num_levels = num_levels_;
+  options.write_buffer_size = 1 * 1024;  // 1KB
+  options.level0_file_num_compaction_trigger = 7;
+  options.max_background_compactions = 2;
+  options.target_file_size_base = 1024 * 1024;  // 1MB
+
+  // Disable size amplification compaction
+  options.compaction_options_universal.max_size_amplification_percent =
+      UINT_MAX;
+  DestroyAndReopen(options);
+
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
+        "BackgroundCallCompaction:0"},
+       {"UniversalCompactionPicker::PickCompaction:Return",
+        "DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
+       {"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
+        "CompactionJob::Run():Start"}});
+
+  int total_picked_compactions = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        if (arg) {
+          total_picked_compactions++;
+        }
+      });
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Write 7 files to trigger compaction
+  int key_idx = 1;
+  for (int i = 1; i <= 70; i++) {
+    std::string k = Key(key_idx++);
+    ASSERT_OK(Put(k, k));
+    if (i % 10 == 0) {
+      ASSERT_OK(Flush());
+    }
+  }
+
+  // Wait for the 1st background compaction process to start
+  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
+  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
+  rocksdb::SyncPoint::GetInstance()->ClearTrace();
+
+  // Write 3 files while 1st compaction is held
+  // These 3 files have different sizes to avoid compacting based on size_ratio
+  int num_keys = 1000;
+  for (int i = 0; i < 3; i++) {
+    for (int j = 1; j <= num_keys; j++) {
+      std::string k = Key(key_idx++);
+      ASSERT_OK(Put(k, k));
+    }
+    ASSERT_OK(Flush());
+    num_keys -= 100;
+  }
+
+  // Wait for the 2nd background compaction process to start
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0"); + TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1"); + + // Hold the 1st and 2nd compaction from finishing + TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2"); + dbfull()->TEST_WaitForCompact(); + + // Although 2 compaction threads started, the second one did not compact + // anything because the number of files not being compacted is less than + // level0_file_num_compaction_trigger + EXPECT_EQ(total_picked_compactions, 1); + EXPECT_EQ(TotalTableFiles(), 4); + + // Stop SyncPoint and destroy the DB and reopen it again + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + key_idx = 1; + total_picked_compactions = 0; + DestroyAndReopen(options); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Write 7 files to trigger compaction + for (int i = 1; i <= 70; i++) { + std::string k = Key(key_idx++); + ASSERT_OK(Put(k, k)); + if (i % 10 == 0) { + ASSERT_OK(Flush()); + } + } + + // Wait for the 1st background compaction process to start + TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0"); + TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1"); + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + + // Write 8 files while 1st compaction is held + // These 8 files have different sizes to avoid compacting based on size_ratio + num_keys = 1000; + for (int i = 0; i < 8; i++) { + for (int j = 1; j <= num_keys; j++) { + std::string k = Key(key_idx++); + ASSERT_OK(Put(k, k)); + } + ASSERT_OK(Flush()); + num_keys -= 100; + } + + // Wait for the 2nd background compaction process to start + TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0"); + TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1"); + + // Hold the 1st and 2nd compaction from finishing + TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2"); + dbfull()->TEST_WaitForCompact(); + + // This time we will trigger a compaction because of size ratio and + // another compaction because of number of files that are not compacted + // greater than 7 + EXPECT_GE(total_picked_compactions, 2); +} + INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel, DBTestUniversalCompactionParallel, ::testing::Combine(::testing::Values(1, 10), - ::testing::Bool())); + ::testing::Values(false))); TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) { Options options = CurrentOptions(); @@ -996,13 +1118,13 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) { ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(0, GetSstFileCount(dbname_)); - // (1, 2, 4) + // (1, 2, 4) -> (3, 4) GenerateNewFile(&rnd, &key_idx); ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); - ASSERT_EQ(1, GetSstFileCount(dbname_)); + ASSERT_EQ(0, GetSstFileCount(dbname_)); - // (1, 1, 2, 4) -> (8) + // (1, 3, 4) -> (8) GenerateNewFile(&rnd, &key_idx); ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); @@ -1016,22 +1138,22 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) { ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); - // (1, 2, 8) + // (1, 2, 8) -> (3, 8) GenerateNewFile(&rnd, &key_idx); ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path)); ASSERT_EQ(1, 
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
-  ASSERT_EQ(1, GetSstFileCount(dbname_));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));

-  // (1, 1, 2, 8) -> (4, 8)
+  // (1, 3, 8) -> (4, 8)
   GenerateNewFile(&rnd, &key_idx);
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

-  // (1, 4, 8)
+  // (1, 4, 8) -> (5, 8)
   GenerateNewFile(&rnd, &key_idx);
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
-  ASSERT_EQ(1, GetSstFileCount(dbname_));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));

   for (int i = 0; i < key_idx; i++) {
     auto v = Get(Key(i));
@@ -1195,12 +1317,12 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
   ASSERT_EQ(1, GetSstFileCount(dbname_));

-  // (1, 2, 4)
+  // (1, 2, 4) -> (3, 4)
   GenerateNewFile(&rnd, &key_idx);
-  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
-  ASSERT_EQ(2, GetSstFileCount(dbname_));
+  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));

-  // (1, 1, 2, 4) -> (8)
+  // (1, 3, 4) -> (8)
   GenerateNewFile(&rnd, &key_idx);
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
   ASSERT_EQ(0, GetSstFileCount(dbname_));
@@ -1215,20 +1337,20 @@
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
   ASSERT_EQ(1, GetSstFileCount(dbname_));

-  // (1, 2, 8)
-  GenerateNewFile(&rnd, &key_idx);
-  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
-  ASSERT_EQ(2, GetSstFileCount(dbname_));
-
-  // (1, 1, 2, 8) -> (4, 8)
+  // (1, 2, 8) -> (3, 8)
   GenerateNewFile(&rnd, &key_idx);
   ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
   ASSERT_EQ(0, GetSstFileCount(dbname_));

-  // (1, 4, 8)
+  // (1, 3, 8) -> (4, 8)
   GenerateNewFile(&rnd, &key_idx);
   ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
-  ASSERT_EQ(1, GetSstFileCount(dbname_));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+  // (1, 4, 8) -> (5, 8)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));

   for (int i = 0; i < key_idx; i++) {
     auto v = Get(Key(i));