Fix the "records dropped" statistics (#6325)
Summary: The earlier code used two conflicting definitions for the number of input records going into a compaction, one based on the `rocksdb.num.entries` table property and one based on `CompactionIterationStats`. The first one is correct and in line with how output records are counted, while the second one incorrectly ignores input records in various cases when the `CompactionIterator` advances or reseeks the input iterator (this can happen, amongst other cases, when dealing with `SingleDelete`s, regular `Delete`s, `Merge`s, and compaction filters). This can result in the code undercounting the input records and computing an incorrect value for "records dropped" during the compaction. The patch fixes this by switching over to the correct (table property based) input record count for "records dropped".

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6325

Test Plan: Tested using `make check` and `db_bench`.

Differential Revision: D19525491

Pulled By: ltamasi

fbshipit-source-id: 4340b0b2f41546db8e356db70ca02199e48fa636
commit f34782a67d (parent 0672a6db64)
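To make the undercounting concrete, here is a minimal toy sketch (not RocksDB code; the `Entry`/`Kind` types and the skip rule are simplified assumptions for illustration). A counter bumped once per top-level iteration step, in the spirit of `CompactionIterationStats`, misses records the iterator consumes internally, while summing per-file entry counts, in the spirit of the `rocksdb.num.entries` table property, sees every input record:

```cpp
// Toy model of the miscount: a SingleDelete that meets the matching Put
// consumes two input records in one top-level step, so a per-step counter
// undercounts the inputs.
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

enum class Kind { kPut, kSingleDelete };

struct Entry {
  std::string key;
  Kind kind;
};

int main() {
  // Compaction input: SingleDelete("a") and Put("a") annihilate each other.
  const std::vector<Entry> input = {
      {"a", Kind::kSingleDelete}, {"a", Kind::kPut}, {"b", Kind::kPut}};

  uint64_t iter_based_inputs = 0;  // one increment per top-level step
  uint64_t num_outputs = 0;

  for (size_t i = 0; i < input.size();) {
    ++iter_based_inputs;
    if (input[i].kind == Kind::kSingleDelete && i + 1 < input.size() &&
        input[i + 1].key == input[i].key) {
      i += 2;  // the matching Put is consumed internally, never counted
    } else {
      ++num_outputs;
      ++i;
    }
  }

  // Summing per-file entry counts (what rocksdb.num.entries provides)
  // counts all three input records.
  const uint64_t table_prop_inputs = input.size();

  std::cout << "iterator-based inputs: " << iter_based_inputs << "\n"  // 2
            << "table-property inputs: " << table_prop_inputs << "\n"  // 3
            << "dropped (wrong): " << iter_based_inputs - num_outputs  // 1
            << "\ndropped (right): " << table_prop_inputs - num_outputs
            << "\n";  // 2
}
```

Here three records go in and one comes out, so two were dropped; the per-step counter sees only two inputs and reports one.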
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -21,6 +21,7 @@
 * Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev().
 * Fixed an issue where the thread pools were not resized upon setting `max_background_jobs` dynamically through the `SetDBOptions` interface.
 * Fix a bug that can cause write threads to hang when a slowdown/stall happens and there is a mix of writers with WriteOptions::no_slowdown set/unset.
+* Fixed an issue where an incorrect "number of input records" value was used to compute the "records dropped" statistics for compactions.
 
 ### New Features
 * It is now possible to enable periodic compactions for the base DB when using BlobDB.
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -144,7 +144,6 @@ struct CompactionJob::SubcompactionState {
 
   // State during the subcompaction
   uint64_t total_bytes;
-  uint64_t num_input_records;
   uint64_t num_output_records;
   CompactionJobStats compaction_job_stats;
   uint64_t approx_size;
@@ -165,7 +164,6 @@ struct CompactionJob::SubcompactionState {
         builder(nullptr),
         current_output_file_size(0),
         total_bytes(0),
-        num_input_records(0),
         num_output_records(0),
         approx_size(size),
         grandparent_index(0),
@@ -186,7 +184,6 @@ struct CompactionJob::SubcompactionState {
     builder = std::move(o.builder);
     current_output_file_size = std::move(o.current_output_file_size);
     total_bytes = std::move(o.total_bytes);
-    num_input_records = std::move(o.num_input_records);
     num_output_records = std::move(o.num_output_records);
     compaction_job_stats = std::move(o.compaction_job_stats);
     approx_size = std::move(o.approx_size);
@@ -245,13 +242,11 @@ struct CompactionJob::CompactionState {
   Status status;
 
   uint64_t total_bytes;
-  uint64_t num_input_records;
   uint64_t num_output_records;
 
   explicit CompactionState(Compaction* c)
       : compaction(c),
        total_bytes(0),
-        num_input_records(0),
         num_output_records(0) {}
 
   size_t NumOutputFiles() {
@@ -289,7 +284,6 @@ struct CompactionJob::CompactionState {
 void CompactionJob::AggregateStatistics() {
   for (SubcompactionState& sc : compact_->sub_compact_states) {
     compact_->total_bytes += sc.total_bytes;
-    compact_->num_input_records += sc.num_input_records;
     compact_->num_output_records += sc.num_output_records;
   }
   if (compaction_job_stats_) {
@@ -770,12 +764,12 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   auto stream = event_logger_->LogToBuffer(log_buffer_);
   stream << "job" << job_id_ << "event"
          << "compaction_finished"
-         << "compaction_time_micros" << compaction_stats_.micros
-         << "compaction_time_cpu_micros" << compaction_stats_.cpu_micros
-         << "output_level" << compact_->compaction->output_level()
-         << "num_output_files" << compact_->NumOutputFiles()
-         << "total_output_size" << compact_->total_bytes << "num_input_records"
-         << compact_->num_input_records << "num_output_records"
+         << "compaction_time_micros" << stats.micros
+         << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
+         << compact_->compaction->output_level() << "num_output_files"
+         << compact_->NumOutputFiles() << "total_output_size"
+         << compact_->total_bytes << "num_input_records"
+         << stats.num_input_records << "num_output_records"
          << compact_->num_output_records << "num_subcompactions"
          << compact_->sub_compact_states.size() << "output_compression"
          << CompressionTypeToString(compact_->compaction->output_compression());
@@ -991,7 +985,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     }
   }
 
-  sub_compact->num_input_records = c_iter_stats.num_input_records;
   sub_compact->compaction_job_stats.num_input_deletion_records =
       c_iter_stats.num_input_deletion_records;
   sub_compact->compaction_job_stats.num_corrupt_keys =
@@ -1589,6 +1582,8 @@ void CompactionJob::UpdateCompactionStats() {
     }
   }
 
+  uint64_t num_output_records = 0;
+
   for (const auto& sub_compact : compact_->sub_compact_states) {
     size_t num_output_files = sub_compact.outputs.size();
     if (sub_compact.builder != nullptr) {
@@ -1598,13 +1593,16 @@ void CompactionJob::UpdateCompactionStats() {
     }
     compaction_stats_.num_output_files += static_cast<int>(num_output_files);
 
+    num_output_records += sub_compact.num_output_records;
+
     for (const auto& out : sub_compact.outputs) {
       compaction_stats_.bytes_written += out.meta.fd.file_size;
     }
-    if (sub_compact.num_input_records > sub_compact.num_output_records) {
-      compaction_stats_.num_dropped_records +=
-          sub_compact.num_input_records - sub_compact.num_output_records;
-    }
   }
+
+  if (compaction_stats_.num_input_records > num_output_records) {
+    compaction_stats_.num_dropped_records =
+        compaction_stats_.num_input_records - num_output_records;
+  }
 }
@@ -1632,7 +1630,7 @@ void CompactionJob::UpdateCompactionJobStats(
   // input information
   compaction_job_stats_->total_input_bytes =
       stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
-  compaction_job_stats_->num_input_records = compact_->num_input_records;
+  compaction_job_stats_->num_input_records = stats.num_input_records;
   compaction_job_stats_->num_input_files =
       stats.num_input_files_in_non_output_levels +
       stats.num_input_files_in_output_level;
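For readers tracing the new arithmetic, a hedged sketch of the corrected computation, assuming the per-compaction input count is the sum of the input files' `TableProperties::num_entries` (the `rocksdb.num.entries` property); the helper function itself is illustrative and not part of the patch:

```cpp
#include <cstdint>
#include <vector>

#include "rocksdb/table_properties.h"

// Illustrative only: `inputs` stands for the table properties of the
// compaction's input files, `num_output_records` for the records written.
uint64_t RecordsDropped(const std::vector<rocksdb::TableProperties>& inputs,
                        uint64_t num_output_records) {
  uint64_t num_input_records = 0;
  for (const auto& props : inputs) {
    num_input_records += props.num_entries;  // rocksdb.num.entries
  }
  // Mirrors the guard in the patched UpdateCompactionStats(): the statistic
  // is only set when inputs exceed outputs, so it is never negative.
  return num_input_records > num_output_records
             ? num_input_records - num_output_records
             : 0;
}
```

Note that the patch also moves the drop computation from per-subcompaction to per-job, as the diff above shows: the table-property-based input count lives in `compaction_stats_.num_input_records`, so the subtraction is done once, after summing `num_output_records` across all subcompactions.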