Add is_full_compaction to CompactionJobStats, cleanup (#7451)

Summary:
This exposes to the listener interface whether a compaction was
full or not. Also cleaned up API comment for CompactionJobInfo::stats,
which is not of a nullable type. And since CompactionJob is always
created with non-null CompactionJobStats, removed conditionals on it
being nullptr and instead assert non-null.

TODO later: update C and Java interfaces

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7451

Test Plan: updated existing unit tests to check new field, make check

Reviewed By: ltamasi

Differential Revision: D23977796

Pulled By: pdillinger

fbshipit-source-id: 1ae7e26cb949631c2b2fb9e696710daf53cc378d
This commit is contained in:
Peter Dillinger 2020-10-01 12:51:33 -07:00 committed by Facebook GitHub Bot
parent 786c1a2cc4
commit 9082771b86
6 changed files with 52 additions and 59 deletions

View File

@ -15,6 +15,7 @@
### New Features
* Methods to configure serialize, and compare -- such as TableFactory -- are exposed directly through the Configurable base class (from which these objects inherit). This change will allow for better and more thorough configuration management and retrieval in the future. The options for a Configurable object can be set via the ConfigureFromMap, ConfigureFromString, or ConfigureOption method. The serialized version of the options of an object can be retrieved via the GetOptionString, ToString, or GetOption methods. The list of options supported by an object can be obtained via the GetOptionNames method. The "raw" object (such as the BlockBasedTableOption) for an option may be retrieved via the GetOptions method. Configurable options can be compared via the AreEquivalent method. The settings within a Configurable object may be validated via the ValidateOptions method. The object may be intialized (at which point only mutable options may be updated) via the PrepareOptions method.
* Introduce options.check_flush_compaction_key_order with default value to be true. With this option, during flush and compaction, key order will be checked when writing to each SST file. If the order is violated, the flush or compaction will fail.
* Added is_full_compaction to CompactionJobStats, so that the information is available through the EventListener interface.
## 6.13 (09/12/2020)
### Bug fixes

View File

@ -276,10 +276,8 @@ void CompactionJob::AggregateStatistics() {
compact_->total_bytes += sc.total_bytes;
compact_->num_output_records += sc.num_output_records;
}
if (compaction_job_stats_) {
for (SubcompactionState& sc : compact_->sub_compact_states) {
compaction_job_stats_->Add(sc.compaction_job_stats);
}
for (SubcompactionState& sc : compact_->sub_compact_states) {
compaction_job_stats_->Add(sc.compaction_job_stats);
}
}
@ -332,6 +330,7 @@ CompactionJob::CompactionJob(
measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET),
thread_pri_(thread_pri) {
assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
@ -383,10 +382,9 @@ void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
// to ensure GetThreadList() can always show them all together.
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
if (compaction_job_stats_) {
compaction_job_stats_->is_manual_compaction =
compaction->is_manual_compaction();
}
compaction_job_stats_->is_manual_compaction =
compaction->is_manual_compaction();
compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
}
void CompactionJob::Prepare() {
@ -804,14 +802,12 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
<< compact_->sub_compact_states.size() << "output_compression"
<< CompressionTypeToString(compact_->compaction->output_compression());
if (compaction_job_stats_ != nullptr) {
stream << "num_single_delete_mismatches"
<< compaction_job_stats_->num_single_del_mismatch;
stream << "num_single_delete_fallthrough"
<< compaction_job_stats_->num_single_del_fallthru;
}
stream << "num_single_delete_mismatches"
<< compaction_job_stats_->num_single_del_mismatch;
stream << "num_single_delete_fallthrough"
<< compaction_job_stats_->num_single_del_fallthru;
if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
if (measure_io_stats_) {
stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
stream << "file_range_sync_nanos"
<< compaction_job_stats_->file_range_sync_nanos;
@ -1750,32 +1746,29 @@ void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
void CompactionJob::UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
if (compaction_job_stats_) {
compaction_job_stats_->elapsed_micros = stats.micros;
compaction_job_stats_->elapsed_micros = stats.micros;
// input information
compaction_job_stats_->total_input_bytes =
stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
compaction_job_stats_->num_input_records = stats.num_input_records;
compaction_job_stats_->num_input_files =
stats.num_input_files_in_non_output_levels +
stats.num_input_files_in_output_level;
compaction_job_stats_->num_input_files_at_output_level =
stats.num_input_files_in_output_level;
// input information
compaction_job_stats_->total_input_bytes =
stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
compaction_job_stats_->num_input_records = stats.num_input_records;
compaction_job_stats_->num_input_files =
stats.num_input_files_in_non_output_levels +
stats.num_input_files_in_output_level;
compaction_job_stats_->num_input_files_at_output_level =
stats.num_input_files_in_output_level;
// output information
compaction_job_stats_->total_output_bytes = stats.bytes_written;
compaction_job_stats_->num_output_records = compact_->num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files;
// output information
compaction_job_stats_->total_output_bytes = stats.bytes_written;
compaction_job_stats_->num_output_records = compact_->num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files;
if (compact_->NumOutputFiles() > 0U) {
CopyPrefix(compact_->SmallestUserKey(),
CompactionJobStats::kMaxPrefixLength,
&compaction_job_stats_->smallest_output_key_prefix);
CopyPrefix(compact_->LargestUserKey(),
CompactionJobStats::kMaxPrefixLength,
&compaction_job_stats_->largest_output_key_prefix);
}
if (compact_->NumOutputFiles() > 0U) {
CopyPrefix(compact_->SmallestUserKey(),
CompactionJobStats::kMaxPrefixLength,
&compaction_job_stats_->smallest_output_key_prefix);
CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
&compaction_job_stats_->largest_output_key_prefix);
}
#else
(void)stats;

View File

@ -459,6 +459,7 @@ class CompactionJobStatsChecker : public EventListener {
ASSERT_EQ(current_stats.num_output_files,
stats.num_output_files);
ASSERT_EQ(current_stats.is_full_compaction, stats.is_full_compaction);
ASSERT_EQ(current_stats.is_manual_compaction,
stats.is_manual_compaction);
@ -571,7 +572,7 @@ CompactionJobStats NewManualCompactionJobStats(
uint64_t num_input_records, size_t key_size, size_t value_size,
size_t num_output_files, uint64_t num_output_records,
double compression_ratio, uint64_t num_records_replaced,
bool is_manual = true) {
bool is_full = false, bool is_manual = true) {
CompactionJobStats stats;
stats.Reset();
@ -595,6 +596,7 @@ CompactionJobStats NewManualCompactionJobStats(
stats.total_input_raw_value_bytes =
num_input_records * value_size;
stats.is_full_compaction = is_full;
stats.is_manual_compaction = is_manual;
stats.num_records_replaced = num_records_replaced;
@ -981,25 +983,20 @@ TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
if (num_input_units == 0) {
continue;
}
// A full compaction only happens when the number of flushes equals to
// the number of compaction input runs.
bool is_full = num_flushes == num_input_units;
// The following statement determines the expected smallest key
// based on whether it is a full compaction. A full compaction only
// happens when the number of flushes equals to the number of compaction
// input runs.
uint64_t smallest_key =
(num_flushes == num_input_units) ?
key_base : key_base * (num_flushes - 1);
// based on whether it is a full compaction.
uint64_t smallest_key = is_full ? key_base : key_base * (num_flushes - 1);
stats_checker->AddExpectedStats(
NewManualCompactionJobStats(
Key(smallest_key, 10),
Key(smallest_key + key_base * num_input_units - key_interval, 10),
num_input_units,
num_input_units > 2 ? num_input_units / 2 : 0,
num_keys_per_table * num_input_units,
kKeySize, kValueSize,
num_input_units,
num_keys_per_table * num_input_units,
1.0, 0, false));
stats_checker->AddExpectedStats(NewManualCompactionJobStats(
Key(smallest_key, 10),
Key(smallest_key + key_base * num_input_units - key_interval, 10),
num_input_units, num_input_units > 2 ? num_input_units / 2 : 0,
num_keys_per_table * num_input_units, kKeySize, kValueSize,
num_input_units, num_keys_per_table * num_input_units, 1.0, 0, is_full,
false));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

View File

@ -35,6 +35,8 @@ struct CompactionJobStats {
// the number of compaction output files.
size_t num_output_files;
// true if the compaction is a full compaction (all live SST files input)
bool is_full_compaction;
// true if the compaction is a manual compaction
bool is_manual_compaction;

View File

@ -294,8 +294,7 @@ struct CompactionJobInfo {
// Compression algorithm used for output files
CompressionType compression;
// If non-null, this variable stores detailed information
// about this compaction.
// Statistics and other additional details on the compaction
CompactionJobStats stats;
};

View File

@ -20,7 +20,8 @@ void CompactionJobStats::Reset() {
num_output_records = 0;
num_output_files = 0;
is_manual_compaction = 0;
is_full_compaction = false;
is_manual_compaction = false;
total_input_bytes = 0;
total_output_bytes = 0;