diff --git a/db/db_impl.cc b/db/db_impl.cc index 1562a0da6..4b4d3364a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2320,12 +2320,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->options()->disable_auto_compactions) { + // Pick up latest mutable CF Options and use it throughout the + // compaction job + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + if (!mutable_cf_options->disable_auto_compactions) { // NOTE: try to avoid unnecessary copy of MutableCFOptions if // compaction is not necessary. Need to make sure mutex is held // until we make a copy in the following code - c.reset(cfd->PickCompaction( - *cfd->GetLatestMutableCFOptions(), log_buffer)); + c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); if (c != nullptr) { // update statistics MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, diff --git a/db/db_test.cc b/db/db_test.cc index a3aa42bda..23e5a1fa1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8756,14 +8756,13 @@ TEST(DBTest, DynamicCompactionOptions) { options.env = env_; options.create_if_missing = true; options.compression = kNoCompression; - options.max_background_compactions = 4; options.hard_rate_limit = 1.1; options.write_buffer_size = k128KB; options.max_write_buffer_number = 2; // Compaction related options options.level0_file_num_compaction_trigger = 3; - options.level0_slowdown_writes_trigger = 10; - options.level0_stop_writes_trigger = 20; + options.level0_slowdown_writes_trigger = 4; + options.level0_stop_writes_trigger = 8; options.max_grandparent_overlap_factor = 10; options.expanded_compaction_factor = 25; options.source_compaction_factor = 1; @@ -8771,6 +8770,10 @@ TEST(DBTest, DynamicCompactionOptions) { options.target_file_size_multiplier = 1; options.max_bytes_for_level_base = k256KB; options.max_bytes_for_level_multiplier = 4; + + // Block flush thread and disable compaction thread + env_->SetBackgroundThreads(1, Env::LOW); + env_->SetBackgroundThreads(1, Env::HIGH); DestroyAndReopen(&options); auto gen_l0_kb = [this](int start, int size, int stride) { @@ -8842,6 +8845,77 @@ TEST(DBTest, DynamicCompactionOptions) { ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1); ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1); ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1); + // Clean up memtable and L0 + dbfull()->CompactRange(nullptr, nullptr); + // Block compaction + SleepingBackgroundTask sleeping_task_low1; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, + Env::Priority::LOW); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + int count = 0; + Random rnd(301); + WriteOptions wo; + wo.timeout_hint_us = 10000; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { + // Wait for compaction so that put won't timeout + dbfull()->TEST_FlushMemTable(true); + count++; + } + ASSERT_EQ(count, 8); + // Unblock + sleeping_task_low1.WakeUp(); + sleeping_task_low1.WaitUntilDone(); + + // Reduce stop trigger + ASSERT_TRUE(dbfull()->SetOptions({ + {"level0_stop_writes_trigger", "6"} + })); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + + // Block compaction + SleepingBackgroundTask sleeping_task_low2; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, + Env::Priority::LOW); + count = 0; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { + // Wait for compaction so that put won't timeout + dbfull()->TEST_FlushMemTable(true); + count++; + } + ASSERT_EQ(count, 6); + // Unblock + sleeping_task_low2.WakeUp(); + sleeping_task_low2.WaitUntilDone(); + + // Test disable_auto_compactions + ASSERT_TRUE(dbfull()->SetOptions({ + {"disable_auto_compactions", "true"} + })); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); + // Wait for compaction so that put won't timeout + dbfull()->TEST_FlushMemTable(true); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(NumTableFilesAtLevel(0), 4); + + ASSERT_TRUE(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"} + })); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); + // Wait for compaction so that put won't timeout + dbfull()->TEST_FlushMemTable(true); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_LT(NumTableFilesAtLevel(0), 4); } } // namespace rocksdb diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 02f63fed4..c94f0a497 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -21,6 +21,7 @@ struct MutableCFOptions { options.memtable_prefix_bloom_huge_page_tlb_size), max_successive_merges(options.max_successive_merges), filter_deletes(options.filter_deletes), + disable_auto_compactions(options.disable_auto_compactions), level0_file_num_compaction_trigger( options.level0_file_num_compaction_trigger), level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), @@ -45,6 +46,7 @@ struct MutableCFOptions { memtable_prefix_bloom_huge_page_tlb_size(0), max_successive_merges(0), filter_deletes(false), + disable_auto_compactions(false), level0_file_num_compaction_trigger(0), level0_slowdown_writes_trigger(0), level0_stop_writes_trigger(0), @@ -80,6 +82,7 @@ struct MutableCFOptions { bool filter_deletes; // Compaction related options + bool disable_auto_compactions; int level0_file_num_compaction_trigger; int level0_slowdown_writes_trigger; int level0_stop_writes_trigger; diff --git a/util/options_helper.cc b/util/options_helper.cc index 2a61c8b69..259666c7b 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -100,7 +100,9 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value, template bool ParseCompactionOptions(const std::string& name, const std::string& value, OptionsType* new_options) { - if (name == "level0_file_num_compaction_trigger") { + if (name == "disable_auto_compactions") { + new_options->disable_auto_compactions = ParseBoolean(name, value); + } else if (name == "level0_file_num_compaction_trigger") { new_options->level0_file_num_compaction_trigger = ParseInt(value); } else if (name == "level0_slowdown_writes_trigger") { new_options->level0_slowdown_writes_trigger = ParseInt(value); @@ -221,8 +223,6 @@ bool GetOptionsFromStrings( new_options->soft_rate_limit = ParseDouble(o.second); } else if (o.first == "hard_rate_limit") { new_options->hard_rate_limit = ParseDouble(o.second); - } else if (o.first == "disable_auto_compactions") { - new_options->disable_auto_compactions = ParseBoolean(o.first, o.second); } else if (o.first == "purge_redundant_kvs_while_flush") { new_options->purge_redundant_kvs_while_flush = ParseBoolean(o.first, o.second);