Consolidate file cutting logic in compaction loop

Summary:
It was really annoying to have two places (top and bottom of compaction loop) where we cut output files. I had bugs in both DeleteRange and dictionary compression due to updating only one of the two. This diff consolidates the file-cutting logic to the bottom of the compaction loop.

Keep in mind that my goal with input_status is to be consistent with the past behavior, even though I'm not sure it's ideal.
Closes https://github.com/facebook/rocksdb/pull/1832

Differential Revision: D4503038

Pulled By: ajkr

fbshipit-source-id: 7da5213
This commit is contained in:
Andrew Kryczka 2017-02-08 16:07:29 -08:00 committed by Facebook Github Bot
parent ac2a77a746
commit b48e4778be

View File

@ -747,6 +747,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
shutting_down_)); shutting_down_));
auto c_iter = sub_compact->c_iter.get(); auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst(); c_iter->SeekToFirst();
if (c_iter->Valid() &&
sub_compact->compaction->output_level() != 0) {
// ShouldStopBefore() maintains state based on keys processed so far. The
// compaction loop always calls it on the "next" key, thus won't tell it the
// first key. So we do that here.
sub_compact->ShouldStopBefore(
c_iter->key(), sub_compact->current_output_file_size);
}
const auto& c_iter_stats = c_iter->iter_stats(); const auto& c_iter_stats = c_iter->iter_stats();
auto sample_begin_offset_iter = sample_begin_offsets.cbegin(); auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
// data_begin_offset and compression_dict are only valid while generating // data_begin_offset and compression_dict are only valid while generating
@ -766,21 +774,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (end != nullptr && if (end != nullptr &&
cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
break; break;
} else if (sub_compact->compaction->output_level() != 0 &&
sub_compact->ShouldStopBefore(
key, sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats;
status = FinishCompactionOutputFile(input->status(), sub_compact,
range_del_agg.get(),
&range_del_out_stats, &key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (!status.ok()) {
break;
}
} }
if (c_iter_stats.num_input_records % kRecordStatsEvery == if (c_iter_stats.num_input_records % kRecordStatsEvery ==
kRecordStatsEvery - 1) { kRecordStatsEvery - 1) {
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
@ -852,17 +846,37 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
} }
} }
// Close output file if it is big enough // Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
//
// TODO(aekmekji): determine if file should be closed earlier than this // TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is // during subcompactions (i.e. if output size, estimated by input size, is
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB) // and 0.6MB instead of 1MB and 0.2MB)
bool output_file_ended = false;
Status input_status;
if (sub_compact->compaction->output_level() != 0 && if (sub_compact->compaction->output_level() != 0 &&
sub_compact->current_output_file_size >= sub_compact->current_output_file_size >=
sub_compact->compaction->max_output_file_size()) { sub_compact->compaction->max_output_file_size()) {
Status input_status = input->status(); // (1) this key terminates the file. For historical reasons, the iterator
c_iter->Next(); // status before advancing will be given to FinishCompactionOutputFile().
input_status = input->status();
output_file_ended = true;
}
c_iter->Next();
if (!output_file_ended && c_iter->Valid() &&
sub_compact->compaction->output_level() != 0 &&
sub_compact->ShouldStopBefore(
c_iter->key(), sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) {
// (2) this key belongs to the next file. For historical reasons, the
// iterator status after advancing will be given to
// FinishCompactionOutputFile().
input_status = input->status();
output_file_ended = true;
}
if (output_file_ended) {
const Slice* next_key = nullptr; const Slice* next_key = nullptr;
if (c_iter->Valid()) { if (c_iter->Valid()) {
next_key = &c_iter->key(); next_key = &c_iter->key();
@ -878,8 +892,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// files. // files.
sub_compact->compression_dict = std::move(compression_dict); sub_compact->compression_dict = std::move(compression_dict);
} }
} else {
c_iter->Next();
} }
} }