Break down the amount of data written during flushes/compactions per file type (#8013)

Summary:
The patch breaks down the "bytes written" (as well as the "number of output files")
compaction statistics into two, so the values are logged separately for table files
and blob files in the info log, and are shown in separate columns (`Write(GB)` for table
files, `Wblob(GB)` for blob files) when the compaction statistics are dumped.
This will also come in handy for fixing the write amplification statistics, which currently
do not consider the amount of data read from blob files during compaction. (This will
be fixed by an upcoming patch.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8013

Test Plan: Ran `make check` and `db_bench`.

Reviewed By: riversand963

Differential Revision: D26742156

Pulled By: ltamasi

fbshipit-source-id: 31d18ee8f90438b438ca7ed1ea8cbd92114442d5
This commit is contained in:
Levi Tamasi 2021-03-02 09:46:10 -08:00 committed by Facebook GitHub Bot
parent f19612970d
commit a46f080cce
9 changed files with 117 additions and 63 deletions

View File

@ -6,6 +6,7 @@
### Public API change ### Public API change
* Add a new option BlockBasedTableOptions::max_auto_readahead_size. RocksDB does auto-readahead for iterators on noticing more than two reads for a table file if user doesn't provide readahead_size. The readahead starts at 8KB and doubles on every additional read upto max_auto_readahead_size and now max_auto_readahead_size can be configured dynamically as well. Found that 256 KB readahead size provides the best performance, based on experiments, for auto readahead. Experiment data is in PR #3282. If value is set 0 then no automatic prefetching will be done by rocksdb. Also changing the value will only affect files opened after the change. * Add a new option BlockBasedTableOptions::max_auto_readahead_size. RocksDB does auto-readahead for iterators on noticing more than two reads for a table file if user doesn't provide readahead_size. The readahead starts at 8KB and doubles on every additional read upto max_auto_readahead_size and now max_auto_readahead_size can be configured dynamically as well. Found that 256 KB readahead size provides the best performance, based on experiments, for auto readahead. Experiment data is in PR #3282. If value is set 0 then no automatic prefetching will be done by rocksdb. Also changing the value will only affect files opened after the change.
* Add suppport to extend DB::VerifyFileChecksums API to also verify blob files checksum. * Add suppport to extend DB::VerifyFileChecksums API to also verify blob files checksum.
* When using the new BlobDB, the amount of data written by flushes/compactions is now broken down into table files and blob files in the compaction statistics; namely, Write(GB) denotes the amount of data written to table files, while Wblob(GB) means the amount of data written to blob files.
### New Features ### New Features
* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision. * Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision.

View File

@ -795,27 +795,30 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
double bytes_written_per_sec = 0; double bytes_written_per_sec = 0;
if (stats.bytes_read_non_output_levels > 0) { if (stats.bytes_read_non_output_levels > 0) {
read_write_amp = (stats.bytes_written + stats.bytes_read_output_level + read_write_amp =
stats.bytes_read_non_output_levels) / (stats.bytes_written + stats.bytes_written_blob +
static_cast<double>(stats.bytes_read_non_output_levels); stats.bytes_read_output_level + stats.bytes_read_non_output_levels) /
write_amp = stats.bytes_written / static_cast<double>(stats.bytes_read_non_output_levels);
write_amp = (stats.bytes_written + stats.bytes_written_blob) /
static_cast<double>(stats.bytes_read_non_output_levels); static_cast<double>(stats.bytes_read_non_output_levels);
} }
if (stats.micros > 0) { if (stats.micros > 0) {
bytes_read_per_sec = bytes_read_per_sec =
(stats.bytes_read_non_output_levels + stats.bytes_read_output_level) / (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
static_cast<double>(stats.micros); static_cast<double>(stats.micros);
bytes_written_per_sec = bytes_written_per_sec = (stats.bytes_written + stats.bytes_written_blob) /
stats.bytes_written / static_cast<double>(stats.micros); static_cast<double>(stats.micros);
} }
const std::string& column_family_name = cfd->GetName(); const std::string& column_family_name = cfd->GetName();
constexpr double kMB = 1048576.0;
ROCKS_LOG_BUFFER( ROCKS_LOG_BUFFER(
log_buffer_, log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) " "files in(%d, %d) out(%d +%d blob) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "MB in(%.1f, %.1f) out(%.1f +%.1f blob), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %" PRIu64 "write-amplify(%.1f) %s, records in: %" PRIu64
", records dropped: %" PRIu64 " output_compression: %s\n", ", records dropped: %" PRIu64 " output_compression: %s\n",
column_family_name.c_str(), vstorage->LevelSummary(&tmp), column_family_name.c_str(), vstorage->LevelSummary(&tmp),
@ -823,9 +826,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
compact_->compaction->output_level(), compact_->compaction->output_level(),
stats.num_input_files_in_non_output_levels, stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level, stats.num_output_files, stats.num_input_files_in_output_level, stats.num_output_files,
stats.bytes_read_non_output_levels / 1048576.0, stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
stats.bytes_read_output_level / 1048576.0, stats.bytes_read_output_level / kMB, stats.bytes_written / kMB,
stats.bytes_written / 1048576.0, read_write_amp, write_amp, stats.bytes_written_blob / kMB, read_write_amp, write_amp,
status.ToString().c_str(), stats.num_input_records, status.ToString().c_str(), stats.num_input_records,
stats.num_dropped_records, stats.num_dropped_records,
CompressionTypeToString(compact_->compaction->output_compression()) CompressionTypeToString(compact_->compaction->output_compression())
@ -1825,10 +1828,11 @@ void CompactionJob::UpdateCompactionStats() {
} }
compaction_stats_.num_output_files = compaction_stats_.num_output_files =
static_cast<int>(compact_->num_output_files) + static_cast<int>(compact_->num_output_files);
compaction_stats_.num_output_files_blob =
static_cast<int>(compact_->num_blob_output_files); static_cast<int>(compact_->num_blob_output_files);
compaction_stats_.bytes_written = compaction_stats_.bytes_written = compact_->total_bytes;
compact_->total_bytes + compact_->total_blob_bytes; compaction_stats_.bytes_written_blob = compact_->total_blob_bytes;
if (compaction_stats_.num_input_records > compact_->num_output_records) { if (compaction_stats_.num_input_records > compact_->num_output_records) {
compaction_stats_.num_dropped_records = compaction_stats_.num_dropped_records =
@ -1867,9 +1871,11 @@ void CompactionJob::UpdateCompactionJobStats(
stats.num_input_files_in_output_level; stats.num_input_files_in_output_level;
// output information // output information
compaction_job_stats_->total_output_bytes = stats.bytes_written; compaction_job_stats_->total_output_bytes =
stats.bytes_written + stats.bytes_written_blob;
compaction_job_stats_->num_output_records = compact_->num_output_records; compaction_job_stats_->num_output_records = compact_->num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files; compaction_job_stats_->num_output_files =
stats.num_output_files + stats.num_output_files_blob;
if (stats.num_output_files > 0) { if (stats.num_output_files > 0) {
CopyPrefix(compact_->SmallestUserKey(), CopyPrefix(compact_->SmallestUserKey(),

View File

@ -5946,13 +5946,13 @@ TEST_F(DBCompactionTest, CompactionWithBlob) {
const InternalStats* const internal_stats = cfd->internal_stats(); const InternalStats* const internal_stats = cfd->internal_stats();
ASSERT_NE(internal_stats, nullptr); ASSERT_NE(internal_stats, nullptr);
const uint64_t expected_bytes =
table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2); ASSERT_GE(compaction_stats.size(), 2);
ASSERT_EQ(compaction_stats[1].bytes_written, expected_bytes); ASSERT_EQ(compaction_stats[1].bytes_written, table_file->fd.GetFileSize());
ASSERT_EQ(compaction_stats[1].num_output_files, 2); ASSERT_EQ(compaction_stats[1].bytes_written_blob,
blob_file->GetTotalBlobBytes());
ASSERT_EQ(compaction_stats[1].num_output_files, 1);
ASSERT_EQ(compaction_stats[1].num_output_files_blob, 1);
} }
class DBCompactionTestBlobError class DBCompactionTestBlobError
@ -6040,11 +6040,15 @@ TEST_P(DBCompactionTestBlobError, CompactionError) {
if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
ASSERT_EQ(compaction_stats[1].bytes_written, 0); ASSERT_EQ(compaction_stats[1].bytes_written, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
ASSERT_EQ(compaction_stats[1].num_output_files, 0); ASSERT_EQ(compaction_stats[1].num_output_files, 0);
ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0);
} else { } else {
// SST file writing succeeded; blob file writing failed (during Finish) // SST file writing succeeded; blob file writing failed (during Finish)
ASSERT_GT(compaction_stats[1].bytes_written, 0); ASSERT_GT(compaction_stats[1].bytes_written, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
ASSERT_EQ(compaction_stats[1].num_output_files, 1); ASSERT_EQ(compaction_stats[1].num_output_files, 1);
ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0);
} }
} }

View File

@ -513,16 +513,18 @@ TEST_F(DBFlushTest, FlushWithBlob) {
const InternalStats* const internal_stats = cfd->internal_stats(); const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats); assert(internal_stats);
const uint64_t expected_bytes =
table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_FALSE(compaction_stats.empty()); ASSERT_FALSE(compaction_stats.empty());
ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes); ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
ASSERT_EQ(compaction_stats[0].num_output_files, 2); ASSERT_EQ(compaction_stats[0].bytes_written_blob,
blob_file->GetTotalBlobBytes());
ASSERT_EQ(compaction_stats[0].num_output_files, 1);
ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes); ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
compaction_stats[0].bytes_written +
compaction_stats[0].bytes_written_blob);
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
@ -802,16 +804,21 @@ TEST_P(DBFlushTestBlobError, FlushError) {
if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
ASSERT_EQ(compaction_stats[0].bytes_written, 0); ASSERT_EQ(compaction_stats[0].bytes_written, 0);
ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
ASSERT_EQ(compaction_stats[0].num_output_files, 0); ASSERT_EQ(compaction_stats[0].num_output_files, 0);
ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
} else { } else {
// SST file writing succeeded; blob file writing failed (during Finish) // SST file writing succeeded; blob file writing failed (during Finish)
ASSERT_GT(compaction_stats[0].bytes_written, 0); ASSERT_GT(compaction_stats[0].bytes_written, 0);
ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
ASSERT_EQ(compaction_stats[0].num_output_files, 1); ASSERT_EQ(compaction_stats[0].num_output_files, 1);
ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
} }
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
compaction_stats[0].bytes_written); compaction_stats[0].bytes_written +
compaction_stats[0].bytes_written_blob);
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }

View File

@ -1408,14 +1408,15 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
const auto& blobs = edit->GetBlobFileAdditions(); const auto& blobs = edit->GetBlobFileAdditions();
for (const auto& blob : blobs) { for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes(); stats.bytes_written_blob += blob.GetTotalBlobBytes();
} }
stats.num_output_files += static_cast<int>(blobs.size()); stats.num_output_files_blob = static_cast<int>(blobs.size());
cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats); cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, cfd->internal_stats()->AddCFStats(
stats.bytes_written); InternalStats::BYTES_FLUSHED,
stats.bytes_written + stats.bytes_written_blob);
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
return s; return s;
} }

View File

@ -425,16 +425,18 @@ TEST_F(DBWALTest, RecoverWithBlob) {
const InternalStats* const internal_stats = cfd->internal_stats(); const InternalStats* const internal_stats = cfd->internal_stats();
ASSERT_NE(internal_stats, nullptr); ASSERT_NE(internal_stats, nullptr);
const uint64_t expected_bytes =
table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_FALSE(compaction_stats.empty()); ASSERT_FALSE(compaction_stats.empty());
ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes); ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
ASSERT_EQ(compaction_stats[0].num_output_files, 2); ASSERT_EQ(compaction_stats[0].bytes_written_blob,
blob_file->GetTotalBlobBytes());
ASSERT_EQ(compaction_stats[0].num_output_files, 1);
ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes); ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
compaction_stats[0].bytes_written +
compaction_stats[0].bytes_written_blob);
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }

View File

@ -477,15 +477,16 @@ Status FlushJob::WriteLevel0Table() {
const auto& blobs = edit_->GetBlobFileAdditions(); const auto& blobs = edit_->GetBlobFileAdditions();
for (const auto& blob : blobs) { for (const auto& blob : blobs) {
stats.bytes_written += blob.GetTotalBlobBytes(); stats.bytes_written_blob += blob.GetTotalBlobBytes();
} }
stats.num_output_files += static_cast<int>(blobs.size()); stats.num_output_files_blob = static_cast<int>(blobs.size());
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, cfd_->internal_stats()->AddCFStats(
stats.bytes_written); InternalStats::BYTES_FLUSHED,
stats.bytes_written + stats.bytes_written_blob);
RecordFlushIOStats(); RecordFlushIOStats();
return s; return s;
} }

View File

@ -50,6 +50,7 @@ const std::map<LevelStatType, LevelStat> InternalStats::compaction_level_stats =
{LevelStatType::AVG_SEC, LevelStat{"AvgSec", "Avg(sec)"}}, {LevelStatType::AVG_SEC, LevelStat{"AvgSec", "Avg(sec)"}},
{LevelStatType::KEY_IN, LevelStat{"KeyIn", "KeyIn"}}, {LevelStatType::KEY_IN, LevelStat{"KeyIn", "KeyIn"}},
{LevelStatType::KEY_DROP, LevelStat{"KeyDrop", "KeyDrop"}}, {LevelStatType::KEY_DROP, LevelStat{"KeyDrop", "KeyDrop"}},
{LevelStatType::W_BLOB_GB, LevelStat{"WblobGB", "Wblob(GB)"}},
}; };
namespace { namespace {
@ -67,7 +68,7 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name,
}; };
int line_size = snprintf( int line_size = snprintf(
buf + written_size, len - written_size, buf + written_size, len - written_size,
"%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", "%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n",
// Note that we skip COMPACTED_FILES and merge it with Files column // Note that we skip COMPACTED_FILES and merge it with Files column
group_by.c_str(), hdr(LevelStatType::NUM_FILES), group_by.c_str(), hdr(LevelStatType::NUM_FILES),
hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE), hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE),
@ -78,7 +79,7 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name,
hdr(LevelStatType::WRITE_MBPS), hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::WRITE_MBPS), hdr(LevelStatType::COMP_SEC),
hdr(LevelStatType::COMP_CPU_SEC), hdr(LevelStatType::COMP_COUNT), hdr(LevelStatType::COMP_CPU_SEC), hdr(LevelStatType::COMP_COUNT),
hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN), hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN),
hdr(LevelStatType::KEY_DROP)); hdr(LevelStatType::KEY_DROP), hdr(LevelStatType::W_BLOB_GB));
written_size += line_size; written_size += line_size;
written_size = std::min(written_size, static_cast<int>(len)); written_size = std::min(written_size, static_cast<int>(len));
@ -90,10 +91,11 @@ void PrepareLevelStats(std::map<LevelStatType, double>* level_stats,
int num_files, int being_compacted, int num_files, int being_compacted,
double total_file_size, double score, double w_amp, double total_file_size, double score, double w_amp,
const InternalStats::CompactionStats& stats) { const InternalStats::CompactionStats& stats) {
uint64_t bytes_read = const uint64_t bytes_read =
stats.bytes_read_non_output_levels + stats.bytes_read_output_level; stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
int64_t bytes_new = stats.bytes_written - stats.bytes_read_output_level; const uint64_t bytes_written = stats.bytes_written + stats.bytes_written_blob;
double elapsed = (stats.micros + 1) / kMicrosInSec; const int64_t bytes_new = stats.bytes_written - stats.bytes_read_output_level;
const double elapsed = (stats.micros + 1) / kMicrosInSec;
(*level_stats)[LevelStatType::NUM_FILES] = num_files; (*level_stats)[LevelStatType::NUM_FILES] = num_files;
(*level_stats)[LevelStatType::COMPACTED_FILES] = being_compacted; (*level_stats)[LevelStatType::COMPACTED_FILES] = being_compacted;
@ -108,8 +110,7 @@ void PrepareLevelStats(std::map<LevelStatType, double>* level_stats,
(*level_stats)[LevelStatType::MOVED_GB] = stats.bytes_moved / kGB; (*level_stats)[LevelStatType::MOVED_GB] = stats.bytes_moved / kGB;
(*level_stats)[LevelStatType::WRITE_AMP] = w_amp; (*level_stats)[LevelStatType::WRITE_AMP] = w_amp;
(*level_stats)[LevelStatType::READ_MBPS] = bytes_read / kMB / elapsed; (*level_stats)[LevelStatType::READ_MBPS] = bytes_read / kMB / elapsed;
(*level_stats)[LevelStatType::WRITE_MBPS] = (*level_stats)[LevelStatType::WRITE_MBPS] = bytes_written / kMB / elapsed;
stats.bytes_written / kMB / elapsed;
(*level_stats)[LevelStatType::COMP_SEC] = stats.micros / kMicrosInSec; (*level_stats)[LevelStatType::COMP_SEC] = stats.micros / kMicrosInSec;
(*level_stats)[LevelStatType::COMP_CPU_SEC] = stats.cpu_micros / kMicrosInSec; (*level_stats)[LevelStatType::COMP_CPU_SEC] = stats.cpu_micros / kMicrosInSec;
(*level_stats)[LevelStatType::COMP_COUNT] = stats.count; (*level_stats)[LevelStatType::COMP_COUNT] = stats.count;
@ -119,6 +120,7 @@ void PrepareLevelStats(std::map<LevelStatType, double>* level_stats,
static_cast<double>(stats.num_input_records); static_cast<double>(stats.num_input_records);
(*level_stats)[LevelStatType::KEY_DROP] = (*level_stats)[LevelStatType::KEY_DROP] =
static_cast<double>(stats.num_dropped_records); static_cast<double>(stats.num_dropped_records);
(*level_stats)[LevelStatType::W_BLOB_GB] = stats.bytes_written_blob / kGB;
} }
void PrintLevelStats(char* buf, size_t len, const std::string& name, void PrintLevelStats(char* buf, size_t len, const std::string& name,
@ -143,7 +145,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
"%9d " /* Comp(cnt) */ "%9d " /* Comp(cnt) */
"%8.3f " /* Avg(sec) */ "%8.3f " /* Avg(sec) */
"%7s " /* KeyIn */ "%7s " /* KeyIn */
"%6s\n", /* KeyDrop */ "%6s " /* KeyDrop */
"%9.1f\n", /* Wblob(GB) */
name.c_str(), static_cast<int>(stat_value.at(LevelStatType::NUM_FILES)), name.c_str(), static_cast<int>(stat_value.at(LevelStatType::NUM_FILES)),
static_cast<int>(stat_value.at(LevelStatType::COMPACTED_FILES)), static_cast<int>(stat_value.at(LevelStatType::COMPACTED_FILES)),
BytesToHumanString( BytesToHumanString(
@ -168,7 +171,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
.c_str(), .c_str(),
NumberToHumanString( NumberToHumanString(
static_cast<std::int64_t>(stat_value.at(LevelStatType::KEY_DROP))) static_cast<std::int64_t>(stat_value.at(LevelStatType::KEY_DROP)))
.c_str()); .c_str(),
stat_value.at(LevelStatType::W_BLOB_GB));
} }
void PrintLevelStats(char* buf, size_t len, const std::string& name, void PrintLevelStats(char* buf, size_t len, const std::string& name,
@ -1179,7 +1183,8 @@ void InternalStats::DumpCFMapStats(
double w_amp = double w_amp =
(input_bytes == 0) (input_bytes == 0)
? 0.0 ? 0.0
: static_cast<double>(comp_stats_[level].bytes_written) / : static_cast<double>(comp_stats_[level].bytes_written +
comp_stats_[level].bytes_written_blob) /
input_bytes; input_bytes;
std::map<LevelStatType, double> level_stats; std::map<LevelStatType, double> level_stats;
PrepareLevelStats(&level_stats, files, files_being_compacted[level], PrepareLevelStats(&level_stats, files, files_being_compacted[level],
@ -1189,7 +1194,8 @@ void InternalStats::DumpCFMapStats(
} }
} }
// Cumulative summary // Cumulative summary
double w_amp = compaction_stats_sum->bytes_written / double w_amp = (compaction_stats_sum->bytes_written +
compaction_stats_sum->bytes_written_blob) /
static_cast<double>(curr_ingest + 1); static_cast<double>(curr_ingest + 1);
// Stats summary across levels // Stats summary across levels
std::map<LevelStatType, double> sum_stats; std::map<LevelStatType, double> sum_stats;
@ -1294,7 +1300,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
CompactionStats interval_stats(compaction_stats_sum); CompactionStats interval_stats(compaction_stats_sum);
interval_stats.Subtract(cf_stats_snapshot_.comp_stats); interval_stats.Subtract(cf_stats_snapshot_.comp_stats);
double w_amp = double w_amp =
interval_stats.bytes_written / static_cast<double>(interval_ingest); (interval_stats.bytes_written + interval_stats.bytes_written_blob) /
static_cast<double>(interval_ingest);
PrintLevelStats(buf, sizeof(buf), "Int", 0, 0, 0, 0, w_amp, interval_stats); PrintLevelStats(buf, sizeof(buf), "Int", 0, 0, 0, 0, w_amp, interval_stats);
value->append(buf); value->append(buf);
@ -1354,7 +1361,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
for (int level = 0; level < number_levels_; level++) { for (int level = 0; level < number_levels_; level++) {
compact_bytes_read += comp_stats_[level].bytes_read_output_level + compact_bytes_read += comp_stats_[level].bytes_read_output_level +
comp_stats_[level].bytes_read_non_output_levels; comp_stats_[level].bytes_read_non_output_levels;
compact_bytes_write += comp_stats_[level].bytes_written; compact_bytes_write += comp_stats_[level].bytes_written +
comp_stats_[level].bytes_written_blob;
compact_micros += comp_stats_[level].micros; compact_micros += comp_stats_[level].micros;
} }

View File

@ -79,6 +79,7 @@ enum class LevelStatType {
AVG_SEC, AVG_SEC,
KEY_IN, KEY_IN,
KEY_DROP, KEY_DROP,
W_BLOB_GB,
TOTAL // total number of types TOTAL // total number of types
}; };
@ -143,32 +144,39 @@ class InternalStats {
uint64_t micros; uint64_t micros;
uint64_t cpu_micros; uint64_t cpu_micros;
// The number of bytes read from all non-output levels // The number of bytes read from all non-output levels (table files)
uint64_t bytes_read_non_output_levels; uint64_t bytes_read_non_output_levels;
// The number of bytes read from the compaction output level. // The number of bytes read from the compaction output level (table files)
uint64_t bytes_read_output_level; uint64_t bytes_read_output_level;
// Total number of bytes written during compaction // Total number of bytes written to table files during compaction
uint64_t bytes_written; uint64_t bytes_written;
// Total number of bytes moved to the output level // Total number of bytes written to blob files during compaction
uint64_t bytes_written_blob;
// Total number of bytes moved to the output level (table files)
uint64_t bytes_moved; uint64_t bytes_moved;
// The number of compaction input files in all non-output levels. // The number of compaction input files in all non-output levels (table
// files)
int num_input_files_in_non_output_levels; int num_input_files_in_non_output_levels;
// The number of compaction input files in the output level. // The number of compaction input files in the output level (table files)
int num_input_files_in_output_level; int num_input_files_in_output_level;
// The number of compaction output files. // The number of compaction output files (table files)
int num_output_files; int num_output_files;
// The number of compaction output files (blob files)
int num_output_files_blob;
// Total incoming entries during compaction between levels N and N+1 // Total incoming entries during compaction between levels N and N+1
uint64_t num_input_records; uint64_t num_input_records;
// Accumulated diff number of entries // Accumulated diff number of entries
// (num input entries - num output entires) for compaction levels N and N+1 // (num input entries - num output entries) for compaction levels N and N+1
uint64_t num_dropped_records; uint64_t num_dropped_records;
// Number of compactions done // Number of compactions done
@ -183,10 +191,12 @@ class InternalStats {
bytes_read_non_output_levels(0), bytes_read_non_output_levels(0),
bytes_read_output_level(0), bytes_read_output_level(0),
bytes_written(0), bytes_written(0),
bytes_written_blob(0),
bytes_moved(0), bytes_moved(0),
num_input_files_in_non_output_levels(0), num_input_files_in_non_output_levels(0),
num_input_files_in_output_level(0), num_input_files_in_output_level(0),
num_output_files(0), num_output_files(0),
num_output_files_blob(0),
num_input_records(0), num_input_records(0),
num_dropped_records(0), num_dropped_records(0),
count(0) { count(0) {
@ -202,10 +212,12 @@ class InternalStats {
bytes_read_non_output_levels(0), bytes_read_non_output_levels(0),
bytes_read_output_level(0), bytes_read_output_level(0),
bytes_written(0), bytes_written(0),
bytes_written_blob(0),
bytes_moved(0), bytes_moved(0),
num_input_files_in_non_output_levels(0), num_input_files_in_non_output_levels(0),
num_input_files_in_output_level(0), num_input_files_in_output_level(0),
num_output_files(0), num_output_files(0),
num_output_files_blob(0),
num_input_records(0), num_input_records(0),
num_dropped_records(0), num_dropped_records(0),
count(c) { count(c) {
@ -227,11 +239,13 @@ class InternalStats {
bytes_read_non_output_levels(c.bytes_read_non_output_levels), bytes_read_non_output_levels(c.bytes_read_non_output_levels),
bytes_read_output_level(c.bytes_read_output_level), bytes_read_output_level(c.bytes_read_output_level),
bytes_written(c.bytes_written), bytes_written(c.bytes_written),
bytes_written_blob(c.bytes_written_blob),
bytes_moved(c.bytes_moved), bytes_moved(c.bytes_moved),
num_input_files_in_non_output_levels( num_input_files_in_non_output_levels(
c.num_input_files_in_non_output_levels), c.num_input_files_in_non_output_levels),
num_input_files_in_output_level(c.num_input_files_in_output_level), num_input_files_in_output_level(c.num_input_files_in_output_level),
num_output_files(c.num_output_files), num_output_files(c.num_output_files),
num_output_files_blob(c.num_output_files_blob),
num_input_records(c.num_input_records), num_input_records(c.num_input_records),
num_dropped_records(c.num_dropped_records), num_dropped_records(c.num_dropped_records),
count(c.count) { count(c.count) {
@ -247,11 +261,13 @@ class InternalStats {
bytes_read_non_output_levels = c.bytes_read_non_output_levels; bytes_read_non_output_levels = c.bytes_read_non_output_levels;
bytes_read_output_level = c.bytes_read_output_level; bytes_read_output_level = c.bytes_read_output_level;
bytes_written = c.bytes_written; bytes_written = c.bytes_written;
bytes_written_blob = c.bytes_written_blob;
bytes_moved = c.bytes_moved; bytes_moved = c.bytes_moved;
num_input_files_in_non_output_levels = num_input_files_in_non_output_levels =
c.num_input_files_in_non_output_levels; c.num_input_files_in_non_output_levels;
num_input_files_in_output_level = c.num_input_files_in_output_level; num_input_files_in_output_level = c.num_input_files_in_output_level;
num_output_files = c.num_output_files; num_output_files = c.num_output_files;
num_output_files_blob = c.num_output_files_blob;
num_input_records = c.num_input_records; num_input_records = c.num_input_records;
num_dropped_records = c.num_dropped_records; num_dropped_records = c.num_dropped_records;
count = c.count; count = c.count;
@ -269,10 +285,12 @@ class InternalStats {
this->bytes_read_non_output_levels = 0; this->bytes_read_non_output_levels = 0;
this->bytes_read_output_level = 0; this->bytes_read_output_level = 0;
this->bytes_written = 0; this->bytes_written = 0;
this->bytes_written_blob = 0;
this->bytes_moved = 0; this->bytes_moved = 0;
this->num_input_files_in_non_output_levels = 0; this->num_input_files_in_non_output_levels = 0;
this->num_input_files_in_output_level = 0; this->num_input_files_in_output_level = 0;
this->num_output_files = 0; this->num_output_files = 0;
this->num_output_files_blob = 0;
this->num_input_records = 0; this->num_input_records = 0;
this->num_dropped_records = 0; this->num_dropped_records = 0;
this->count = 0; this->count = 0;
@ -288,12 +306,14 @@ class InternalStats {
this->bytes_read_non_output_levels += c.bytes_read_non_output_levels; this->bytes_read_non_output_levels += c.bytes_read_non_output_levels;
this->bytes_read_output_level += c.bytes_read_output_level; this->bytes_read_output_level += c.bytes_read_output_level;
this->bytes_written += c.bytes_written; this->bytes_written += c.bytes_written;
this->bytes_written_blob += c.bytes_written_blob;
this->bytes_moved += c.bytes_moved; this->bytes_moved += c.bytes_moved;
this->num_input_files_in_non_output_levels += this->num_input_files_in_non_output_levels +=
c.num_input_files_in_non_output_levels; c.num_input_files_in_non_output_levels;
this->num_input_files_in_output_level += this->num_input_files_in_output_level +=
c.num_input_files_in_output_level; c.num_input_files_in_output_level;
this->num_output_files += c.num_output_files; this->num_output_files += c.num_output_files;
this->num_output_files_blob += c.num_output_files_blob;
this->num_input_records += c.num_input_records; this->num_input_records += c.num_input_records;
this->num_dropped_records += c.num_dropped_records; this->num_dropped_records += c.num_dropped_records;
this->count += c.count; this->count += c.count;
@ -309,12 +329,14 @@ class InternalStats {
this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels; this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels;
this->bytes_read_output_level -= c.bytes_read_output_level; this->bytes_read_output_level -= c.bytes_read_output_level;
this->bytes_written -= c.bytes_written; this->bytes_written -= c.bytes_written;
this->bytes_written_blob -= c.bytes_written_blob;
this->bytes_moved -= c.bytes_moved; this->bytes_moved -= c.bytes_moved;
this->num_input_files_in_non_output_levels -= this->num_input_files_in_non_output_levels -=
c.num_input_files_in_non_output_levels; c.num_input_files_in_non_output_levels;
this->num_input_files_in_output_level -= this->num_input_files_in_output_level -=
c.num_input_files_in_output_level; c.num_input_files_in_output_level;
this->num_output_files -= c.num_output_files; this->num_output_files -= c.num_output_files;
this->num_output_files_blob -= c.num_output_files_blob;
this->num_input_records -= c.num_input_records; this->num_input_records -= c.num_input_records;
this->num_dropped_records -= c.num_dropped_records; this->num_dropped_records -= c.num_dropped_records;
this->count -= c.count; this->count -= c.count;
@ -653,10 +675,12 @@ class InternalStats {
uint64_t bytes_read_non_output_levels; uint64_t bytes_read_non_output_levels;
uint64_t bytes_read_output_level; uint64_t bytes_read_output_level;
uint64_t bytes_written; uint64_t bytes_written;
uint64_t bytes_written_blob;
uint64_t bytes_moved; uint64_t bytes_moved;
int num_input_files_in_non_output_levels; int num_input_files_in_non_output_levels;
int num_input_files_in_output_level; int num_input_files_in_output_level;
int num_output_files; int num_output_files;
int num_output_files_blob;
uint64_t num_input_records; uint64_t num_input_records;
uint64_t num_dropped_records; uint64_t num_dropped_records;
int count; int count;