From e7206f43ee87e8b27d8a7dc9377b23f3a109ce29 Mon Sep 17 00:00:00 2001 From: Mark Callaghan Date: Tue, 23 Oct 2012 10:34:09 -0700 Subject: [PATCH 1/2] Improve statistics Summary: This adds more statistics to be reported by GetProperty("leveldb.stats"). The new stats include time spent waiting on stalls in MakeRoomForWrite. This also includes the total amplification rate where that is: (#bytes of sequential IO during compaction) / (#bytes from Put) This also includes a lot more data for the per-level compaction report. * Rn(MB) - MB read from level N during compaction between levels N and N+1 * Rnp1(MB) - MB read from level N+1 during compaction between levels N and N+1 * Wnew(MB) - new data written to the level during compaction * Amplify - ( Write(MB) + Rnp1(MB) ) / Rn(MB) * Rn - files read from level N during compaction between levels N and N+1 * Rnp1 - files read from level N+1 during compaction between levels N and N+1 * Wnp1 - files written to level N+1 during compaction between levels N and N+1 * NewW - new files written to level N+1 during compaction * Count - number of compactions done for this level This is the new output from DB::GetProperty("leveldb.stats"). The old output stopped at Write(MB) Compactions Level Files Size(MB) Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count ------------------------------------------------------------------------------------------------------------------------------------- 0 3 6 33 0 576 0 0 576 -1.0 0.0 1.3 0 0 0 0 290 1 127 242 351 5316 5314 570 4747 567 17.0 12.1 12.1 287 2399 2685 286 32 2 161 328 54 822 824 326 496 328 4.0 1.9 1.9 160 251 411 160 161 Amplification: 22.3 rate, 0.56 GB in, 12.55 GB out Uptime(secs): 439.8 Stalls(secs): 206.938 level0_slowdown, 0.000 level0_numfiles, 24.129 memtable_compaction Task ID: # Blame Rev: Test Plan: run db_bench Revert Plan: Database Impact: Memcache Impact: Other Notes: EImportant: - begin *PUBLIC* platform impact section - Bugzilla: # - end platform impact - (cherry picked from commit ecdeead38f86cc02e754d0032600742c4f02fec8) Reviewers: dhruba Differential Revision: https://reviews.facebook.net/D6153 --- db/db_bench.cc | 45 +++++++++++++++-------- db/db_impl.cc | 99 +++++++++++++++++++++++++++++++++++++++++++------- db/db_impl.h | 42 +++++++++++++++++++-- 3 files changed, 154 insertions(+), 32 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 485536da8..626cc163b 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -159,6 +159,9 @@ static int FLAGS_level0_stop_writes_trigger = 12; // Number of files in level-0 that will slow down writes. static int FLAGS_level0_slowdown_writes_trigger = 8; +// Number of files in level-0 when compactions start +static int FLAGS_level0_file_num_compaction_trigger = 4; + // Ratio of reads to writes (expressed as a percentage) // for the ReadRandomWriteRandom workload. The default // setting is 9 gets for every 1 put. @@ -299,7 +302,7 @@ class Stats { void SetId(int id) { id_ = id; } - void FinishedSingleOp() { + void FinishedSingleOp(DB* db) { if (FLAGS_histogram) { double now = FLAGS_env->NowMicros(); double micros = now - last_op_finish_; @@ -326,12 +329,17 @@ class Stats { } else { double now = FLAGS_env->NowMicros(); fprintf(stderr, - "... thread %d: %ld ops in %.6f seconds and %.2f ops/sec\n", + "... 
thread %d: (%ld,%ld) ops (interval,total) in %.6f seconds and %.2f ops/sec\n", id_, - done_ - last_report_done_, + done_ - last_report_done_, done_, (now - last_report_finish_) / 1000000.0, (done_ - last_report_done_) / ((now - last_report_finish_) / 1000000.0)); + + std::string stats; + if (db && db->GetProperty("leveldb.stats", &stats)) + fprintf(stderr, stats.c_str()); + fflush(stderr); next_report_ += FLAGS_stats_interval; last_report_finish_ = now; @@ -794,7 +802,7 @@ class Benchmark { uint32_t crc = 0; while (bytes < 500 * 1048576) { crc = crc32c::Value(data.data(), size); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(NULL); bytes += size; } // Print so result is not dead @@ -815,7 +823,7 @@ class Benchmark { ptr = ap.Acquire_Load(); } count++; - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(NULL); } if (ptr == NULL) exit(1); // Disable unused variable warning. } @@ -831,7 +839,7 @@ class Benchmark { ok = port::Snappy_Compress(input.data(), input.size(), &compressed); produced += compressed.size(); bytes += input.size(); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(NULL); } if (!ok) { @@ -856,7 +864,7 @@ class Benchmark { ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), uncompressed); bytes += input.size(); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(NULL); } delete[] uncompressed; @@ -887,6 +895,8 @@ class Benchmark { options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger; + options.level0_file_num_compaction_trigger = + FLAGS_level0_file_num_compaction_trigger; options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type; @@ -927,7 +937,7 @@ class Benchmark { snprintf(key, sizeof(key), "%016d", k); batch.Put(key, gen.Generate(value_size_)); bytes += value_size_ + strlen(key); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); } s = db_->Write(write_options_, &batch); if (!s.ok()) { @@ -944,7 +954,7 @@ class Benchmark { int64_t bytes = 0; for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { bytes += iter->key().size() + iter->value().size(); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); ++i; } delete iter; @@ -957,7 +967,7 @@ class Benchmark { int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { bytes += iter->key().size() + iter->value().size(); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); ++i; } delete iter; @@ -975,7 +985,7 @@ class Benchmark { if (db_->Get(options, key, &value).ok()) { found++; } - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); } char msg[100]; snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_); @@ -990,7 +1000,7 @@ class Benchmark { const int k = thread->rand.Next() % FLAGS_num; snprintf(key, sizeof(key), "%016d.", k); db_->Get(options, key, &value); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); } } @@ -1003,7 +1013,7 @@ class Benchmark { const int k = thread->rand.Next() % range; snprintf(key, sizeof(key), "%016d", k); db_->Get(options, key, &value); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); } } @@ -1019,7 +1029,7 @@ class Benchmark { iter->Seek(key); if (iter->Valid() && iter->key() == key) found++; delete iter; - thread->stats.FinishedSingleOp(); + 
thread->stats.FinishedSingleOp(db_); } char msg[100]; snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_); @@ -1037,7 +1047,7 @@ class Benchmark { char key[100]; snprintf(key, sizeof(key), "%016d", k); batch.Delete(key); - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); } s = db_->Write(write_options_, &batch); if (!s.ok()) { @@ -1126,7 +1136,7 @@ class Benchmark { put_weight--; writes_done++; } - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(db_); } char msg[100]; snprintf(msg, sizeof(msg), "( reads:%ld writes:%ld total:%ld )", @@ -1281,6 +1291,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i],"--level0_slowdown_writes_trigger=%d%c", &n, &junk) == 1) { FLAGS_level0_slowdown_writes_trigger = n; + } else if (sscanf(argv[i],"--level0_file_num_compaction_trigger=%d%c", + &n, &junk) == 1) { + FLAGS_level0_file_num_compaction_trigger = n; } else if (strncmp(argv[i], "--compression_type=", 19) == 0) { const char* ctype = argv[i] + 19; if (!strcasecmp(ctype, "none")) diff --git a/db/db_impl.cc b/db/db_impl.cc index ae34c36eb..0c77a26a3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -160,7 +160,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) manual_compaction_(NULL), logger_(NULL), disable_delete_obsolete_files_(false), - delete_obsolete_files_last_run_(0) { + delete_obsolete_files_last_run_(0), + stall_level0_slowdown_(0), + stall_memtable_compaction_(0), + stall_level0_num_files_(0), + started_at_(options.env->NowMicros()) { mem_->Ref(); has_imm_.Release_Store(NULL); @@ -608,6 +612,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; + stats.files_out_levelnp1 = 1; stats_[level].Add(stats); return s; } @@ -1155,11 +1160,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; - for (int which = 0; which < 2; which++) { - for (int i = 0; i < compact->compaction->num_input_files(which); i++) { - stats.bytes_read += compact->compaction->input(which, i)->file_size; - } - } + + stats.files_in_leveln = compact->compaction->num_input_files(0); + stats.files_in_levelnp1 = compact->compaction->num_input_files(1); + stats.files_out_levelnp1 = compact->outputs.size(); + + for (int i = 0; i < compact->compaction->num_input_files(0); i++) + stats.bytes_readn += compact->compaction->input(0, i)->file_size; + + for (int i = 0; i < compact->compaction->num_input_files(1); i++) + stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size; + for (size_t i = 0; i < compact->outputs.size(); i++) { stats.bytes_written += compact->outputs[i].file_size; } @@ -1172,7 +1183,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, - "compacted to: %s", versions_->LevelSummary(&tmp)); + "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " + "MB in(%.1f, %.1f) out(%.1f), amplify(%.1f)\n", + versions_->LevelSummary(&tmp), + (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) / + (double) stats.micros, + compact->compaction->level() + 1, + stats.files_in_leveln, stats.files_in_levelnp1, stats.files_out_levelnp1, + stats.bytes_readn / 1048576.0, + stats.bytes_readnp1 / 1048576.0, + stats.bytes_written / 1048576.0, + (stats.bytes_written + stats.bytes_readnp1) / + (double) stats.bytes_readn); + return 
status; } @@ -1461,6 +1484,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { // case it is sharing the same core as the writer. mutex_.Unlock(); env_->SleepForMicroseconds(1000); + stall_level0_slowdown_ += 1000; allow_delay = false; // Do not delay a single write more than once Log(options_.info_log, "delaying write...\n"); mutex_.Lock(); @@ -1472,12 +1496,16 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. Log(options_.info_log, "wait for memtable compaction...\n"); + uint64_t t1 = env_->NowMicros(); bg_cv_.Wait(); + stall_memtable_compaction_ += env_->NowMicros() - t1; } else if (versions_->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. Log(options_.info_log, "waiting...\n"); + uint64_t t1 = env_->NowMicros(); bg_cv_.Wait(); + stall_level0_num_files_ += env_->NowMicros() - t1; } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); @@ -1528,28 +1556,73 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { return true; } } else if (in == "stats") { - char buf[200]; + char buf[1000]; + uint64_t total_bytes = 0; + uint64_t micros_up = env_->NowMicros() - started_at_; + double seconds_up = micros_up / 1000000.0; + + // Pardon the long line but I think it is easier to read this way. snprintf(buf, sizeof(buf), " Compactions\n" - "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" - "--------------------------------------------------\n" + "Level Files Size(MB) Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count\n" + "------------------------------------------------------------------------------------------------------------------------------------------------------------\n" ); value->append(buf); for (int level = 0; level < NumberLevels(); level++) { int files = versions_->NumLevelFiles(level); if (stats_[level].micros > 0 || files > 0) { + int64_t bytes_read = stats_[level].bytes_readn + + stats_[level].bytes_readnp1; + int64_t bytes_new = stats_[level].bytes_written - + stats_[level].bytes_readnp1; + double amplify = (stats_[level].bytes_readn == 0) + ? 
0.0 + : (stats_[level].bytes_written + stats_[level].bytes_readnp1) / + (double) stats_[level].bytes_readn; + + total_bytes += bytes_read + stats_[level].bytes_written; snprintf( buf, sizeof(buf), - "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", + "%3d %8d %8.0f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d\n", level, files, versions_->NumLevelBytes(level) / 1048576.0, stats_[level].micros / 1e6, - stats_[level].bytes_read / 1048576.0, - stats_[level].bytes_written / 1048576.0); + bytes_read / 1048576.0, + stats_[level].bytes_written / 1048576.0, + stats_[level].bytes_readn / 1048576.0, + stats_[level].bytes_readnp1 / 1048576.0, + bytes_new / 1048576.0, + amplify, + bytes_read / 1048576.0 / seconds_up, + stats_[level].bytes_written / 1048576.0 / seconds_up, + stats_[level].files_in_leveln, + stats_[level].files_in_levelnp1, + stats_[level].files_out_levelnp1, + stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1, + stats_[level].count); value->append(buf); } } + + snprintf(buf, sizeof(buf), + "Amplification: %.1f rate, %.2f GB in, %.2f GB out\n", + (double) total_bytes / stats_[0].bytes_written, + stats_[0].bytes_written / (1048576.0 * 1024), + total_bytes / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), "Uptime(secs): %.1f\n", seconds_up); + value->append(buf); + + snprintf(buf, sizeof(buf), + "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " + "%.3f memtable_compaction\n", + stall_level0_slowdown_ / 1000000.0, + stall_level0_num_files_ / 1000000.0, + stall_memtable_compaction_ / 1000000.0); + value->append(buf); + return true; } else if (in == "sstables") { *value = versions_->current()->DebugString(); diff --git a/db/db_impl.h b/db/db_impl.h index 1d480ecde..fcedea010 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -213,21 +213,57 @@ class DBImpl : public DB { // last time when DeleteObsoleteFiles was invoked uint64_t delete_obsolete_files_last_run_; + // These count the number of microseconds for which MakeRoomForWrite stalls. + uint64_t stall_level0_slowdown_; + uint64_t stall_memtable_compaction_; + uint64_t stall_level0_num_files_; + + // Time at which this instance was started. + const uint64_t started_at_; + // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". 
struct CompactionStats { int64_t micros; - int64_t bytes_read; + + // Bytes read from level N during compaction between levels N and N+1 + int64_t bytes_readn; + + // Bytes read from level N+1 during compaction between levels N and N+1 + int64_t bytes_readnp1; + + // Total bytes written during compaction between levels N and N+1 int64_t bytes_written; - CompactionStats() : micros(0), bytes_read(0), bytes_written(0) { } + // Files read from level N during compaction between levels N and N+1 + int files_in_leveln; + + // Files read from level N+1 during compaction between levels N and N+1 + int files_in_levelnp1; + + // Files written during compaction between levels N and N+1 + int files_out_levelnp1; + + // Number of compactions done + int count; + + CompactionStats() : micros(0), bytes_readn(0), bytes_readnp1(0), + bytes_written(0), files_in_leveln(0), + files_in_levelnp1(0), files_out_levelnp1(0), + count(0) { } void Add(const CompactionStats& c) { this->micros += c.micros; - this->bytes_read += c.bytes_read; + this->bytes_readn += c.bytes_readn; + this->bytes_readnp1 += c.bytes_readnp1; this->bytes_written += c.bytes_written; + this->files_in_leveln += c.files_in_leveln; + this->files_in_levelnp1 += c.files_in_levelnp1; + this->files_out_levelnp1 += c.files_out_levelnp1; + this->count += 1; } }; + CompactionStats* stats_; static const int KEEP_LOG_FILE_NUM = 1000; From 8eedf13a82a5e209bd5860f654bf0132b49c0be1 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Thu, 25 Oct 2012 21:55:06 -0700 Subject: [PATCH 2/2] Fix unit test failure caused by delaying deleting obsolete files. Summary: A previous commit 4c107587ed47af84633f8c61f65516a504d6cd98 introduced the idea that some version updates might not delete obsolete files. This means that if a unit test blindly counts the number of files in the db directory it might not represent the true state of the database. Use GetLiveFiles() insteads to count the number of live files in the database. Test Plan: make check Reviewers: heyongqiang, MarkCallaghan Reviewed By: MarkCallaghan Differential Revision: https://reviews.facebook.net/D6207 --- db/db_test.cc | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 03ca9a796..daea3aaaf 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -385,6 +385,13 @@ class DBTest { return static_cast(files.size()); } + int CountLiveFiles() { + std::vector files; + uint64_t manifest_file_size; + db_->GetLiveFiles(files, &manifest_file_size); + return files.size(); + } + uint64_t Size(const Slice& start, const Slice& limit) { Range r(start, limit); uint64_t size; @@ -1631,12 +1638,12 @@ TEST(DBTest, NonWritableFileSystem) TEST(DBTest, FilesDeletedAfterCompaction) { ASSERT_OK(Put("foo", "v2")); Compact("a", "z"); - const int num_files = CountFiles(); + const int num_files = CountLiveFiles(); for (int i = 0; i < 10; i++) { ASSERT_OK(Put("foo", "v2")); Compact("a", "z"); } - ASSERT_EQ(CountFiles(), num_files); + ASSERT_EQ(CountLiveFiles(), num_files); } TEST(DBTest, BloomFilter) {
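
For reference, a minimal usage sketch (not part of either patch above) of how a client can poll the expanded "leveldb.stats" property, in the same spirit as the db_bench change that now prints it at each stats-interval report. The /tmp/leveldb_stats path and the small Put loop are illustrative only; DB::Open, Put and GetProperty are the stock leveldb calls.

#include <stdio.h>
#include <string>

#include "leveldb/db.h"

int main() {
  leveldb::DB* db = NULL;
  leveldb::Options options;
  options.create_if_missing = true;

  leveldb::Status s = leveldb::DB::Open(options, "/tmp/leveldb_stats", &db);
  if (!s.ok()) {
    fprintf(stderr, "open failed: %s\n", s.ToString().c_str());
    return 1;
  }

  // Write enough small values to force memtable flushes and compactions
  // so the per-level report has rows to show.
  for (int i = 0; i < 100000; i++) {
    char key[32];
    snprintf(key, sizeof(key), "%016d", i);
    db->Put(leveldb::WriteOptions(), key, std::string(100, 'x'));
  }

  // After this change, the property carries the wide per-level table plus
  // the Amplification, Uptime(secs) and Stalls(secs) lines shown above.
  std::string stats;
  if (db->GetProperty("leveldb.stats", &stats)) {
    fprintf(stderr, "%s\n", stats.c_str());
  }

  delete db;
  return 0;
}

db_bench itself now does the equivalent from Stats::FinishedSingleOp(DB*) whenever the stats-interval reporting fires, which is why FinishedSingleOp gained a DB* argument in the first patch.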
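
A second small sketch (hypothetical helper, not in the patch) restates the per-level Amplify column from the new CompactionStats fields, with the same guard GetProperty applies when nothing has yet been read from level N:

#include <stdint.h>
#include <stdio.h>

// Amplify = (Write(MB) + Rnp1(MB)) / Rn(MB), computed from raw byte counts.
static double LevelAmplification(int64_t bytes_readn,
                                 int64_t bytes_readnp1,
                                 int64_t bytes_written) {
  if (bytes_readn == 0) return 0.0;  // no level-N input read yet
  return (bytes_written + bytes_readnp1) /
         static_cast<double>(bytes_readn);
}

int main() {
  // Level-2 row of the sample report: Rn=326 MB, Rnp1=496 MB, Write=824 MB.
  const int64_t mb = 1048576;
  printf("%.1f\n", LevelAmplification(326 * mb, 496 * mb, 824 * mb));
  return 0;
}

This prints roughly 4.0, matching the Amplify value in that row. The summary line applies the same idea at whole-database granularity: 12.55 GB of compaction output over 0.56 GB written by the application is about a 22x amplification rate, which is the 22.3 shown in the sample output.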