diff --git a/db/column_family.cc b/db/column_family.cc index 68c1875c7..88bf0339b 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -454,6 +454,16 @@ void ColumnFamilyData::RecalculateWriteStallConditions( Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), vstorage->l0_delay_trigger_count()); + } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && + vstorage->estimated_compaction_needed_bytes() >= + mutable_cf_options.hard_pending_compaction_bytes_limit) { + write_controller_token_ = write_controller->GetStopToken(); + internal_stats_->AddCFStats( + InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1); + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Stopping writes because estimated pending compaction " + "bytes exceed %" PRIu64, + name_.c_str(), vstorage->estimated_compaction_needed_bytes()); } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_slowdown_writes_trigger) { diff --git a/db/db_bench.cc b/db/db_bench.cc index a98e7c7d3..9f9239734 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -611,9 +611,10 @@ static bool ValidateRateLimit(const char* flagname, double value) { } DEFINE_double(soft_rate_limit, 0.0, ""); -DEFINE_double(hard_rate_limit, 0.0, "When not equal to 0 this make threads " - "sleep at each stats reporting interval until the compaction" - " score for all levels is less than or equal to this value."); +DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED"); + +DEFINE_uint64(hard_pending_compaction_bytes_limit, 128u * 1024 * 1024 * 1024, + "Stop writes if pending compaction bytes exceed this number"); DEFINE_uint64(delayed_write_rate, 2097152u, "Limited bytes allowed to DB when soft_rate_limit or " @@ -2431,6 +2432,8 @@ class Benchmark { } options.soft_rate_limit = FLAGS_soft_rate_limit; options.hard_rate_limit = FLAGS_hard_rate_limit; + options.hard_pending_compaction_bytes_limit = + FLAGS_hard_pending_compaction_bytes_limit; options.delayed_write_rate = FLAGS_delayed_write_rate; options.rate_limit_delay_max_milliseconds = FLAGS_rate_limit_delay_max_milliseconds; diff --git a/db/db_test.cc b/db/db_test.cc index f84c31904..6f2307889 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8513,6 +8513,53 @@ TEST_F(DBTest, DelayedWriteRate) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBTest, HardLimit) { + Options options; + options.env = env_; + env_->SetBackgroundThreads(1, Env::LOW); + options = CurrentOptions(options); + options.max_write_buffer_number = 256; + options.write_buffer_size = 110 << 10; // 110KB + options.arena_block_size = 4 * 1024; + options.level0_file_num_compaction_trigger = 4; + options.level0_slowdown_writes_trigger = 999999; + options.level0_stop_writes_trigger = 999999; + options.hard_pending_compaction_bytes_limit = 800 << 10; + options.max_bytes_for_level_base = 10000000000u; + options.max_background_compactions = 1; + + env_->SetBackgroundThreads(1, Env::LOW); + SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + + CreateAndReopenWithCF({"pikachu"}, options); + + std::atomic callback_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack("DBImpl::DelayWrite:Wait", + [&](void* arg) { + callback_count.fetch_add(1); + sleeping_task_low.WakeUp(); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Random rnd(301); + int key_idx = 0; + for (int num = 0; num < 5; num++) { + GenerateNewFile(&rnd, &key_idx, true); + } + + ASSERT_EQ(0, callback_count.load()); + + for (int num = 0; num < 5; num++) { + GenerateNewFile(&rnd, &key_idx, true); + dbfull()->TEST_WaitForFlushMemTable(); + } + ASSERT_GE(callback_count.load(), 1); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBTest, SoftLimit) { Options options; options.env = env_; diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 370dc4ed3..41f065b9e 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -669,11 +669,13 @@ void InternalStats::DumpCFStats(std::string* value) { total_files += files; total_files_being_compacted += files_being_compacted[level]; if (comp_stats_[level].micros > 0 || files > 0) { - uint64_t stalls = level == 0 ? (cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL] + - cf_stats_count_[LEVEL0_NUM_FILES_TOTAL] + - cf_stats_count_[MEMTABLE_COMPACTION]) - : (stall_leveln_slowdown_count_soft_[level] + - stall_leveln_slowdown_count_hard_[level]); + uint64_t stalls = + level == 0 ? (cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL] + + cf_stats_count_[LEVEL0_NUM_FILES_TOTAL] + + cf_stats_count_[HARD_PENDING_COMPACTION_BYTES_LIMIT] + + cf_stats_count_[MEMTABLE_COMPACTION]) + : (stall_leveln_slowdown_count_soft_[level] + + stall_leveln_slowdown_count_hard_[level]); stats_sum.Add(comp_stats_[level]); total_file_size += vstorage->NumLevelBytes(level); @@ -715,20 +717,28 @@ void InternalStats::DumpCFStats(std::string* value) { curr_ingest / kGB, interval_ingest / kGB); value->append(buf); - snprintf(buf, sizeof(buf), - "Stalls(count): %" PRIu64 " level0_slowdown, " - "%" PRIu64 " level0_slowdown_with_compaction, " - "%" PRIu64 " level0_numfiles, " - "%" PRIu64 " level0_numfiles_with_compaction, " - "%" PRIu64 " memtable_compaction, " - "%" PRIu64 " leveln_slowdown_soft, " - "%" PRIu64 " leveln_slowdown_hard\n", + snprintf(buf, sizeof(buf), "Stalls(count): %" PRIu64 + " level0_slowdown, " + "%" PRIu64 + " level0_slowdown_with_compaction, " + "%" PRIu64 + " level0_numfiles, " + "%" PRIu64 + " level0_numfiles_with_compaction, " + "%" PRIu64 + " pending_compaction_bytes, " + "%" PRIu64 + " memtable_compaction, " + "%" PRIu64 + " leveln_slowdown_soft, " + "%" PRIu64 " leveln_slowdown_hard\n", cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL], cf_stats_count_[LEVEL0_SLOWDOWN_WITH_COMPACTION], cf_stats_count_[LEVEL0_NUM_FILES_TOTAL], cf_stats_count_[LEVEL0_NUM_FILES_WITH_COMPACTION], - cf_stats_count_[MEMTABLE_COMPACTION], - total_slowdown_count_soft, total_slowdown_count_hard); + cf_stats_count_[HARD_PENDING_COMPACTION_BYTES_LIMIT], + cf_stats_count_[MEMTABLE_COMPACTION], total_slowdown_count_soft, + total_slowdown_count_hard); value->append(buf); cf_stats_snapshot_.ingest_bytes = curr_ingest; diff --git a/db/internal_stats.h b/db/internal_stats.h index 139a4544b..eeb226e5e 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -85,6 +85,7 @@ class InternalStats { MEMTABLE_COMPACTION, LEVEL0_NUM_FILES_TOTAL, LEVEL0_NUM_FILES_WITH_COMPACTION, + HARD_PENDING_COMPACTION_BYTES_LIMIT, WRITE_STALLS_ENUM_MAX, BYTES_FLUSHED, INTERNAL_CF_STATS_ENUM_MAX, @@ -357,6 +358,7 @@ class InternalStats { MEMTABLE_COMPACTION, LEVEL0_NUM_FILES_TOTAL, LEVEL0_NUM_FILES_WITH_COMPACTION, + HARD_PENDING_COMPACTION_BYTES_LIMIT, WRITE_STALLS_ENUM_MAX, BYTES_FLUSHED, INTERNAL_CF_STATS_ENUM_MAX, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 23b8507e1..12053b039 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -499,8 +499,6 @@ struct ColumnFamilyOptions { // Puts are delayed to options.delayed_write_rate when any level has a // compaction score that exceeds soft_rate_limit. This is ignored when == 0.0. - // CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not - // hold, RocksDB will set soft_rate_limit = hard_rate_limit // // Default: 0 (disabled) // @@ -510,6 +508,12 @@ struct ColumnFamilyOptions { // DEPRECATED -- this options is no longer usde double hard_rate_limit; + // All writes are stopped if estimated bytes needed to be compaction exceed + // this threshold. + // + // Default: 0 (disabled) + uint64_t hard_pending_compaction_bytes_limit; + // DEPRECATED -- this options is no longer used unsigned int rate_limit_delay_max_milliseconds; diff --git a/util/mutable_cf_options.cc b/util/mutable_cf_options.cc index b73f0c6d0..fafd15415 100644 --- a/util/mutable_cf_options.cc +++ b/util/mutable_cf_options.cc @@ -3,6 +3,8 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include "util/mutable_cf_options.h" + #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif @@ -15,7 +17,6 @@ #include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/immutable_options.h" -#include "util/mutable_cf_options.h" namespace rocksdb { @@ -83,8 +84,8 @@ void MutableCFOptions::Dump(Logger* log) const { disable_auto_compactions); Log(log, " soft_rate_limit: %lf", soft_rate_limit); - Log(log, " hard_rate_limit: %lf", - hard_rate_limit); + Log(log, " hard_pending_compaction_bytes_limit: %" PRIu64, + hard_pending_compaction_bytes_limit); Log(log, " level0_file_num_compaction_trigger: %d", level0_file_num_compaction_trigger); Log(log, " level0_slowdown_writes_trigger: %d", diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 4110eccd8..3ce5ce36e 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -25,7 +25,8 @@ struct MutableCFOptions { inplace_update_num_locks(options.inplace_update_num_locks), disable_auto_compactions(options.disable_auto_compactions), soft_rate_limit(options.soft_rate_limit), - hard_rate_limit(options.hard_rate_limit), + hard_pending_compaction_bytes_limit( + options.hard_pending_compaction_bytes_limit), level0_file_num_compaction_trigger( options.level0_file_num_compaction_trigger), level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), @@ -61,7 +62,7 @@ struct MutableCFOptions { inplace_update_num_locks(0), disable_auto_compactions(false), soft_rate_limit(0), - hard_rate_limit(0), + hard_pending_compaction_bytes_limit(0), level0_file_num_compaction_trigger(0), level0_slowdown_writes_trigger(0), level0_stop_writes_trigger(0), @@ -112,7 +113,7 @@ struct MutableCFOptions { // Compaction related options bool disable_auto_compactions; double soft_rate_limit; - double hard_rate_limit; + uint64_t hard_pending_compaction_bytes_limit; int level0_file_num_compaction_trigger; int level0_slowdown_writes_trigger; int level0_stop_writes_trigger; diff --git a/util/options.cc b/util/options.cc index 7f3bf75c7..9098221ff 100644 --- a/util/options.cc +++ b/util/options.cc @@ -102,7 +102,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() source_compaction_factor(1), max_grandparent_overlap_factor(10), soft_rate_limit(0.0), - hard_rate_limit(0.0), + hard_pending_compaction_bytes_limit(0), rate_limit_delay_max_milliseconds(1000), arena_block_size(0), disable_auto_compactions(false), @@ -161,7 +161,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) source_compaction_factor(options.source_compaction_factor), max_grandparent_overlap_factor(options.max_grandparent_overlap_factor), soft_rate_limit(options.soft_rate_limit), - hard_rate_limit(options.hard_rate_limit), + hard_pending_compaction_bytes_limit( + options.hard_pending_compaction_bytes_limit), rate_limit_delay_max_milliseconds( options.rate_limit_delay_max_milliseconds), arena_block_size(options.arena_block_size), @@ -473,8 +474,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { arena_block_size); Warn(log, " Options.soft_rate_limit: %.2f", soft_rate_limit); - Warn(log, " Options.hard_rate_limit: %.2f", - hard_rate_limit); + Warn(log, " Options.hard_pending_compaction_bytes_limit: %" PRIu64, + hard_pending_compaction_bytes_limit); Warn(log, " Options.rate_limit_delay_max_milliseconds: %u", rate_limit_delay_max_milliseconds); Warn(log, " Options.disable_auto_compactions: %d", diff --git a/util/options_helper.cc b/util/options_helper.cc index ccbad3e0b..5ddd9a708 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -281,8 +281,11 @@ bool ParseCompactionOptions(const std::string& name, const std::string& value, new_options->disable_auto_compactions = ParseBoolean(name, value); } else if (name == "soft_rate_limit") { new_options->soft_rate_limit = ParseDouble(value); + } else if (name == "hard_pending_compaction_bytes_limit") { + new_options->hard_pending_compaction_bytes_limit = ParseUint64(value); } else if (name == "hard_rate_limit") { - new_options->hard_rate_limit = ParseDouble(value); + // Deprecated options but still leave it here to avoid older options + // strings can be consumed. } else if (name == "level0_file_num_compaction_trigger") { new_options->level0_file_num_compaction_trigger = ParseInt(value); } else if (name == "level0_slowdown_writes_trigger") { diff --git a/util/options_helper.h b/util/options_helper.h index c5b2e8e49..814cc23ff 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -194,6 +194,9 @@ static std::unordered_map cf_options_type_info = { {"verify_checksums_in_compaction", {offsetof(struct ColumnFamilyOptions, verify_checksums_in_compaction), OptionType::kBoolean}}, + {"hard_pending_compaction_bytes_limit", + {offsetof(struct ColumnFamilyOptions, hard_pending_compaction_bytes_limit), + OptionType::kUInt64T}}, {"hard_rate_limit", {offsetof(struct ColumnFamilyOptions, hard_rate_limit), OptionType::kDouble}}, diff --git a/util/options_test.cc b/util/options_test.cc index b46c878e1..49095e17e 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -122,6 +122,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"max_grandparent_overlap_factor", "21"}, {"soft_rate_limit", "1.1"}, {"hard_rate_limit", "2.1"}, + {"hard_pending_compaction_bytes_limit", "211"}, {"arena_block_size", "22"}, {"disable_auto_compactions", "true"}, {"compaction_style", "kCompactionStyleLevel"}, @@ -214,7 +215,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.source_compaction_factor, 20); ASSERT_EQ(new_cf_opt.max_grandparent_overlap_factor, 21); ASSERT_EQ(new_cf_opt.soft_rate_limit, 1.1); - ASSERT_EQ(new_cf_opt.hard_rate_limit, 2.1); + ASSERT_EQ(new_cf_opt.hard_pending_compaction_bytes_limit, 211); ASSERT_EQ(new_cf_opt.arena_block_size, 22U); ASSERT_EQ(new_cf_opt.disable_auto_compactions, true); ASSERT_EQ(new_cf_opt.compaction_style, kCompactionStyleLevel); @@ -667,7 +668,8 @@ void VerifyColumnFamilyOptions(const ColumnFamilyOptions& base_opt, new_opt.verify_checksums_in_compaction); // double options - VerifyDouble(base_opt.hard_rate_limit, new_opt.hard_rate_limit); + ASSERT_EQ(base_opt.hard_pending_compaction_bytes_limit, + new_opt.hard_pending_compaction_bytes_limit); VerifyDouble(base_opt.soft_rate_limit, new_opt.soft_rate_limit); // int options @@ -746,7 +748,6 @@ TEST_F(OptionsTest, ColumnFamilyOptionsSerialization) { base_opt.verify_checksums_in_compaction = rnd.Uniform(2); // double options - base_opt.hard_rate_limit = static_cast(rnd.Uniform(10000)) / 13; base_opt.soft_rate_limit = static_cast(rnd.Uniform(10000)) / 13; // int options @@ -782,6 +783,7 @@ TEST_F(OptionsTest, ColumnFamilyOptionsSerialization) { static const uint64_t uint_max = static_cast(UINT_MAX); base_opt.max_sequential_skip_in_iterations = uint_max + rnd.Uniform(10000); base_opt.target_file_size_base = uint_max + rnd.Uniform(10000); + base_opt.hard_pending_compaction_bytes_limit = uint_max + rnd.Uniform(10000); // unsigned int options base_opt.rate_limit_delay_max_milliseconds = rnd.Uniform(10000);