Improve statistics
Summary:
This adds more statistics to be reported by GetProperty("leveldb.stats").
The new stats include time spent waiting on stalls in MakeRoomForWrite.

This also includes the total amplification rate, defined as:
  (#bytes of sequential IO during compaction) / (#bytes from Put)

This also includes a lot more data for the per-level compaction report:
* Rn(MB)   - MB read from level N during compaction between levels N and N+1
* Rnp1(MB) - MB read from level N+1 during compaction between levels N and N+1
* Wnew(MB) - new data written to the level during compaction
* Amplify  - (Write(MB) + Rnp1(MB)) / Rn(MB)
* Rn       - files read from level N during compaction between levels N and N+1
* Rnp1     - files read from level N+1 during compaction between levels N and N+1
* Wnp1     - files written to level N+1 during compaction between levels N and N+1
* NewW     - new files written to level N+1 during compaction
* Count    - number of compactions done for this level

This is the new output from DB::GetProperty("leveldb.stats"). The old output
stopped at Write(MB).

                               Compactions
Level  Files Size(MB) Time(sec) Read(MB) Write(MB)  Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s)   Rn Rnp1 Wnp1 NewW Count
--------------------------------------------------------------------------------------------------------------------------------------
    0      3        6        33        0       576       0        0      576    -1.0        0.0         1.3    0    0    0    0   290
    1    127      242       351     5316      5314     570     4747      567    17.0       12.1        12.1  287 2399 2685  286    32
    2    161      328        54      822       824     326      496      328     4.0        1.9         1.9  160  251  411  160   161
Amplification: 22.3 rate, 0.56 GB in, 12.55 GB out
Uptime(secs): 439.8
Stalls(secs): 206.938 level0_slowdown, 0.000 level0_numfiles, 24.129 memtable_compaction

Task ID: #

Blame Rev:

Test Plan: run db_bench

Revert Plan:

Database Impact:

Memcache Impact:

Other Notes:

Important:
- begin *PUBLIC* platform impact section
- Bugzilla: #
- end platform impact -

(cherry picked from commit ecdeead38f86cc02e754d0032600742c4f02fec8)

Reviewers: dhruba

Differential Revision: https://reviews.facebook.net/D6153
parent 51d2adfbeb
commit e7206f43ee
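The Amplify column and the overall amplification rate reduce to two small divisions over the new counters. As a reference, here is a minimal C++ sketch of that math; the LevelStats struct and the function names are illustrative, chosen only to mirror the CompactionStats fields added in db/db_impl.h below:

#include <cstdint>

// Illustrative mirror of the per-level counters added in db/db_impl.h.
struct LevelStats {
  int64_t bytes_readn;    // Rn: bytes read from level N
  int64_t bytes_readnp1;  // Rnp1: bytes read from level N+1
  int64_t bytes_written;  // bytes written to level N+1
};

// Per-level write amplification: (Write(MB) + Rnp1(MB)) / Rn(MB).
double Amplify(const LevelStats& s) {
  if (s.bytes_readn == 0) return 0.0;  // nothing was read from level N
  return (s.bytes_written + s.bytes_readnp1) / (double) s.bytes_readn;
}

// Total amplification rate:
// (#bytes of sequential IO during compaction) / (#bytes from Put).
double AmplificationRate(int64_t compaction_io_bytes, int64_t put_bytes) {
  return compaction_io_bytes / (double) put_bytes;
}

Checked against the sample report: for level 2, (824 + 496) / 326 ≈ 4.0, which matches the Amplify column.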
db/db_bench.cc

@@ -159,6 +159,9 @@ static int FLAGS_level0_stop_writes_trigger = 12;
 // Number of files in level-0 that will slow down writes.
 static int FLAGS_level0_slowdown_writes_trigger = 8;
 
+// Number of files in level-0 when compactions start
+static int FLAGS_level0_file_num_compaction_trigger = 4;
+
 // Ratio of reads to writes (expressed as a percentage)
 // for the ReadRandomWriteRandom workload. The default
 // setting is 9 gets for every 1 put.
@@ -299,7 +302,7 @@ class Stats {
 
   void SetId(int id) { id_ = id; }
 
-  void FinishedSingleOp() {
+  void FinishedSingleOp(DB* db) {
     if (FLAGS_histogram) {
       double now = FLAGS_env->NowMicros();
       double micros = now - last_op_finish_;
@@ -326,12 +329,17 @@ class Stats {
       } else {
         double now = FLAGS_env->NowMicros();
         fprintf(stderr,
-                "... thread %d: %ld ops in %.6f seconds and %.2f ops/sec\n",
+                "... thread %d: (%ld,%ld) ops (interval,total) in %.6f seconds and %.2f ops/sec\n",
                 id_,
-                done_ - last_report_done_,
+                done_ - last_report_done_, done_,
                 (now - last_report_finish_) / 1000000.0,
                 (done_ - last_report_done_) /
                 ((now - last_report_finish_) / 1000000.0));
+
+        std::string stats;
+        if (db && db->GetProperty("leveldb.stats", &stats))
+          fprintf(stderr, stats.c_str());
+
         fflush(stderr);
         next_report_ += FLAGS_stats_interval;
         last_report_finish_ = now;
@@ -794,7 +802,7 @@ class Benchmark {
     uint32_t crc = 0;
     while (bytes < 500 * 1048576) {
       crc = crc32c::Value(data.data(), size);
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(NULL);
       bytes += size;
     }
     // Print so result is not dead
@@ -815,7 +823,7 @@ class Benchmark {
        ptr = ap.Acquire_Load();
      }
      count++;
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(NULL);
    }
    if (ptr == NULL) exit(1); // Disable unused variable warning.
  }
@@ -831,7 +839,7 @@ class Benchmark {
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(NULL);
    }
 
    if (!ok) {
@@ -856,7 +864,7 @@ class Benchmark {
      ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                   uncompressed);
      bytes += input.size();
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(NULL);
    }
    delete[] uncompressed;
 
@@ -887,6 +895,8 @@ class Benchmark {
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
+   options.level0_file_num_compaction_trigger =
+       FLAGS_level0_file_num_compaction_trigger;
    options.level0_slowdown_writes_trigger =
        FLAGS_level0_slowdown_writes_trigger;
    options.compression = FLAGS_compression_type;
@@ -927,7 +937,7 @@ class Benchmark {
        snprintf(key, sizeof(key), "%016d", k);
        batch.Put(key, gen.Generate(value_size_));
        bytes += value_size_ + strlen(key);
-       thread->stats.FinishedSingleOp();
+       thread->stats.FinishedSingleOp(db_);
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
@@ -944,7 +954,7 @@ class Benchmark {
    int64_t bytes = 0;
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
      bytes += iter->key().size() + iter->value().size();
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
      ++i;
    }
    delete iter;
@@ -957,7 +967,7 @@ class Benchmark {
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
      ++i;
    }
    delete iter;
@@ -975,7 +985,7 @@ class Benchmark {
      if (db_->Get(options, key, &value).ok()) {
        found++;
      }
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_);
@@ -990,7 +1000,7 @@ class Benchmark {
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d.", k);
      db_->Get(options, key, &value);
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
    }
  }
 
@@ -1003,7 +1013,7 @@ class Benchmark {
      const int k = thread->rand.Next() % range;
      snprintf(key, sizeof(key), "%016d", k);
      db_->Get(options, key, &value);
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
    }
  }
 
@@ -1019,7 +1029,7 @@ class Benchmark {
      iter->Seek(key);
      if (iter->Valid() && iter->key() == key) found++;
      delete iter;
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_);
@@ -1037,7 +1047,7 @@ class Benchmark {
      char key[100];
      snprintf(key, sizeof(key), "%016d", k);
      batch.Delete(key);
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
    }
    s = db_->Write(write_options_, &batch);
    if (!s.ok()) {
@@ -1126,7 +1136,7 @@ class Benchmark {
        put_weight--;
        writes_done++;
      }
-     thread->stats.FinishedSingleOp();
+     thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "( reads:%ld writes:%ld total:%ld )",
@@ -1281,6 +1291,9 @@ int main(int argc, char** argv) {
    } else if (sscanf(argv[i],"--level0_slowdown_writes_trigger=%d%c",
                      &n, &junk) == 1) {
      FLAGS_level0_slowdown_writes_trigger = n;
+   } else if (sscanf(argv[i],"--level0_file_num_compaction_trigger=%d%c",
+                     &n, &junk) == 1) {
+     FLAGS_level0_file_num_compaction_trigger = n;
    } else if (strncmp(argv[i], "--compression_type=", 19) == 0) {
      const char* ctype = argv[i] + 19;
      if (!strcasecmp(ctype, "none"))
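FinishedSingleOp now takes a DB* so the per-interval report above can dump leveldb.stats mid-run. Any client can poll the same property through the public API; a minimal sketch follows, with only the helper name invented here:

#include <cstdio>
#include <string>

#include "leveldb/db.h"

// Illustrative helper: print compaction and stall statistics for a live DB.
void DumpStats(leveldb::DB* db) {
  std::string stats;
  if (db != NULL && db->GetProperty("leveldb.stats", &stats)) {
    fprintf(stderr, "%s\n", stats.c_str());
  }
}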
db/db_impl.cc

@@ -160,7 +160,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
      manual_compaction_(NULL),
      logger_(NULL),
      disable_delete_obsolete_files_(false),
-     delete_obsolete_files_last_run_(0) {
+     delete_obsolete_files_last_run_(0),
+     stall_level0_slowdown_(0),
+     stall_memtable_compaction_(0),
+     stall_level0_num_files_(0),
+     started_at_(options.env->NowMicros()) {
  mem_->Ref();
  has_imm_.Release_Store(NULL);
 
@@ -608,6 +612,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
+ stats.files_out_levelnp1 = 1;
  stats_[level].Add(stats);
  return s;
 }
@@ -1155,11 +1160,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
 
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
- for (int which = 0; which < 2; which++) {
-   for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
-     stats.bytes_read += compact->compaction->input(which, i)->file_size;
-   }
- }
+
+ stats.files_in_leveln = compact->compaction->num_input_files(0);
+ stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
+ stats.files_out_levelnp1 = compact->outputs.size();
+
+ for (int i = 0; i < compact->compaction->num_input_files(0); i++)
+   stats.bytes_readn += compact->compaction->input(0, i)->file_size;
+
+ for (int i = 0; i < compact->compaction->num_input_files(1); i++)
+   stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size;
+
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }
@@ -1172,7 +1183,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log,
-     "compacted to: %s", versions_->LevelSummary(&tmp));
+     "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
+     "MB in(%.1f, %.1f) out(%.1f), amplify(%.1f)\n",
+     versions_->LevelSummary(&tmp),
+     (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
+         (double) stats.micros,
+     compact->compaction->level() + 1,
+     stats.files_in_leveln, stats.files_in_levelnp1, stats.files_out_levelnp1,
+     stats.bytes_readn / 1048576.0,
+     stats.bytes_readnp1 / 1048576.0,
+     stats.bytes_written / 1048576.0,
+     (stats.bytes_written + stats.bytes_readnp1) /
+         (double) stats.bytes_readn);
 
  return status;
 }
 
@@ -1461,6 +1484,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
+     stall_level0_slowdown_ += 1000;
      allow_delay = false;  // Do not delay a single write more than once
      Log(options_.info_log, "delaying write...\n");
      mutex_.Lock();
@@ -1472,12 +1496,16 @@ Status DBImpl::MakeRoomForWrite(bool force) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "wait for memtable compaction...\n");
+     uint64_t t1 = env_->NowMicros();
      bg_cv_.Wait();
+     stall_memtable_compaction_ += env_->NowMicros() - t1;
    } else if (versions_->NumLevelFiles(0) >=
               options_.level0_stop_writes_trigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "waiting...\n");
+     uint64_t t1 = env_->NowMicros();
      bg_cv_.Wait();
+     stall_level0_num_files_ += env_->NowMicros() - t1;
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
@@ -1528,28 +1556,73 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
      return true;
    }
  } else if (in == "stats") {
-   char buf[200];
+   char buf[1000];
+   uint64_t total_bytes = 0;
+   uint64_t micros_up = env_->NowMicros() - started_at_;
+   double seconds_up = micros_up / 1000000.0;
+
+   // Pardon the long line but I think it is easier to read this way.
    snprintf(buf, sizeof(buf),
             "                               Compactions\n"
-            "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
-            "--------------------------------------------------\n"
+            "Level  Files Size(MB) Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count\n"
+            "------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
            );
    value->append(buf);
    for (int level = 0; level < NumberLevels(); level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
+       int64_t bytes_read = stats_[level].bytes_readn +
+                            stats_[level].bytes_readnp1;
+       int64_t bytes_new = stats_[level].bytes_written -
+                           stats_[level].bytes_readnp1;
+       double amplify = (stats_[level].bytes_readn == 0)
+           ? 0.0
+           : (stats_[level].bytes_written + stats_[level].bytes_readnp1) /
+               (double) stats_[level].bytes_readn;
+
+       total_bytes += bytes_read + stats_[level].bytes_written;
        snprintf(
            buf, sizeof(buf),
-           "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
+           "%3d %8d %8.0f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d\n",
            level,
            files,
            versions_->NumLevelBytes(level) / 1048576.0,
            stats_[level].micros / 1e6,
-           stats_[level].bytes_read / 1048576.0,
-           stats_[level].bytes_written / 1048576.0);
+           bytes_read / 1048576.0,
+           stats_[level].bytes_written / 1048576.0,
+           stats_[level].bytes_readn / 1048576.0,
+           stats_[level].bytes_readnp1 / 1048576.0,
+           bytes_new / 1048576.0,
+           amplify,
+           bytes_read / 1048576.0 / seconds_up,
+           stats_[level].bytes_written / 1048576.0 / seconds_up,
+           stats_[level].files_in_leveln,
+           stats_[level].files_in_levelnp1,
+           stats_[level].files_out_levelnp1,
+           stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
+           stats_[level].count);
        value->append(buf);
      }
    }
+
+   snprintf(buf, sizeof(buf),
+            "Amplification: %.1f rate, %.2f GB in, %.2f GB out\n",
+            (double) total_bytes / stats_[0].bytes_written,
+            stats_[0].bytes_written / (1048576.0 * 1024),
+            total_bytes / (1048576.0 * 1024));
+   value->append(buf);
+
+   snprintf(buf, sizeof(buf), "Uptime(secs): %.1f\n", seconds_up);
+   value->append(buf);
+
+   snprintf(buf, sizeof(buf),
+            "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
+            "%.3f memtable_compaction\n",
+            stall_level0_slowdown_ / 1000000.0,
+            stall_level0_num_files_ / 1000000.0,
+            stall_memtable_compaction_ / 1000000.0);
+   value->append(buf);
+
    return true;
  } else if (in == "sstables") {
    *value = versions_->current()->DebugString();
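As a sanity check on the Amplification summary line this GetProperty code emits, the division can be reproduced from the sample numbers in the commit message; the program below is illustrative only:

#include <cstdio>

int main() {
  // From the sample report: 0.56 GB in (bytes from Put, i.e. level-0
  // bytes_written) and 12.55 GB out (all compaction read + write bytes).
  double gb_in = 0.56;
  double gb_out = 12.55;
  // Same formula as the stats code: total_bytes / stats_[0].bytes_written.
  printf("Amplification: %.1f rate\n", gb_out / gb_in);
  // Prints 22.4; the report shows 22.3 because it divides the raw byte
  // counters before they are rounded to two decimals for display.
  return 0;
}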
db/db_impl.h
@@ -213,21 +213,57 @@ class DBImpl : public DB {
   // last time when DeleteObsoleteFiles was invoked
   uint64_t delete_obsolete_files_last_run_;
 
+  // These count the number of microseconds for which MakeRoomForWrite stalls.
+  uint64_t stall_level0_slowdown_;
+  uint64_t stall_memtable_compaction_;
+  uint64_t stall_level0_num_files_;
+
+  // Time at which this instance was started.
+  const uint64_t started_at_;
+
   // Per level compaction stats.  stats_[level] stores the stats for
   // compactions that produced data for the specified "level".
   struct CompactionStats {
     int64_t micros;
-    int64_t bytes_read;
+
+    // Bytes read from level N during compaction between levels N and N+1
+    int64_t bytes_readn;
+
+    // Bytes read from level N+1 during compaction between levels N and N+1
+    int64_t bytes_readnp1;
+
+    // Total bytes written during compaction between levels N and N+1
     int64_t bytes_written;
 
-    CompactionStats() : micros(0), bytes_read(0), bytes_written(0) { }
+    // Files read from level N during compaction between levels N and N+1
+    int files_in_leveln;
+
+    // Files read from level N+1 during compaction between levels N and N+1
+    int files_in_levelnp1;
+
+    // Files written during compaction between levels N and N+1
+    int files_out_levelnp1;
+
+    // Number of compactions done
+    int count;
+
+    CompactionStats() : micros(0), bytes_readn(0), bytes_readnp1(0),
+                        bytes_written(0), files_in_leveln(0),
+                        files_in_levelnp1(0), files_out_levelnp1(0),
+                        count(0) { }
 
     void Add(const CompactionStats& c) {
       this->micros += c.micros;
-      this->bytes_read += c.bytes_read;
+      this->bytes_readn += c.bytes_readn;
+      this->bytes_readnp1 += c.bytes_readnp1;
       this->bytes_written += c.bytes_written;
+      this->files_in_leveln += c.files_in_leveln;
+      this->files_in_levelnp1 += c.files_in_levelnp1;
+      this->files_out_levelnp1 += c.files_out_levelnp1;
+      this->count += 1;
     }
   };
 
   CompactionStats* stats_;
 
   static const int KEEP_LOG_FILE_NUM = 1000;