diff --git a/db/db_bench.cc b/db/db_bench.cc index 33c1ecfe1..158a5faa2 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -149,7 +149,7 @@ DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink" DEFINE_bool(histogram, false, "Print histogram of operation timings"); -DEFINE_int32(write_buffer_size, rocksdb::Options().write_buffer_size, +DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size, "Number of bytes to buffer in memtable before compacting"); DEFINE_int32(max_write_buffer_number, diff --git a/db/db_impl.cc b/db/db_impl.cc index 49423d776..c06d2f5bc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1041,6 +1041,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { stats.bytes_written = meta.file_size; stats.files_out_levelnp1 = 1; stats_[level].Add(stats); + RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size); return s; } @@ -1129,6 +1130,7 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; stats_[level].Add(stats); + RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size); return s; } @@ -2454,14 +2456,22 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } stats.files_out_levelnp1 = num_output_files; - for (int i = 0; i < compact->compaction->num_input_files(0); i++) + for (int i = 0; i < compact->compaction->num_input_files(0); i++) { stats.bytes_readn += compact->compaction->input(0, i)->file_size; + RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, + compact->compaction->input(0, i)->file_size); + } - for (int i = 0; i < compact->compaction->num_input_files(1); i++) + for (int i = 0; i < compact->compaction->num_input_files(1); i++) { stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size; + RecordTick(options_.statistics.get(), COMPACT_READ_BYTES, + compact->compaction->input(1, i)->file_size); + } for (int i = 0; i < num_output_files; i++) { stats.bytes_written += compact->outputs[i].file_size; + RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, + compact->outputs[i].file_size); } LogFlush(options_.info_log); @@ -2810,8 +2820,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { while (!w.done && &w != writers_.front()) { w.cv.Wait(); } + + if (!options.disableWAL) { + RecordTick(options_.statistics.get(), WRITE_WITH_WAL, 1); + } + if (w.done) { + RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); return w.status; + } else { + RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1); } // May temporarily unlock and wait. @@ -2849,7 +2867,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (!options.disableWAL) { StopWatchNano timer(env_); StartPerfTimer(&timer); - status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + Slice log_entry = WriteBatchInternal::Contents(updates); + status = log_->AddRecord(log_entry); + RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1); + RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size()); BumpPerfTime(&perf_context.wal_write_time, &timer); if (status.ok() && options.sync) { if (options_.use_fsync) { @@ -3225,6 +3246,13 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { } else if (in == "stats") { char buf[1000]; + + uint64_t wal_bytes = 0; + uint64_t wal_synced = 0; + uint64_t user_bytes_written = 0; + uint64_t write_other = 0; + uint64_t write_self = 0; + uint64_t write_with_wal = 0; uint64_t total_bytes_written = 0; uint64_t total_bytes_read = 0; uint64_t micros_up = env_->NowMicros() - started_at_; @@ -3237,6 +3265,16 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { uint64_t interval_bytes_new = 0; double interval_seconds_up = 0; + Statistics* s = options_.statistics.get(); + if (s) { + wal_bytes = s->getTickerCount(WAL_FILE_BYTES); + wal_synced = s->getTickerCount(WAL_FILE_SYNCED); + user_bytes_written = s->getTickerCount(BYTES_WRITTEN); + write_other = s->getTickerCount(WRITE_DONE_BY_OTHER); + write_self = s->getTickerCount(WRITE_DONE_BY_SELF); + write_with_wal = s->getTickerCount(WRITE_WITH_WAL); + } + // Pardon the long line but I think it is easier to read this way. snprintf(buf, sizeof(buf), " Compactions\n" @@ -3293,19 +3331,38 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { } } - interval_bytes_new = stats_[0].bytes_written - last_stats_.bytes_new_; - interval_bytes_read = total_bytes_read - last_stats_.bytes_read_; - interval_bytes_written = total_bytes_written - last_stats_.bytes_written_; + interval_bytes_new = user_bytes_written - last_stats_.ingest_bytes_; + interval_bytes_read = total_bytes_read - last_stats_.compaction_bytes_read_; + interval_bytes_written = + total_bytes_written - last_stats_.compaction_bytes_written_; interval_seconds_up = seconds_up - last_stats_.seconds_up_; snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", seconds_up, interval_seconds_up); value->append(buf); + snprintf(buf, sizeof(buf), + "Writes cumulative: %llu total, %llu batches, " + "%.1f per batch, %.2f ingest GB\n", + (unsigned long long) (write_other + write_self), + (unsigned long long) write_self, + (write_other + write_self) / (double) (write_self + 1), + user_bytes_written / (1048576.0 * 1024)); + value->append(buf); + + snprintf(buf, sizeof(buf), + "WAL cumulative: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f GB written\n", + (unsigned long long) write_with_wal, + (unsigned long long ) wal_synced, + write_with_wal / (double) (wal_synced + 1), + wal_bytes / (1048576.0 * 1024)); + value->append(buf); + snprintf(buf, sizeof(buf), "Compaction IO cumulative (GB): " "%.2f new, %.2f read, %.2f write, %.2f read+write\n", - stats_[0].bytes_written / (1048576.0 * 1024), + user_bytes_written / (1048576.0 * 1024), total_bytes_read / (1048576.0 * 1024), total_bytes_written / (1048576.0 * 1024), (total_bytes_read + total_bytes_written) / (1048576.0 * 1024)); @@ -3314,7 +3371,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf(buf, sizeof(buf), "Compaction IO cumulative (MB/sec): " "%.1f new, %.1f read, %.1f write, %.1f read+write\n", - stats_[0].bytes_written / 1048576.0 / seconds_up, + user_bytes_written / 1048576.0 / seconds_up, total_bytes_read / 1048576.0 / seconds_up, total_bytes_written / 1048576.0 / seconds_up, (total_bytes_read + total_bytes_written) / 1048576.0 / seconds_up); @@ -3323,9 +3380,38 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // +1 to avoid divide by 0 and NaN snprintf(buf, sizeof(buf), "Amplification cumulative: %.1f write, %.1f compaction\n", - (double) total_bytes_written / (stats_[0].bytes_written+1), - (double) (total_bytes_written + total_bytes_read) - / (stats_[0].bytes_written+1)); + (double) (total_bytes_written + wal_bytes) + / (user_bytes_written + 1), + (double) (total_bytes_written + total_bytes_read + wal_bytes) + / (user_bytes_written + 1)); + value->append(buf); + + uint64_t interval_write_other = write_other - last_stats_.write_other_; + uint64_t interval_write_self = write_self - last_stats_.write_self_; + + snprintf(buf, sizeof(buf), + "Writes interval: %llu total, %llu batches, " + "%.1f per batch, %.1f ingest MB\n", + (unsigned long long) (interval_write_other + interval_write_self), + (unsigned long long) interval_write_self, + (double) (interval_write_other + interval_write_self) + / (interval_write_self + 1), + (user_bytes_written - last_stats_.ingest_bytes_) / 1048576.0); + value->append(buf); + + uint64_t interval_write_with_wal = + write_with_wal - last_stats_.write_with_wal_; + + uint64_t interval_wal_synced = wal_synced - last_stats_.wal_synced_; + uint64_t interval_wal_bytes = wal_bytes - last_stats_.wal_bytes_; + + snprintf(buf, sizeof(buf), + "WAL interval: %llu WAL writes, %llu WAL syncs, " + "%.2f writes per sync, %.2f MB written\n", + (unsigned long long) interval_write_with_wal, + (unsigned long long ) interval_wal_synced, + interval_write_with_wal / (double) (interval_wal_synced + 1), + interval_wal_bytes / (1048576.0 * 1024)); value->append(buf); snprintf(buf, sizeof(buf), @@ -3350,9 +3436,10 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // +1 to avoid divide by 0 and NaN snprintf(buf, sizeof(buf), "Amplification interval: %.1f write, %.1f compaction\n", - (double) interval_bytes_written / (interval_bytes_new+1), - (double) (interval_bytes_written + interval_bytes_read) / - (interval_bytes_new+1)); + (double) (interval_bytes_written + wal_bytes) + / (interval_bytes_new + 1), + (double) (interval_bytes_written + interval_bytes_read + wal_bytes) + / (interval_bytes_new + 1)); value->append(buf); snprintf(buf, sizeof(buf), @@ -3373,10 +3460,15 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { (unsigned long) total_slowdown_count); value->append(buf); - last_stats_.bytes_read_ = total_bytes_read; - last_stats_.bytes_written_ = total_bytes_written; - last_stats_.bytes_new_ = stats_[0].bytes_written; + last_stats_.compaction_bytes_read_ = total_bytes_read; + last_stats_.compaction_bytes_written_ = total_bytes_written; + last_stats_.ingest_bytes_ = user_bytes_written; last_stats_.seconds_up_ = seconds_up; + last_stats_.wal_bytes_ = wal_bytes; + last_stats_.wal_synced_ = wal_synced; + last_stats_.write_with_wal_ = write_with_wal; + last_stats_.write_other_ = write_other; + last_stats_.write_self_ = write_self; return true; } else if (in == "sstables") { diff --git a/db/db_impl.h b/db/db_impl.h index 059183940..39e132979 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -441,15 +441,25 @@ class DBImpl : public DB { // Used to compute per-interval statistics struct StatsSnapshot { - uint64_t bytes_read_; - uint64_t bytes_written_; - uint64_t bytes_new_; + uint64_t compaction_bytes_read_; // Bytes read by compaction + uint64_t compaction_bytes_written_; // Bytes written by compaction + uint64_t ingest_bytes_; // Bytes written by user + uint64_t wal_bytes_; // Bytes written to WAL + uint64_t wal_synced_; // Number of times WAL is synced + uint64_t write_with_wal_; // Number of writes that request WAL + // These count the number of writes processed by the calling thread or + // another thread. + uint64_t write_other_; + uint64_t write_self_; double seconds_up_; - StatsSnapshot() : bytes_read_(0), bytes_written_(0), - bytes_new_(0), seconds_up_(0) {} + StatsSnapshot() : compaction_bytes_read_(0), compaction_bytes_written_(0), + ingest_bytes_(0), wal_bytes_(0), wal_synced_(0), + write_with_wal_(0), write_other_(0), write_self_(0), + seconds_up_(0) {} }; + // Counters from the previous time per-interval stats were computed StatsSnapshot last_stats_; static const int KEEP_LOG_FILE_NUM = 1000; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 286a624c8..011e510f5 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -114,6 +114,19 @@ enum Tickers { BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache + WAL_FILE_SYNCED, // Number of times WAL sync is done + WAL_FILE_BYTES, // Number of bytes written to WAL + + // Writes can be processed by requesting thread or by the thread at the + // head of the writers queue. + WRITE_DONE_BY_SELF, + WRITE_DONE_BY_OTHER, + + WRITE_WITH_WAL, // Number of Write calls that request WAL + + COMPACT_READ_BYTES, // Bytes read during compaction + COMPACT_WRITE_BYTES, // Bytes written during compaction + TICKER_ENUM_MAX }; @@ -159,7 +172,14 @@ const std::vector> TickersNameMap = { { NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration" }, { GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" }, { BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss" }, - { BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit" } + { BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit" }, + { WAL_FILE_SYNCED, "rocksdb.wal.synced" }, + { WAL_FILE_BYTES, "rocksdb.wal.bytes" }, + { WRITE_DONE_BY_SELF, "rocksdb.write.self" }, + { WRITE_DONE_BY_OTHER, "rocksdb.write.other" }, + { WRITE_WITH_WAL, "rocksdb.write.wal" }, + { COMPACT_READ_BYTES, "rocksdb.compact.read.bytes" }, + { COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes" }, }; /**