Add rate_delay_limit_milliseconds
Summary: This adds the rate_delay_limit_milliseconds option to make the delay configurable in MakeRoomForWrite when the max compaction score is too high. This delay is called the Ln slowdown. This change also counts the Ln slowdown per level to make it possible to see where the stalls occur. From IO-bound performance testing, the Level N stalls occur: * with compression -> at the largest uncompressed level. This makes sense because compaction for compressed levels is much slower. When Lx is uncompressed and Lx+1 is compressed then files pile up at Lx because the (Lx,Lx+1)->Lx+1 compaction process is the first to be slowed by compression. * without compression -> at level 1 Task ID: #1832108 Blame Rev: Test Plan: run with real data, added test Revert Plan: Database Impact: Memcache Impact: Other Notes: EImportant: - begin *PUBLIC* platform impact section - Bugzilla: # - end platform impact - Reviewers: dhruba Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D9045
This commit is contained in:
parent
806e264350
commit
993543d1be
@ -232,6 +232,9 @@ static int FLAGS_stats_per_interval = 0;
|
||||
// less than or equal to this value.
|
||||
static double FLAGS_rate_limit = 0;
|
||||
|
||||
// When FLAGS_rate_limit is set then this is the max time a put will be stalled.
|
||||
static int FLAGS_rate_limit_delay_milliseconds = 1000;
|
||||
|
||||
// Control maximum bytes of overlaps in grandparent (i.e., level+2) before we
|
||||
// stop building a single file in a level->level+1 compaction.
|
||||
static int FLAGS_max_grandparent_overlap_factor = 10;
|
||||
@ -1029,6 +1032,7 @@ unique_ptr<char []> GenerateKeyFromInt(int v)
|
||||
options.delete_obsolete_files_period_micros =
|
||||
FLAGS_delete_obsolete_files_period_micros;
|
||||
options.rate_limit = FLAGS_rate_limit;
|
||||
options.rate_limit_delay_milliseconds = FLAGS_rate_limit_delay_milliseconds;
|
||||
options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
|
||||
options.max_grandparent_overlap_factor =
|
||||
FLAGS_max_grandparent_overlap_factor;
|
||||
@ -1652,6 +1656,10 @@ int main(int argc, char** argv) {
|
||||
} else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 &&
|
||||
d > 1.0) {
|
||||
FLAGS_rate_limit = d;
|
||||
} else if (sscanf(argv[i],
|
||||
"--rate_limit_delay_milliseconds=%d%c", &n, &junk) == 1
|
||||
&& n > 0) {
|
||||
FLAGS_rate_limit_delay_milliseconds = n;
|
||||
} else if (sscanf(argv[i], "--readonly=%d%c", &n, &junk) == 1 &&
|
||||
(n == 0 || n ==1 )) {
|
||||
FLAGS_read_only = n;
|
||||
|
@ -159,7 +159,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
stall_level0_slowdown_(0),
|
||||
stall_memtable_compaction_(0),
|
||||
stall_level0_num_files_(0),
|
||||
stall_leveln_slowdown_(0),
|
||||
started_at_(options.env->NowMicros()),
|
||||
flush_on_destroy_(false),
|
||||
delayed_writes_(0) {
|
||||
@ -167,6 +166,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
||||
|
||||
env_->GetAbsolutePath(dbname, &db_absolute_path_);
|
||||
stats_ = new CompactionStats[options.num_levels];
|
||||
|
||||
stall_leveln_slowdown_.resize(options.num_levels);
|
||||
for (int i = 0; i < options.num_levels; ++i)
|
||||
stall_leveln_slowdown_[i] = 0;
|
||||
|
||||
// Reserve ten files or so for other uses and give the rest to TableCache.
|
||||
const int table_cache_size = options_.max_open_files - 10;
|
||||
table_cache_.reset(new TableCache(dbname_, &options_, table_cache_size));
|
||||
@ -2042,6 +2046,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
|
||||
mutex_.AssertHeld();
|
||||
assert(!writers_.empty());
|
||||
bool allow_delay = !force;
|
||||
bool allow_rate_limit_delay = !force;
|
||||
uint64_t rate_limit_delay_millis = 0;
|
||||
Status s;
|
||||
double score;
|
||||
|
||||
@ -2095,19 +2101,24 @@ Status DBImpl::MakeRoomForWrite(bool force) {
|
||||
bg_cv_.Wait();
|
||||
stall_level0_num_files_ += env_->NowMicros() - t1;
|
||||
} else if (
|
||||
allow_delay &&
|
||||
allow_rate_limit_delay &&
|
||||
options_.rate_limit > 1.0 &&
|
||||
(score = versions_->MaxCompactionScore()) > options_.rate_limit) {
|
||||
// Delay a write when the compaction score for any level is too large.
|
||||
int max_level = versions_->MaxCompactionScoreLevel();
|
||||
mutex_.Unlock();
|
||||
uint64_t t1 = env_->NowMicros();
|
||||
env_->SleepForMicroseconds(1000);
|
||||
uint64_t delayed = env_->NowMicros() - t1;
|
||||
stall_leveln_slowdown_ += delayed;
|
||||
allow_delay = false; // Do not delay a single write more than once
|
||||
Log(options_.info_log,
|
||||
"delaying write %llu usecs for rate limits with max score %.2f\n",
|
||||
(long long unsigned int)delayed, score);
|
||||
stall_leveln_slowdown_[max_level] += delayed;
|
||||
// Make sure the following value doesn't round to zero.
|
||||
rate_limit_delay_millis += std::max((delayed / 1000), (uint64_t) 1);
|
||||
if (rate_limit_delay_millis >= options_.rate_limit_delay_milliseconds) {
|
||||
allow_rate_limit_delay = false;
|
||||
}
|
||||
// Log(options_.info_log,
|
||||
// "delaying write %llu usecs for rate limits with max score %.2f\n",
|
||||
// (long long unsigned int)delayed, score);
|
||||
mutex_.Lock();
|
||||
} else {
|
||||
// Attempt to switch to a new memtable and trigger compaction of old
|
||||
@ -2163,12 +2174,13 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
|
||||
uint64_t total_bytes = 0;
|
||||
uint64_t micros_up = env_->NowMicros() - started_at_;
|
||||
double seconds_up = micros_up / 1000000.0;
|
||||
uint64_t total_slowdown = 0;
|
||||
|
||||
// Pardon the long line but I think it is easier to read this way.
|
||||
snprintf(buf, sizeof(buf),
|
||||
" Compactions\n"
|
||||
"Level Files Size(MB) Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count\n"
|
||||
"------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
|
||||
"Level Files Size(MB) Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall\n"
|
||||
"----------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
|
||||
);
|
||||
value->append(buf);
|
||||
for (int level = 0; level < NumberLevels(); level++) {
|
||||
@ -2186,7 +2198,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
|
||||
total_bytes += bytes_read + stats_[level].bytes_written;
|
||||
snprintf(
|
||||
buf, sizeof(buf),
|
||||
"%3d %8d %8.0f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d\n",
|
||||
"%3d %8d %8.0f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f\n",
|
||||
level,
|
||||
files,
|
||||
versions_->NumLevelBytes(level) / 1048576.0,
|
||||
@ -2204,7 +2216,9 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
|
||||
stats_[level].files_in_levelnp1,
|
||||
stats_[level].files_out_levelnp1,
|
||||
stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
|
||||
stats_[level].count);
|
||||
stats_[level].count,
|
||||
stall_leveln_slowdown_[level] / 1000000.0);
|
||||
total_slowdown += stall_leveln_slowdown_[level];
|
||||
value->append(buf);
|
||||
}
|
||||
}
|
||||
@ -2227,7 +2241,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
|
||||
stall_level0_slowdown_ / 1000000.0,
|
||||
stall_level0_num_files_ / 1000000.0,
|
||||
stall_memtable_compaction_ / 1000000.0,
|
||||
stall_leveln_slowdown_ / 1000000.0);
|
||||
total_slowdown / 1000000.0);
|
||||
value->append(buf);
|
||||
|
||||
return true;
|
||||
|
@ -265,7 +265,7 @@ class DBImpl : public DB {
|
||||
uint64_t stall_level0_slowdown_;
|
||||
uint64_t stall_memtable_compaction_;
|
||||
uint64_t stall_level0_num_files_;
|
||||
uint64_t stall_leveln_slowdown_;
|
||||
std::vector<uint64_t> stall_leveln_slowdown_;
|
||||
|
||||
// Time at which this instance was started.
|
||||
const uint64_t started_at_;
|
||||
|
@ -212,6 +212,7 @@ class DBTest {
|
||||
kDBLogDir,
|
||||
kManifestFileSize,
|
||||
kCompactOnFlush,
|
||||
kPerfOptions,
|
||||
kEnd
|
||||
};
|
||||
int option_config_;
|
||||
@ -271,6 +272,12 @@ class DBTest {
|
||||
options.max_manifest_file_size = 50; // 50 bytes
|
||||
case kCompactOnFlush:
|
||||
options.purge_redundant_kvs_while_flush = !options.purge_redundant_kvs_while_flush;
|
||||
break;
|
||||
case kPerfOptions:
|
||||
options.rate_limit = 2.0;
|
||||
options.rate_limit_delay_milliseconds = 2;
|
||||
// TODO -- test more options
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -1376,6 +1376,8 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
||||
void VersionSet::Finalize(Version* v) {
|
||||
|
||||
double max_score = 0;
|
||||
int max_score_level = 0;
|
||||
|
||||
for (int level = 0; level < NumberLevels()-1; level++) {
|
||||
double score;
|
||||
if (level == 0) {
|
||||
@ -1421,6 +1423,7 @@ void VersionSet::Finalize(Version* v) {
|
||||
}
|
||||
if (max_score < score) {
|
||||
max_score = score;
|
||||
max_score_level = level;
|
||||
}
|
||||
}
|
||||
v->compaction_level_[level] = level;
|
||||
@ -1429,6 +1432,7 @@ void VersionSet::Finalize(Version* v) {
|
||||
|
||||
// update the max compaction score in levels 1 to n-1
|
||||
v->max_compaction_score_ = max_score;
|
||||
v->max_compaction_score_level_ = max_score_level;
|
||||
|
||||
// sort all the levels based on their score. Higher scores get listed
|
||||
// first. Use bubble sort because the number of entries are small.
|
||||
|
@ -176,6 +176,7 @@ class Version {
|
||||
std::vector<double> compaction_score_;
|
||||
std::vector<int> compaction_level_;
|
||||
double max_compaction_score_; // max score in l1 to ln-1
|
||||
int max_compaction_score_level_; // level on which max score occurs
|
||||
|
||||
// The offset in the manifest file where this version is stored.
|
||||
uint64_t offset_manifest_file_;
|
||||
@ -315,6 +316,11 @@ class VersionSet {
|
||||
return current_->max_compaction_score_;
|
||||
}
|
||||
|
||||
// See field declaration
|
||||
int MaxCompactionScoreLevel() const {
|
||||
return current_->max_compaction_score_level_;
|
||||
}
|
||||
|
||||
// Add all files listed in any live version to *live.
|
||||
// May also mutate some internal state.
|
||||
void AddLiveFiles(std::set<uint64_t>* live);
|
||||
|
@ -324,6 +324,9 @@ struct Options {
|
||||
// exceeds rate_limit. This is ignored when <= 1.0.
|
||||
double rate_limit;
|
||||
|
||||
// Max time a put will be stalled when rate_limit is enforced
|
||||
int rate_limit_delay_milliseconds;
|
||||
|
||||
// manifest file is rolled over on reaching this limit.
|
||||
// The older manifest file be deleted.
|
||||
// The default value is MAX_INT so that roll-over does not take place.
|
||||
|
@ -51,6 +51,7 @@ Options::Options()
|
||||
log_file_time_to_roll(0),
|
||||
keep_log_file_num(1000),
|
||||
rate_limit(0.0),
|
||||
rate_limit_delay_milliseconds(1000),
|
||||
max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
|
||||
no_block_cache(false),
|
||||
table_cache_numshardbits(4),
|
||||
@ -146,6 +147,8 @@ Options::Dump(Logger* log) const
|
||||
max_background_compactions);
|
||||
Log(log," Options.rate_limit: %.2f",
|
||||
rate_limit);
|
||||
Log(log," Options.rate_limit_delay_milliseconds: %d",
|
||||
rate_limit_delay_milliseconds);
|
||||
Log(log," Options.compaction_filter_args: %p",
|
||||
compaction_filter_args);
|
||||
Log(log," Options.CompactionFilter: %p",
|
||||
|
Loading…
Reference in New Issue
Block a user