diff --git a/db/db_bench.cc b/db/db_bench.cc
index 7927fab84..355e3adf4 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -312,7 +312,7 @@ class Stats {
 
   void SetId(int id) { id_ = id; }
 
-  void FinishedSingleOp() {
+  void FinishedSingleOp(DB* db) {
     if (FLAGS_histogram) {
       double now = FLAGS_env->NowMicros();
       double micros = now - last_op_finish_;
@@ -339,13 +339,18 @@ class Stats {
       } else {
         double now = FLAGS_env->NowMicros();
         fprintf(stderr,
-                "%s thread %d: %ld ops in %.6f seconds and %.2f ops/sec\n",
+                "%s thread %d: (%ld,%ld) ops (interval,total) in %.6f seconds and %.2f ops/sec\n",
                 FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(),
                 id_,
-                done_ - last_report_done_,
+                done_ - last_report_done_, done_,
                 (now - last_report_finish_) / 1000000.0,
                 (done_ - last_report_done_) /
                 ((now - last_report_finish_) / 1000000.0));
+
+        std::string stats;
+        if (db && db->GetProperty("leveldb.stats", &stats))
+          fprintf(stderr, stats.c_str());
+        fflush(stderr);
 
         next_report_ += FLAGS_stats_interval;
         last_report_finish_ = now;
@@ -808,7 +813,7 @@ class Benchmark {
     uint32_t crc = 0;
     while (bytes < 500 * 1048576) {
       crc = crc32c::Value(data.data(), size);
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(NULL);
       bytes += size;
     }
     // Print so result is not dead
@@ -829,7 +834,7 @@ class Benchmark {
         ptr = ap.Acquire_Load();
       }
       count++;
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(NULL);
     }
     if (ptr == NULL) exit(1); // Disable unused variable warning.
   }
@@ -845,7 +850,7 @@ class Benchmark {
       ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
       produced += compressed.size();
       bytes += input.size();
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(NULL);
     }
 
     if (!ok) {
@@ -870,7 +875,7 @@ class Benchmark {
       ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                    uncompressed);
       bytes += input.size();
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(NULL);
     }
     delete[] uncompressed;
@@ -903,7 +908,7 @@ class Benchmark {
     options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
     options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
-    options.level0_file_num_compaction_trigger = 
+    options.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
     options.level0_slowdown_writes_trigger =
        FLAGS_level0_slowdown_writes_trigger;
@@ -945,7 +950,7 @@ class Benchmark {
         snprintf(key, sizeof(key), "%016d", k);
         batch.Put(key, gen.Generate(value_size_));
         bytes += value_size_ + strlen(key);
-        thread->stats.FinishedSingleOp();
+        thread->stats.FinishedSingleOp(db_);
       }
       s = db_->Write(write_options_, &batch);
       if (!s.ok()) {
@@ -962,7 +967,7 @@ class Benchmark {
     int64_t bytes = 0;
     for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
       bytes += iter->key().size() + iter->value().size();
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(db_);
       ++i;
     }
     delete iter;
@@ -975,7 +980,7 @@ class Benchmark {
     int64_t bytes = 0;
     for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
       bytes += iter->key().size() + iter->value().size();
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(db_);
       ++i;
     }
     delete iter;
@@ -993,7 +998,7 @@ class Benchmark {
       if (db_->Get(options, key, &value).ok()) {
         found++;
       }
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(db_);
     }
     char msg[100];
     snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_);
@@ -1008,7 +1013,7 @@ class Benchmark {
       const int k = thread->rand.Next() % FLAGS_num;
       snprintf(key, sizeof(key), "%016d.", k);
       db_->Get(options, key, &value);
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(db_);
     }
   }
@@ -1021,7 +1026,7 @@ class Benchmark {
       const int k = thread->rand.Next() % range;
       snprintf(key, sizeof(key), "%016d", k);
       db_->Get(options, key, &value);
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(db_);
     }
   }
@@ -1037,7 +1042,7 @@ class Benchmark {
       iter->Seek(key);
       if (iter->Valid() && iter->key() == key) found++;
       delete iter;
-      thread->stats.FinishedSingleOp();
+      thread->stats.FinishedSingleOp(db_);
     }
     char msg[100];
     snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_);
@@ -1055,7 +1060,7 @@ class Benchmark {
         char key[100];
         snprintf(key, sizeof(key), "%016d", k);
         batch.Delete(key);
-        thread->stats.FinishedSingleOp();
+        thread->stats.FinishedSingleOp(db_);
       }
       s = db_->Write(write_options_, &batch);
       if (!s.ok()) {
@@ -1144,7 +1149,7 @@ class Benchmark {
           put_weight--;
           writes_done++;
         }
-        thread->stats.FinishedSingleOp();
+        thread->stats.FinishedSingleOp(db_);
       }
       char msg[100];
       snprintf(msg, sizeof(msg), "( reads:%ld writes:%ld total:%ld )",
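Note: FinishedSingleOp() now takes the DB handle so that, whenever --stats_interval triggers a progress line, it can also dump the "leveldb.stats" property; benchmarks that never touch a database (the crc32c, acquireload, snappycomp, and snappyuncomp loops above) pass NULL. Below is a minimal sketch, not part of the patch, of polling the same property from client code. The helper name is hypothetical, and it prints through a "%s" format because passing the property text itself as the fprintf format string (as the hunk above does) would misparse any '%' that ever appeared in the stats text:

    #include <cstdio>
    #include <string>
    #include "leveldb/db.h"

    // Hypothetical helper: print the compaction/stall stats for an open DB.
    void DumpStats(leveldb::DB* db) {
      std::string stats;
      if (db != NULL && db->GetProperty("leveldb.stats", &stats)) {
        fprintf(stderr, "%s", stats.c_str());  // "%s" guards against '%' in the text
      }
      fflush(stderr);
    }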
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 8b5855c16..2eef46f43 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -162,6 +162,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       logger_(NULL),
       disable_delete_obsolete_files_(false),
       delete_obsolete_files_last_run_(0),
+      stall_level0_slowdown_(0),
+      stall_memtable_compaction_(0),
+      stall_level0_num_files_(0),
+      started_at_(options.env->NowMicros()),
       delayed_writes_(0) {
   mem_->Ref();
@@ -605,6 +609,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
   CompactionStats stats;
   stats.micros = env_->NowMicros() - start_micros;
   stats.bytes_written = meta.file_size;
+  stats.files_out_levelnp1 = 1;
   stats_[level].Add(stats);
   return s;
 }
@@ -1334,18 +1339,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
 
   CompactionStats stats;
   stats.micros = env_->NowMicros() - start_micros - imm_micros;
-  for (int which = 0; which < 2; which++) {
-    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
-      stats.bytes_read += compact->compaction->input(which, i)->file_size;
-    }
-  }
+
+  stats.files_in_leveln = compact->compaction->num_input_files(0);
+  stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
+  stats.files_out_levelnp1 = compact->outputs.size();
+
+  for (int i = 0; i < compact->compaction->num_input_files(0); i++)
+    stats.bytes_readn += compact->compaction->input(0, i)->file_size;
+
+  for (int i = 0; i < compact->compaction->num_input_files(1); i++)
+    stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size;
+
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     stats.bytes_written += compact->outputs[i].file_size;
   }
 
-  int MBpersec = ((stats.bytes_read + stats.bytes_written))
-                 /stats.micros;
-
   mutex_.Lock();
   stats_[compact->compaction->level() + 1].Add(stats);
@@ -1358,8 +1366,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   }
   VersionSet::LevelSummaryStorage tmp;
   Log(options_.info_log,
-      "compacted to: %s %d MBytes/sec", versions_->LevelSummary(&tmp),
-      MBpersec);
+      "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
+      "MB in(%.1f, %.1f) out(%.1f), amplify(%.1f)\n",
+      versions_->LevelSummary(&tmp),
+      (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
+          (double) stats.micros,
+      compact->compaction->level() + 1,
+      stats.files_in_leveln, stats.files_in_levelnp1,
+      stats.files_out_levelnp1,
+      stats.bytes_readn / 1048576.0,
+      stats.bytes_readnp1 / 1048576.0,
+      stats.bytes_written / 1048576.0,
+      (stats.bytes_written + stats.bytes_readnp1) /
+          (double) stats.bytes_readn);
+
   return status;
 }
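The logged rate works out because bytes per microsecond equals (decimal) megabytes per second, and "amplify" measures what it cost to move level-N data up one level. A worked example, not part of the patch, plugging numbers into the same expressions used in the Log() call above:

    // Suppose one level-2 -> level-3 compaction observed:
    //   bytes_readn   = 10 * 1048576;   // read from level N = 2
    //   bytes_readnp1 = 40 * 1048576;   // read from overlapping level-3 files
    //   bytes_written = 50 * 1048576;   // written to new level-3 files
    //   micros        =  5 * 1000000;   // 5 seconds of wall-clock time
    // The log line then reports approximately:
    //   MB/sec  = (10 + 40 + 50) * 1048576 / 5e6  ~= 21.0
    //   amplify = (50 + 40) / 10                   = 9.0
    // i.e. pushing 10 MB out of level 2 cost 90 MB of level-3 I/O.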
@@ -1655,6 +1674,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       // case it is sharing the same core as the writer.
       mutex_.Unlock();
       env_->SleepForMicroseconds(1000);
+      stall_level0_slowdown_ += 1000;
       allow_delay = false;  // Do not delay a single write more than once
       mutex_.Lock();
       delayed_writes_++;
@@ -1670,13 +1690,17 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       // ones are still being compacted, so we wait.
       DelayLoggingAndReset();
       Log(options_.info_log, "wait for memtable compaction...\n");
+      uint64_t t1 = env_->NowMicros();
       bg_cv_.Wait();
+      stall_memtable_compaction_ += env_->NowMicros() - t1;
     } else if (versions_->NumLevelFiles(0) >=
                options_.level0_stop_writes_trigger) {
       // There are too many level-0 files.
       DelayLoggingAndReset();
+      uint64_t t1 = env_->NowMicros();
       Log(options_.info_log, "wait for fewer level0 files...\n");
       bg_cv_.Wait();
+      stall_level0_num_files_ += env_->NowMicros() - t1;
     } else {
       // Attempt to switch to a new memtable and trigger compaction of old
       DelayLoggingAndReset();
@@ -1727,28 +1751,73 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
       return true;
     }
   } else if (in == "stats") {
-    char buf[200];
+    char buf[1000];
+    uint64_t total_bytes = 0;
+    uint64_t micros_up = env_->NowMicros() - started_at_;
+    double seconds_up = micros_up / 1000000.0;
+
+    // Pardon the long line but I think it is easier to read this way.
     snprintf(buf, sizeof(buf),
              "                               Compactions\n"
-             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
-             "--------------------------------------------------\n"
+             "Level  Files Size(MB) Time(sec)  Read(MB) Write(MB)    Rn(MB)  Rnp1(MB)  Wnew(MB) Amplify Read(MB/s) Write(MB/s)       Rn     Rnp1     Wnp1     NewW    Count\n"
+             "------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
             );
     value->append(buf);
     for (int level = 0; level < NumberLevels(); level++) {
       int files = versions_->NumLevelFiles(level);
       if (stats_[level].micros > 0 || files > 0) {
+        int64_t bytes_read = stats_[level].bytes_readn +
+                             stats_[level].bytes_readnp1;
+        int64_t bytes_new = stats_[level].bytes_written -
+                            stats_[level].bytes_readnp1;
+        double amplify = (stats_[level].bytes_readn == 0)
+            ? 0.0
+            : (stats_[level].bytes_written + stats_[level].bytes_readnp1) /
+                (double) stats_[level].bytes_readn;
+
+        total_bytes += bytes_read + stats_[level].bytes_written;
         snprintf(
             buf, sizeof(buf),
-            "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
+            "%3d %8d %8.0f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d\n",
            level,
            files,
            versions_->NumLevelBytes(level) / 1048576.0,
            stats_[level].micros / 1e6,
-            stats_[level].bytes_read / 1048576.0,
-            stats_[level].bytes_written / 1048576.0);
+            bytes_read / 1048576.0,
+            stats_[level].bytes_written / 1048576.0,
+            stats_[level].bytes_readn / 1048576.0,
+            stats_[level].bytes_readnp1 / 1048576.0,
+            bytes_new / 1048576.0,
+            amplify,
+            bytes_read / 1048576.0 / seconds_up,
+            stats_[level].bytes_written / 1048576.0 / seconds_up,
+            stats_[level].files_in_leveln,
+            stats_[level].files_in_levelnp1,
+            stats_[level].files_out_levelnp1,
+            stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
+            stats_[level].count);
         value->append(buf);
       }
     }
+
+    snprintf(buf, sizeof(buf),
+             "Amplification: %.1f rate, %.2f GB in, %.2f GB out\n",
+             (double) total_bytes / stats_[0].bytes_written,
+             stats_[0].bytes_written / (1048576.0 * 1024),
+             total_bytes / (1048576.0 * 1024));
+    value->append(buf);
+
+    snprintf(buf, sizeof(buf), "Uptime(secs): %.1f\n", seconds_up);
+    value->append(buf);
+
+    snprintf(buf, sizeof(buf),
+             "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, "
+             "%.3f memtable_compaction\n",
+             stall_level0_slowdown_ / 1000000.0,
+             stall_level0_num_files_ / 1000000.0,
+             stall_memtable_compaction_ / 1000000.0);
+    value->append(buf);
+
     return true;
   } else if (in == "sstables") {
     *value = versions_->current()->DebugString();
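A reading note, not part of the patch: a compaction from level N into N+1 is charged to stats_[N+1], so each row of the "stats" table describes the work that produced that level's data. Legend for the widened header:

    Rn(MB), Rnp1(MB)   bytes read from level N and from level N+1
    Wnew(MB)           bytes_written - bytes_readnp1 (new bytes landing in N+1)
    Amplify            (bytes_written + bytes_readnp1) / bytes_readn
    Rn, Rnp1, Wnp1     file counts: read from N, read from N+1, written
    NewW               Wnp1 - Rnp1 (net growth in level-N+1 file count)
    Count              number of compactions aggregated into the row

The stall counters are kept in microseconds and divided by 1e6 for the Stalls(secs) line; note that stall_level0_slowdown_ accrues the requested 1000 us per delayed write rather than a measured sleep duration.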
diff --git a/db/db_impl.h b/db/db_impl.h
index 4bb6374a4..4d378a31b 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -224,21 +224,57 @@ class DBImpl : public DB {
   // last time when DeleteObsoleteFiles was invoked
   uint64_t delete_obsolete_files_last_run_;
 
+  // These count the number of microseconds for which MakeRoomForWrite stalls.
+  uint64_t stall_level0_slowdown_;
+  uint64_t stall_memtable_compaction_;
+  uint64_t stall_level0_num_files_;
+
+  // Time at which this instance was started.
+  const uint64_t started_at_;
+
   // Per level compaction stats.  stats_[level] stores the stats for
   // compactions that produced data for the specified "level".
   struct CompactionStats {
     int64_t micros;
-    int64_t bytes_read;
+
+    // Bytes read from level N during compaction between levels N and N+1
+    int64_t bytes_readn;
+
+    // Bytes read from level N+1 during compaction between levels N and N+1
+    int64_t bytes_readnp1;
+
+    // Total bytes written during compaction between levels N and N+1
     int64_t bytes_written;
 
-    CompactionStats() : micros(0), bytes_read(0), bytes_written(0) { }
+    // Files read from level N during compaction between levels N and N+1
+    int files_in_leveln;
+
+    // Files read from level N+1 during compaction between levels N and N+1
+    int files_in_levelnp1;
+
+    // Files written during compaction between levels N and N+1
+    int files_out_levelnp1;
+
+    // Number of compactions done
+    int count;
+
+    CompactionStats() : micros(0), bytes_readn(0), bytes_readnp1(0),
+                        bytes_written(0), files_in_leveln(0),
+                        files_in_levelnp1(0), files_out_levelnp1(0),
+                        count(0) { }
 
     void Add(const CompactionStats& c) {
       this->micros += c.micros;
-      this->bytes_read += c.bytes_read;
+      this->bytes_readn += c.bytes_readn;
+      this->bytes_readnp1 += c.bytes_readnp1;
       this->bytes_written += c.bytes_written;
+      this->files_in_leveln += c.files_in_leveln;
+      this->files_in_levelnp1 += c.files_in_levelnp1;
+      this->files_out_levelnp1 += c.files_out_levelnp1;
+      this->count += 1;
     }
   };
+
   CompactionStats* stats_;
 
   static const int KEEP_LOG_FILE_NUM = 1000;
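Add() increments count by one per call and ignores c.count, so count records how many single-compaction records were folded into a level's aggregate (the WriteLevel0TableForRecovery path shown earlier contributes the same way). A standalone sketch, not part of the patch, with CompactionStats copied out of DBImpl purely for illustration:

    CompactionStats level3;       // running totals for level 3
    CompactionStats one;          // one level-2 -> level-3 compaction
    one.micros        = 5000000;
    one.bytes_readn   = 10 << 20;
    one.bytes_readnp1 = 40 << 20;
    one.bytes_written = 50 << 20;
    level3.Add(one);              // level3.count == 1
    level3.Add(one);              // level3.count == 2; byte totals double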
diff --git a/db/db_test.cc b/db/db_test.cc
index 211a5d981..daea3aaaf 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -385,6 +385,13 @@ class DBTest {
     return static_cast<int>(files.size());
   }
 
+  int CountLiveFiles() {
+    std::vector<std::string> files;
+    uint64_t manifest_file_size;
+    db_->GetLiveFiles(files, &manifest_file_size);
+    return files.size();
+  }
+
   uint64_t Size(const Slice& start, const Slice& limit) {
     Range r(start, limit);
     uint64_t size;
@@ -1631,12 +1638,12 @@ TEST(DBTest, NonWritableFileSystem)
 TEST(DBTest, FilesDeletedAfterCompaction) {
   ASSERT_OK(Put("foo", "v2"));
   Compact("a", "z");
-  const int num_files = CountFiles();
+  const int num_files = CountLiveFiles();
   for (int i = 0; i < 10; i++) {
     ASSERT_OK(Put("foo", "v2"));
     Compact("a", "z");
   }
-  // ASSERT_EQ(CountFiles(), num_files); TODO
+  ASSERT_EQ(CountLiveFiles(), num_files);
 }
 
 TEST(DBTest, BloomFilter) {
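The test formerly compared raw directory listings via CountFiles(), which also sees obsolete table files that have not been unlinked yet, so the final assertion had been parked behind a TODO. CountLiveFiles() instead asks the DB which files it still references, making the count stable across repeated compactions and letting the assertion be enabled. A minimal sketch of the call shape the new helper relies on (anything about GetLiveFiles() beyond what the hunk shows is an assumption about this branch):

    std::vector<std::string> files;
    uint64_t manifest_file_size;
    db_->GetLiveFiles(files, &manifest_file_size);
    // "files" now names every file the DB still references (sstables plus
    // metadata such as the manifest); obsolete files that are merely
    // awaiting deletion are excluded, so the count is stable.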