dynamic soft_rate_limit and hard_rate_limit
Summary: as title Test Plan: unit test I am only able to build the test case for hard_rate_limit. soft_rate_limit is essentially the same thing as hard_rate_limit Reviewers: igor, sdong, yhchiang Reviewed By: yhchiang Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D24759
This commit is contained in:
parent
065a67c4f0
commit
4d5708aa56
@ -354,8 +354,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
|
||||
"us)",
|
||||
name_.c_str(), current_->NumLevelFiles(0), slowdown);
|
||||
} else if (options_.hard_rate_limit > 1.0 &&
|
||||
score > options_.hard_rate_limit) {
|
||||
} else if (mutable_cf_options.hard_rate_limit > 1.0 &&
|
||||
score > mutable_cf_options.hard_rate_limit) {
|
||||
uint64_t kHardLimitSlowdown = 1000;
|
||||
write_controller_token_ =
|
||||
write_controller->GetDelayToken(kHardLimitSlowdown);
|
||||
@ -365,10 +365,11 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
|
||||
"[%s] Stalling writes because we hit hard limit on level %d. "
|
||||
"(%" PRIu64 "us)",
|
||||
name_.c_str(), max_level, kHardLimitSlowdown);
|
||||
} else if (options_.soft_rate_limit > 0.0 &&
|
||||
score > options_.soft_rate_limit) {
|
||||
uint64_t slowdown = SlowdownAmount(score, options_.soft_rate_limit,
|
||||
options_.hard_rate_limit);
|
||||
} else if (mutable_cf_options.soft_rate_limit > 0.0 &&
|
||||
score > mutable_cf_options.soft_rate_limit) {
|
||||
uint64_t slowdown = SlowdownAmount(score,
|
||||
mutable_cf_options.soft_rate_limit,
|
||||
mutable_cf_options.hard_rate_limit);
|
||||
write_controller_token_ = write_controller->GetDelayToken(slowdown);
|
||||
internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
|
||||
Log(ioptions_.info_log,
|
||||
|
141
db/db_test.cc
141
db/db_test.cc
@ -8651,16 +8651,18 @@ TEST(DBTest, DynamicMemtableOptions) {
|
||||
}
|
||||
|
||||
TEST(DBTest, DynamicCompactionOptions) {
|
||||
// minimum write buffer size is enforced at 64KB
|
||||
const uint64_t k32KB = 1 << 15;
|
||||
const uint64_t k64KB = 1 << 16;
|
||||
const uint64_t k128KB = 1 << 17;
|
||||
const uint64_t k256KB = 1 << 18;
|
||||
const uint64_t k5KB = 5 * 1024;
|
||||
const uint64_t k4KB = 1 << 12;
|
||||
Options options;
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.compression = kNoCompression;
|
||||
options.hard_rate_limit = 1.1;
|
||||
options.write_buffer_size = k128KB;
|
||||
options.write_buffer_size = k64KB;
|
||||
options.max_write_buffer_number = 2;
|
||||
// Compaction related options
|
||||
options.level0_file_num_compaction_trigger = 3;
|
||||
@ -8669,9 +8671,9 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||
options.max_grandparent_overlap_factor = 10;
|
||||
options.expanded_compaction_factor = 25;
|
||||
options.source_compaction_factor = 1;
|
||||
options.target_file_size_base = k128KB;
|
||||
options.target_file_size_base = k64KB;
|
||||
options.target_file_size_multiplier = 1;
|
||||
options.max_bytes_for_level_base = k256KB;
|
||||
options.max_bytes_for_level_base = k128KB;
|
||||
options.max_bytes_for_level_multiplier = 4;
|
||||
|
||||
// Block flush thread and disable compaction thread
|
||||
@ -8689,65 +8691,71 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||
|
||||
// Write 3 files that have the same key range, trigger compaction and
|
||||
// result in one L1 file
|
||||
gen_l0_kb(0, 128, 1);
|
||||
gen_l0_kb(0, 64);
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
|
||||
gen_l0_kb(0, 128, 1);
|
||||
gen_l0_kb(0, 64);
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
|
||||
gen_l0_kb(0, 128, 1);
|
||||
gen_l0_kb(0, 64);
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("0,1", FilesPerLevel());
|
||||
std::vector<LiveFileMetaData> metadata;
|
||||
db_->GetLiveFilesMetaData(&metadata);
|
||||
ASSERT_EQ(1U, metadata.size());
|
||||
ASSERT_LE(metadata[0].size, k128KB + k5KB); // < 128KB + 5KB
|
||||
ASSERT_GE(metadata[0].size, k128KB - k5KB); // > 128B - 5KB
|
||||
ASSERT_LE(metadata[0].size, k64KB + k4KB);
|
||||
ASSERT_GE(metadata[0].size, k64KB - k4KB);
|
||||
|
||||
// Make compaction trigger and file size smaller
|
||||
// Test compaction trigger and target_file_size_base
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"level0_file_num_compaction_trigger", "2"},
|
||||
{"target_file_size_base", "65536"}
|
||||
{"target_file_size_base", std::to_string(k32KB) }
|
||||
}));
|
||||
|
||||
gen_l0_kb(0, 128, 1);
|
||||
gen_l0_kb(0, 64);
|
||||
ASSERT_EQ("1,1", FilesPerLevel());
|
||||
gen_l0_kb(0, 128, 1);
|
||||
gen_l0_kb(0, 64);
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("0,2", FilesPerLevel());
|
||||
metadata.clear();
|
||||
db_->GetLiveFilesMetaData(&metadata);
|
||||
ASSERT_EQ(2U, metadata.size());
|
||||
ASSERT_LE(metadata[0].size, k64KB + k5KB); // < 64KB + 5KB
|
||||
ASSERT_GE(metadata[0].size, k64KB - k5KB); // > 64KB - 5KB
|
||||
ASSERT_LE(metadata[0].size, k32KB + k4KB);
|
||||
ASSERT_GE(metadata[0].size, k32KB - k4KB);
|
||||
|
||||
// Change base level size to 1MB
|
||||
ASSERT_TRUE(dbfull()->SetOptions({ {"max_bytes_for_level_base", "1048576"} }));
|
||||
|
||||
// writing 56 x 128KB => 7MB
|
||||
// (L1 + L2) = (1 + 4) * 1MB = 5MB
|
||||
for (int i = 0; i < 56; ++i) {
|
||||
gen_l0_kb(i, 128, 56);
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_TRUE(SizeAtLevel(1) > 1048576 * 0.9 &&
|
||||
SizeAtLevel(1) < 1048576 * 1.1);
|
||||
ASSERT_TRUE(SizeAtLevel(2) > 4 * 1048576 * 0.9 &&
|
||||
SizeAtLevel(2) < 4 * 1048576 * 1.1);
|
||||
|
||||
// Change multiplier to 2 with smaller base
|
||||
// Test max_bytes_for_level_base
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"max_bytes_for_level_multiplier", "2"},
|
||||
{"max_bytes_for_level_base", "262144"}
|
||||
{"max_bytes_for_level_base", std::to_string(k256KB) }
|
||||
}));
|
||||
|
||||
// writing 16 x 128KB
|
||||
// (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
|
||||
for (int i = 0; i < 16; ++i) {
|
||||
gen_l0_kb(i, 128, 50);
|
||||
// writing 24 x 64KB => 6 * 256KB
|
||||
// (L1 + L2) = (1 + 4) * 256KB
|
||||
for (int i = 0; i < 24; ++i) {
|
||||
gen_l0_kb(i, 64, 32);
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1);
|
||||
ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1);
|
||||
ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1);
|
||||
ASSERT_TRUE(SizeAtLevel(1) > k256KB * 0.8 &&
|
||||
SizeAtLevel(1) < k256KB * 1.2);
|
||||
ASSERT_TRUE(SizeAtLevel(2) > 4 * k256KB * 0.8 &&
|
||||
SizeAtLevel(2) < 4 * k256KB * 1.2);
|
||||
|
||||
// Test max_bytes_for_level_multiplier and
|
||||
// max_bytes_for_level_base (reduce)
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"max_bytes_for_level_multiplier", "2"},
|
||||
{"max_bytes_for_level_base", std::to_string(k128KB) }
|
||||
}));
|
||||
|
||||
// writing 20 x 64KB = 10 x 128KB
|
||||
// (L1 + L2 + L3) = (1 + 2 + 4) * 128KB
|
||||
for (int i = 0; i < 20; ++i) {
|
||||
gen_l0_kb(i, 64, 32);
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_TRUE(SizeAtLevel(1) > k128KB * 0.8 &&
|
||||
SizeAtLevel(1) < k128KB * 1.2);
|
||||
ASSERT_TRUE(SizeAtLevel(2) > 2 * k128KB * 0.8 &&
|
||||
SizeAtLevel(2) < 2 * k128KB * 1.2);
|
||||
ASSERT_TRUE(SizeAtLevel(3) > 4 * k128KB * 0.8 &&
|
||||
SizeAtLevel(3) < 4 * k128KB * 1.2);
|
||||
|
||||
// Clean up memtable and L0
|
||||
dbfull()->CompactRange(nullptr, nullptr);
|
||||
@ -8761,16 +8769,16 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||
WriteOptions wo;
|
||||
wo.timeout_hint_us = 10000;
|
||||
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) {
|
||||
// Wait for compaction so that put won't timeout
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
count++;
|
||||
}
|
||||
// Stop trigger = 8
|
||||
ASSERT_EQ(count, 8);
|
||||
// Unblock
|
||||
sleeping_task_low1.WakeUp();
|
||||
sleeping_task_low1.WaitUntilDone();
|
||||
|
||||
// Reduce stop trigger
|
||||
// Test: stop trigger (reduce)
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"level0_stop_writes_trigger", "6"}
|
||||
}));
|
||||
@ -8783,7 +8791,6 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||
Env::Priority::LOW);
|
||||
count = 0;
|
||||
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) {
|
||||
// Wait for compaction so that put won't timeout
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
count++;
|
||||
}
|
||||
@ -8820,6 +8827,56 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_LT(NumTableFilesAtLevel(0), 4);
|
||||
|
||||
// Test for hard_rate_limit, change max_bytes_for_level_base to make level
|
||||
// size big
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"max_bytes_for_level_base", std::to_string(k256KB) }
|
||||
}));
|
||||
// writing 40 x 64KB = 10 x 256KB
|
||||
// (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
|
||||
for (int i = 0; i < 40; ++i) {
|
||||
gen_l0_kb(i, 64, 32);
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_TRUE(SizeAtLevel(1) > k256KB * 0.8 &&
|
||||
SizeAtLevel(1) < k256KB * 1.2);
|
||||
ASSERT_TRUE(SizeAtLevel(2) > 2 * k256KB * 0.8 &&
|
||||
SizeAtLevel(2) < 2 * k256KB * 1.2);
|
||||
ASSERT_TRUE(SizeAtLevel(3) > 4 * k256KB * 0.8 &&
|
||||
SizeAtLevel(3) < 4 * k256KB * 1.2);
|
||||
// Reduce max_bytes_for_level_base and disable compaction at the same time
|
||||
// This should cause score to increase
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"disable_auto_compactions", "true"},
|
||||
{"max_bytes_for_level_base", "65536"},
|
||||
}));
|
||||
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
|
||||
// Check score is above 2
|
||||
ASSERT_TRUE(SizeAtLevel(1) / k64KB > 2 ||
|
||||
SizeAtLevel(2) / k64KB > 4 ||
|
||||
SizeAtLevel(3) / k64KB > 8);
|
||||
|
||||
// Enfoce hard rate limit, L0 score is not regulated by this limit
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"hard_rate_limit", "2"}
|
||||
}));
|
||||
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
|
||||
// Hard rate limit slow down for 1000 us, so default 10ms should be ok
|
||||
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok());
|
||||
wo.timeout_hint_us = 500;
|
||||
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).IsTimedOut());
|
||||
|
||||
// Bump up limit
|
||||
ASSERT_TRUE(dbfull()->SetOptions({
|
||||
{"hard_rate_limit", "100"}
|
||||
}));
|
||||
dbfull()->TEST_FlushMemTable(true);
|
||||
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -23,6 +23,8 @@ struct MutableCFOptions {
|
||||
max_successive_merges(options.max_successive_merges),
|
||||
filter_deletes(options.filter_deletes),
|
||||
disable_auto_compactions(options.disable_auto_compactions),
|
||||
soft_rate_limit(options.soft_rate_limit),
|
||||
hard_rate_limit(options.hard_rate_limit),
|
||||
level0_file_num_compaction_trigger(
|
||||
options.level0_file_num_compaction_trigger),
|
||||
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
|
||||
@ -49,6 +51,8 @@ struct MutableCFOptions {
|
||||
max_successive_merges(0),
|
||||
filter_deletes(false),
|
||||
disable_auto_compactions(false),
|
||||
soft_rate_limit(0),
|
||||
hard_rate_limit(0),
|
||||
level0_file_num_compaction_trigger(0),
|
||||
level0_slowdown_writes_trigger(0),
|
||||
level0_stop_writes_trigger(0),
|
||||
@ -86,6 +90,8 @@ struct MutableCFOptions {
|
||||
|
||||
// Compaction related options
|
||||
bool disable_auto_compactions;
|
||||
double soft_rate_limit;
|
||||
double hard_rate_limit;
|
||||
int level0_file_num_compaction_trigger;
|
||||
int level0_slowdown_writes_trigger;
|
||||
int level0_stop_writes_trigger;
|
||||
|
@ -105,6 +105,10 @@ bool ParseCompactionOptions(const std::string& name, const std::string& value,
|
||||
OptionsType* new_options) {
|
||||
if (name == "disable_auto_compactions") {
|
||||
new_options->disable_auto_compactions = ParseBoolean(name, value);
|
||||
} else if (name == "soft_rate_limit") {
|
||||
new_options->soft_rate_limit = ParseDouble(value);
|
||||
} else if (name == "hard_rate_limit") {
|
||||
new_options->hard_rate_limit = ParseDouble(value);
|
||||
} else if (name == "level0_file_num_compaction_trigger") {
|
||||
new_options->level0_file_num_compaction_trigger = ParseInt(value);
|
||||
} else if (name == "level0_slowdown_writes_trigger") {
|
||||
@ -268,10 +272,6 @@ bool GetColumnFamilyOptionsFromMap(
|
||||
new_options->num_levels = ParseInt(o.second);
|
||||
} else if (o.first == "max_mem_compaction_level") {
|
||||
new_options->max_mem_compaction_level = ParseInt(o.second);
|
||||
} else if (o.first == "soft_rate_limit") {
|
||||
new_options->soft_rate_limit = ParseDouble(o.second);
|
||||
} else if (o.first == "hard_rate_limit") {
|
||||
new_options->hard_rate_limit = ParseDouble(o.second);
|
||||
} else if (o.first == "purge_redundant_kvs_while_flush") {
|
||||
new_options->purge_redundant_kvs_while_flush =
|
||||
ParseBoolean(o.first, o.second);
|
||||
|
Loading…
Reference in New Issue
Block a user