diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 0e2f68f21..f345e1f96 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -747,6 +747,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { shutting_down_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); + if (c_iter->Valid() && + sub_compact->compaction->output_level() != 0) { + // ShouldStopBefore() maintains state based on keys processed so far. The + // compaction loop always calls it on the "next" key, thus won't tell it the + // first key. So we do that here. + sub_compact->ShouldStopBefore( + c_iter->key(), sub_compact->current_output_file_size); + } const auto& c_iter_stats = c_iter->iter_stats(); auto sample_begin_offset_iter = sample_begin_offsets.cbegin(); // data_begin_offset and compression_dict are only valid while generating @@ -766,21 +774,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (end != nullptr && cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { break; - } else if (sub_compact->compaction->output_level() != 0 && - sub_compact->ShouldStopBefore( - key, sub_compact->current_output_file_size) && - sub_compact->builder != nullptr) { - CompactionIterationStats range_del_out_stats; - status = FinishCompactionOutputFile(input->status(), sub_compact, - range_del_agg.get(), - &range_del_out_stats, &key); - RecordDroppedKeys(range_del_out_stats, - &sub_compact->compaction_job_stats); - if (!status.ok()) { - break; - } } - if (c_iter_stats.num_input_records % kRecordStatsEvery == kRecordStatsEvery - 1) { RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); @@ -852,17 +846,37 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } } - // Close output file if it is big enough + // Close output file if it is big enough. Two possibilities determine it's + // time to close it: (1) the current key should be this file's last key, (2) + // the next key should not be in this file. + // // TODO(aekmekji): determine if file should be closed earlier than this // during subcompactions (i.e. if output size, estimated by input size, is // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB // and 0.6MB instead of 1MB and 0.2MB) + bool output_file_ended = false; + Status input_status; if (sub_compact->compaction->output_level() != 0 && sub_compact->current_output_file_size >= sub_compact->compaction->max_output_file_size()) { - Status input_status = input->status(); - c_iter->Next(); - + // (1) this key terminates the file. For historical reasons, the iterator + // status before advancing will be given to FinishCompactionOutputFile(). + input_status = input->status(); + output_file_ended = true; + } + c_iter->Next(); + if (!output_file_ended && c_iter->Valid() && + sub_compact->compaction->output_level() != 0 && + sub_compact->ShouldStopBefore( + c_iter->key(), sub_compact->current_output_file_size) && + sub_compact->builder != nullptr) { + // (2) this key belongs to the next file. For historical reasons, the + // iterator status after advancing will be given to + // FinishCompactionOutputFile(). + input_status = input->status(); + output_file_ended = true; + } + if (output_file_ended) { const Slice* next_key = nullptr; if (c_iter->Valid()) { next_key = &c_iter->key(); @@ -878,8 +892,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // files. sub_compact->compression_dict = std::move(compression_dict); } - } else { - c_iter->Next(); } }