From a28b3c438887c66544ad4d912f7f41484d114f8b Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Tue, 28 Oct 2014 10:00:51 -0700 Subject: [PATCH] unfriend UniversalCompactionPicker,LevelCompactionPicker and FIFOCompactionPicker from VersionSet Summary: as title Test Plan: make release I will run make all check for all stacked diffs before commit Reviewers: sdong, yhchiang, rven, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D27573 --- db/compaction_picker.cc | 121 ++++++++++++++++++++-------------------- db/version_set.cc | 4 +- db/version_set.h | 29 ++++++---- 3 files changed, 83 insertions(+), 71 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 42887e057..6377ebc64 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -372,12 +372,11 @@ Compaction* LevelCompactionPicker::PickCompaction( // // Find the compactions by size on all levels. for (int i = 0; i < NumberLevels() - 1; i++) { - assert(i == 0 || - version->compaction_score_[i] <= version->compaction_score_[i - 1]); - level = version->compaction_level_[i]; - if ((version->compaction_score_[i] >= 1)) { - c = PickCompactionBySize(mutable_cf_options, version, level, - version->compaction_score_[i]); + double score = version->CompactionScore(i); + assert(i == 0 || score <= version->CompactionScore(i - 1)); + level = version->CompactionScoreLevel(i); + if (score >= 1) { + c = PickCompactionBySize(mutable_cf_options, version, level, score); if (c == nullptr || ExpandWhileOverlapping(c) == false) { delete c; c = nullptr; @@ -455,7 +454,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( // Pick the largest file in this level that is not already // being compacted - std::vector& file_size = version->files_by_size_[level]; + const std::vector& file_size = version->FilesBySize(level); + const std::vector& level_files = version->LevelFiles(level); // record the first file that is not yet compacted int nextIndex = -1; @@ -463,13 +463,13 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( for (unsigned int i = version->NextCompactionIndex(level); i < file_size.size(); i++) { int index = file_size[i]; - FileMetaData* f = version->files_[level][index]; + FileMetaData* f = level_files[index]; // Check to verify files are arranged in descending compensated size. assert((i == file_size.size() - 1) || - (i >= Version::number_of_files_to_sort_ - 1) || + (i >= Version::kNumberFilesToSort - 1) || (f->compensated_file_size >= - version->files_[level][file_size[i + 1]]->compensated_file_size)); + level_files[file_size[i + 1]]->compensated_file_size)); // do not pick a file to compact if it is being compacted // from n-1 level. @@ -512,26 +512,27 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( Compaction* UniversalCompactionPicker::PickCompaction( const MutableCFOptions& mutable_cf_options, Version* version, LogBuffer* log_buffer) { - int level = 0; - double score = version->compaction_score_[0]; + const int kLevel0 = 0; + double score = version->CompactionScore(kLevel0); + const std::vector& level_files = version->LevelFiles(kLevel0); - if ((version->files_[level].size() < + if ((level_files.size() < (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) { LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", - version->cfd_->GetName().c_str()); + version->cfd()->GetName().c_str()); return nullptr; } Version::FileSummaryStorage tmp; LogToBuffer(log_buffer, 3072, "[%s] Universal: candidate files(%zu): %s\n", - version->cfd_->GetName().c_str(), version->files_[level].size(), - version->LevelFileSummary(&tmp, 0)); + version->cfd()->GetName().c_str(), level_files.size(), + version->LevelFileSummary(&tmp, kLevel0)); // Check for size amplification first. Compaction* c; if ((c = PickCompactionUniversalSizeAmp( mutable_cf_options, version, score, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n", - version->cfd_->GetName().c_str()); + version->cfd()->GetName().c_str()); } else { // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. @@ -541,31 +542,31 @@ Compaction* UniversalCompactionPicker::PickCompaction( mutable_cf_options, version, score, ratio, UINT_MAX, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n", - version->cfd_->GetName().c_str()); + version->cfd()->GetName().c_str()); } else { // Size amplification and file size ratios are within configured limits. // If max read amplification is exceeding configured limits, then force // compaction without looking at filesize ratios and try to reduce // the number of files to fewer than level0_file_num_compaction_trigger. - unsigned int num_files = version->files_[level].size() - + unsigned int num_files = level_files.size() - mutable_cf_options.level0_file_num_compaction_trigger; if ((c = PickCompactionUniversalReadAmp( mutable_cf_options, version, score, UINT_MAX, num_files, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for file num -- %u\n", - version->cfd_->GetName().c_str(), num_files); + version->cfd()->GetName().c_str(), num_files); } } } if (c == nullptr) { return nullptr; } - assert(c->inputs_[0].size() > 1); + assert(c->inputs_[kLevel0].size() > 1); // validate that all the chosen files are non overlapping in time FileMetaData* newerfile __attribute__((unused)) = nullptr; - for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { - FileMetaData* f = c->inputs_[0][i]; + for (unsigned int i = 0; i < c->inputs_[kLevel0].size(); i++) { + FileMetaData* f = c->inputs_[kLevel0][i]; assert (f->smallest_seqno <= f->largest_seqno); assert(newerfile == nullptr || newerfile->smallest_seqno > f->largest_seqno); @@ -573,23 +574,22 @@ Compaction* UniversalCompactionPicker::PickCompaction( } // Is the earliest file part of this compaction? - FileMetaData* last_file = c->input_version_->files_[level].back(); - c->bottommost_level_ = c->inputs_[0].files.back() == last_file; + FileMetaData* last_file = level_files.back(); + c->bottommost_level_ = c->inputs_[kLevel0].files.back() == last_file; // update statistics MeasureTime(ioptions_.statistics, - NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[0].size()); + NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[kLevel0].size()); // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); // remember this currently undergoing compaction - compactions_in_progress_[level].insert(c); + compactions_in_progress_[kLevel0].insert(c); // Record whether this compaction includes all sst files. // For now, it is only relevant in universal compaction mode. - c->is_full_compaction_ = - (c->inputs_[0].size() == c->input_version_->files_[0].size()); + c->is_full_compaction_ = (c->inputs_[kLevel0].size() == level_files.size()); c->mutable_cf_options_ = mutable_cf_options; return c; @@ -634,7 +634,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( const MutableCFOptions& mutable_cf_options, Version* version, double score, unsigned int ratio, unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) { - int level = 0; + const int kLevel0 = 0; unsigned int min_merge_width = ioptions_.compaction_options_universal.min_merge_width; @@ -642,7 +642,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( ioptions_.compaction_options_universal.max_merge_width; // The files are sorted from newest first to oldest last. - const auto& files = version->files_[level]; + const auto& files = version->LevelFiles(kLevel0); FileMetaData* f = nullptr; bool done = false; @@ -669,7 +669,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } LogToBuffer(log_buffer, "[%s] Universal: file %" PRIu64 "[%d] being compacted, skipping", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop); + version->cfd()->GetName().c_str(), f->fd.GetNumber(), loop); f = nullptr; } @@ -681,7 +681,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); LogToBuffer(log_buffer, "[%s] Universal: Possible candidate file %s[%d].", - version->cfd_->GetName().c_str(), file_num_buf, loop); + version->cfd()->GetName().c_str(), file_num_buf, loop); } // Check if the suceeding files need compaction. @@ -732,7 +732,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64 "[%d] with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), i, + version->cfd()->GetName().c_str(), f->fd.GetNumber(), i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted); } @@ -748,7 +748,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int ratio_to_compress = ioptions_.compaction_options_universal.compression_size_percent; if (ratio_to_compress >= 0) { - uint64_t total_size = version->NumLevelBytes(level); + uint64_t total_size = version->NumLevelBytes(kLevel0); uint64_t older_file_size = 0; for (unsigned int i = files.size() - 1; i >= first_index_after; i--) { @@ -766,14 +766,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } uint32_t path_id = GetPathId(ioptions_, estimated_total_size); - Compaction* c = new Compaction( - version, level, level, mutable_cf_options.MaxFileSizeForLevel(level), - LLONG_MAX, path_id, GetCompressionType(ioptions_, level, + Compaction* c = new Compaction(version, kLevel0, kLevel0, + mutable_cf_options.MaxFileSizeForLevel(kLevel0), + LLONG_MAX, path_id, GetCompressionType(ioptions_, kLevel0, enable_compression)); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { - FileMetaData* f = c->input_version_->files_[level][i]; + FileMetaData* f = files[i]; c->inputs_[0].files.push_back(f); char file_num_buf[kFormatFileNumberBufSize]; FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, @@ -781,7 +781,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( LogToBuffer(log_buffer, "[%s] Universal: Picking file %s[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", - version->cfd_->GetName().c_str(), file_num_buf, i, + version->cfd()->GetName().c_str(), file_num_buf, i, f->fd.GetFileSize(), f->compensated_file_size); } return c; @@ -796,14 +796,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( const MutableCFOptions& mutable_cf_options, Version* version, double score, LogBuffer* log_buffer) { - int level = 0; + const int kLevel = 0; // percentage flexibilty while reducing size amplification uint64_t ratio = ioptions_.compaction_options_universal. max_size_amplification_percent; // The files are sorted from newest first to oldest last. - const auto& files = version->files_[level]; + const auto& files = version->LevelFiles(kLevel); unsigned int candidate_count = 0; uint64_t candidate_size = 0; @@ -821,10 +821,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s", - version->cfd_->GetName().c_str(), file_num_buf, loop, + version->cfd()->GetName().c_str(), file_num_buf, loop, " cannot be a candidate to reduce size amp.\n"); f = nullptr; } + if (f == nullptr) { return nullptr; // no candidate files } @@ -833,7 +834,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s", - version->cfd_->GetName().c_str(), file_num_buf, start_index, + version->cfd()->GetName().c_str(), file_num_buf, start_index, " to reduce size amp.\n"); // keep adding up all the remaining files @@ -845,7 +846,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( sizeof(file_num_buf)); LogToBuffer( log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.", - version->cfd_->GetName().c_str(), file_num_buf, loop, + version->cfd()->GetName().c_str(), file_num_buf, loop, " is already being compacted. No size amp reduction possible.\n"); return nullptr; } @@ -865,14 +866,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( log_buffer, "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64 "earliest-file-size %" PRIu64, - version->cfd_->GetName().c_str(), candidate_size, earliest_file_size); + version->cfd()->GetName().c_str(), candidate_size, earliest_file_size); return nullptr; } else { LogToBuffer( log_buffer, "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64 "earliest-file-size %" PRIu64, - version->cfd_->GetName().c_str(), candidate_size, earliest_file_size); + version->cfd()->GetName().c_str(), candidate_size, earliest_file_size); } assert(start_index < files.size() - 1); @@ -886,17 +887,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( // create a compaction request // We always compact all the files, so always compress. Compaction* c = - new Compaction(version, level, level, - mutable_cf_options.MaxFileSizeForLevel(level), - LLONG_MAX, path_id, GetCompressionType(ioptions_, level)); + new Compaction(version, kLevel, kLevel, + mutable_cf_options.MaxFileSizeForLevel(kLevel), + LLONG_MAX, path_id, GetCompressionType(ioptions_, kLevel)); c->score_ = score; for (unsigned int loop = start_index; loop < files.size(); loop++) { - f = c->input_version_->files_[level][loop]; + f = files[loop]; c->inputs_[0].files.push_back(f); LogToBuffer(log_buffer, "[%s] Universal: size amp picking file %" PRIu64 "[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")", - version->cfd_->GetName().c_str(), + version->cfd()->GetName().c_str(), f->fd.GetNumber(), loop, f->fd.GetFileSize(), f->compensated_file_size); } @@ -907,18 +908,20 @@ Compaction* FIFOCompactionPicker::PickCompaction( const MutableCFOptions& mutable_cf_options, Version* version, LogBuffer* log_buffer) { assert(version->NumberLevels() == 1); + const int kLevel0 = 0; + const std::vector& level_files = version->LevelFiles(kLevel0); uint64_t total_size = 0; - for (const auto& file : version->files_[0]) { + for (const auto& file : level_files) { total_size += file->compensated_file_size; } if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size || - version->files_[0].size() == 0) { + level_files.size() == 0) { // total size not exceeded LogToBuffer(log_buffer, "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 ", max size %" PRIu64 "\n", - version->cfd_->GetName().c_str(), total_size, + version->cfd()->GetName().c_str(), total_size, ioptions_.compaction_options_fifo.max_table_files_size); return nullptr; } @@ -927,15 +930,14 @@ Compaction* FIFOCompactionPicker::PickCompaction( LogToBuffer(log_buffer, "[%s] FIFO compaction: Already executing compaction. No need " "to run parallel compactions since compactions are very fast", - version->cfd_->GetName().c_str()); + version->cfd()->GetName().c_str()); return nullptr; } Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false, true /* is deletion compaction */); // delete old files (FIFO) - for (auto ritr = version->files_[0].rbegin(); - ritr != version->files_[0].rend(); ++ritr) { + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { auto f = *ritr; total_size -= f->compensated_file_size; c->inputs_[0].files.push_back(f); @@ -943,7 +945,8 @@ Compaction* FIFOCompactionPicker::PickCompaction( AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 " with size %s for deletion", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), tmp_fsize); + version->cfd()->GetName().c_str(), f->fd.GetNumber(), + tmp_fsize); if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { break; } diff --git a/db/version_set.cc b/db/version_set.cc index 88f66ad51..b121424d7 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -991,8 +991,8 @@ void Version::UpdateFilesBySize() { temp[i].file = files[i]; } - // sort the top number_of_files_to_sort_ based on file size - size_t num = Version::number_of_files_to_sort_; + // sort the top kNumberFilesToSort based on file size + size_t num = Version::kNumberFilesToSort; if (num > temp.size()) { num = temp.size(); } diff --git a/db/version_set.h b/db/version_set.h index 2c5b3a8a7..95e38fca4 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -130,6 +130,12 @@ class Version { // See field declaration int MaxCompactionScoreLevel() const { return max_compaction_score_level_; } + // Return level number that has idx'th highest score + int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; } + + // Return idx'th highest score + double CompactionScore(int idx) const { return compaction_score_[idx]; } + void GetOverlappingInputs( int level, const InternalKey* begin, // nullptr means before all keys @@ -251,6 +257,12 @@ class Version { return files_[level]; } + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + const std::vector& FilesBySize(int level) const { + assert(finalized_); + return files_by_size_[level]; + } + // REQUIRES: lock is held // Set the index that is used to offset into files_by_size_ to find // the next compaction candidate file. @@ -263,14 +275,18 @@ class Version { return next_file_to_compact_by_size_[level]; } + // Only the first few entries of files_by_size_ are sorted. + // There is no need to sort all the files because it is likely + // that on a running system, we need to look at only the first + // few largest files because a new version is created every few + // seconds/minutes (because of concurrent compactions). + static const size_t kNumberFilesToSort = 50; + private: friend class VersionSet; friend class DBImpl; friend class CompactedDBImpl; friend class ColumnFamilyData; - friend class LevelCompactionPicker; - friend class UniversalCompactionPicker; - friend class FIFOCompactionPicker; friend class ForwardIterator; friend class InternalStats; @@ -332,13 +348,6 @@ class Version { // file that is not yet compacted std::vector next_file_to_compact_by_size_; - // Only the first few entries of files_by_size_ are sorted. - // There is no need to sort all the files because it is likely - // that on a running system, we need to look at only the first - // few largest files because a new version is created every few - // seconds/minutes (because of concurrent compactions). - static const size_t number_of_files_to_sort_ = 50; - // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize().