// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/version_builder.h" #include #include #include #include #include #include #include #include #include #include #include #include "db/dbformat.h" #include "db/internal_stats.h" #include "db/table_cache.h" #include "db/version_set.h" #include "port/port.h" #include "table/table_reader.h" #include "util/string_util.h" namespace rocksdb { bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { if (a->fd.largest_seqno != b->fd.largest_seqno) { return a->fd.largest_seqno > b->fd.largest_seqno; } if (a->fd.smallest_seqno != b->fd.smallest_seqno) { return a->fd.smallest_seqno > b->fd.smallest_seqno; } // Break ties by file number return a->fd.GetNumber() > b->fd.GetNumber(); } namespace { bool BySmallestKey(FileMetaData* a, FileMetaData* b, const InternalKeyComparator* cmp) { int r = cmp->Compare(a->smallest, b->smallest); if (r != 0) { return (r < 0); } // Break ties by file number return (a->fd.GetNumber() < b->fd.GetNumber()); } } // namespace class VersionBuilder::Rep { private: // Helper to sort files_ in v // kLevel0 -- NewestFirstBySeqNo // kLevelNon0 -- BySmallestKey struct FileComparator { enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method; const InternalKeyComparator* internal_comparator; FileComparator() : internal_comparator(nullptr) {} bool operator()(FileMetaData* f1, FileMetaData* f2) const { switch (sort_method) { case kLevel0: return NewestFirstBySeqNo(f1, f2); case kLevelNon0: return BySmallestKey(f1, f2, internal_comparator); } assert(false); return false; } }; struct LevelState { std::unordered_set deleted_files; // Map from file number to file meta data. std::unordered_map added_files; }; const EnvOptions& env_options_; Logger* info_log_; TableCache* table_cache_; VersionStorageInfo* base_vstorage_; int num_levels_; LevelState* levels_; // Store states of levels larger than num_levels_. We do this instead of // storing them in levels_ to avoid regression in case there are no files // on invalid levels. The version is not consistent if in the end the files // on invalid levels don't cancel out. std::map> invalid_levels_; // Whether there are invalid new files or invalid deletion on levels larger // than num_levels_. bool has_invalid_levels_; FileComparator level_zero_cmp_; FileComparator level_nonzero_cmp_; public: Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache, VersionStorageInfo* base_vstorage) : env_options_(env_options), info_log_(info_log), table_cache_(table_cache), base_vstorage_(base_vstorage), num_levels_(base_vstorage->num_levels()), has_invalid_levels_(false) { levels_ = new LevelState[num_levels_]; level_zero_cmp_.sort_method = FileComparator::kLevel0; level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; level_nonzero_cmp_.internal_comparator = base_vstorage_->InternalComparator(); } ~Rep() { for (int level = 0; level < num_levels_; level++) { const auto& added = levels_[level].added_files; for (auto& pair : added) { UnrefFile(pair.second); } } delete[] levels_; } void UnrefFile(FileMetaData* f) { f->refs--; if (f->refs <= 0) { if (f->table_reader_handle) { assert(table_cache_ != nullptr); table_cache_->ReleaseHandle(f->table_reader_handle); f->table_reader_handle = nullptr; } delete f; } } Status CheckConsistency(VersionStorageInfo* vstorage) { #ifdef NDEBUG if (!vstorage->force_consistency_checks()) { // Dont run consistency checks in release mode except if // explicitly asked to return Status::OK(); } #endif // make sure the files are sorted correctly for (int level = 0; level < num_levels_; level++) { auto& level_files = vstorage->LevelFiles(level); for (size_t i = 1; i < level_files.size(); i++) { auto f1 = level_files[i - 1]; auto f2 = level_files[i]; #ifndef NDEBUG auto pair = std::make_pair(&f1, &f2); TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair); #endif if (level == 0) { if (!level_zero_cmp_(f1, f2)) { fprintf(stderr, "L0 files are not sorted properly"); return Status::Corruption("L0 files are not sorted properly"); } if (f2->fd.smallest_seqno == f2->fd.largest_seqno) { // This is an external file that we ingested SequenceNumber external_file_seqno = f2->fd.smallest_seqno; if (!(external_file_seqno < f1->fd.largest_seqno || external_file_seqno == 0)) { fprintf(stderr, "L0 file with seqno %" PRIu64 " %" PRIu64 " vs. file with global_seqno %" PRIu64 "\n", f1->fd.smallest_seqno, f1->fd.largest_seqno, external_file_seqno); return Status::Corruption( "L0 file with seqno " + NumberToString(f1->fd.smallest_seqno) + " " + NumberToString(f1->fd.largest_seqno) + " vs. file with global_seqno" + NumberToString(external_file_seqno) + " with fileNumber " + NumberToString(f1->fd.GetNumber())); } } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) { fprintf(stderr, "L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64 " %" PRIu64 "\n", f1->fd.smallest_seqno, f1->fd.largest_seqno, f2->fd.smallest_seqno, f2->fd.largest_seqno); return Status::Corruption( "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) + " " + NumberToString(f1->fd.largest_seqno) + " " + NumberToString(f1->fd.GetNumber()) + " vs. " + NumberToString(f2->fd.smallest_seqno) + " " + NumberToString(f2->fd.largest_seqno) + " " + NumberToString(f2->fd.GetNumber())); } } else { if (!level_nonzero_cmp_(f1, f2)) { fprintf(stderr, "L%d files are not sorted properly", level); return Status::Corruption("L" + NumberToString(level) + " files are not sorted properly"); } // Make sure there is no overlap in levels > 0 if (vstorage->InternalComparator()->Compare(f1->largest, f2->smallest) >= 0) { fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level, (f1->largest).DebugString(true).c_str(), (f2->smallest).DebugString(true).c_str()); return Status::Corruption( "L" + NumberToString(level) + " have overlapping ranges " + (f1->largest).DebugString(true) + " vs. " + (f2->smallest).DebugString(true)); } } } } return Status::OK(); } Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number, int level) { #ifdef NDEBUG if (!base_vstorage_->force_consistency_checks()) { // Dont run consistency checks in release mode except if // explicitly asked to return Status::OK(); } #endif // a file to be deleted better exist in the previous version bool found = false; for (int l = 0; !found && l < num_levels_; l++) { const std::vector& base_files = base_vstorage_->LevelFiles(l); for (size_t i = 0; i < base_files.size(); i++) { FileMetaData* f = base_files[i]; if (f->fd.GetNumber() == number) { found = true; break; } } } // if the file did not exist in the previous version, then it // is possibly moved from lower level to higher level in current // version for (int l = level + 1; !found && l < num_levels_; l++) { auto& level_added = levels_[l].added_files; auto got = level_added.find(number); if (got != level_added.end()) { found = true; break; } } // maybe this file was added in a previous edit that was Applied if (!found) { auto& level_added = levels_[level].added_files; auto got = level_added.find(number); if (got != level_added.end()) { found = true; } } if (!found) { fprintf(stderr, "not found %" PRIu64 "\n", number); return Status::Corruption("not found " + NumberToString(number)); } return Status::OK(); } bool CheckConsistencyForNumLevels() { // Make sure there are no files on or beyond num_levels(). if (has_invalid_levels_) { return false; } for (auto& level : invalid_levels_) { if (level.second.size() > 0) { return false; } } return true; } // Apply all of the edits in *edit to the current state. Status Apply(VersionEdit* edit) { Status s = CheckConsistency(base_vstorage_); if (!s.ok()) { return s; } // Delete files const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles(); for (const auto& del_file : del) { const auto level = del_file.first; const auto number = del_file.second; if (level < num_levels_) { levels_[level].deleted_files.insert(number); CheckConsistencyForDeletes(edit, number, level); auto exising = levels_[level].added_files.find(number); if (exising != levels_[level].added_files.end()) { UnrefFile(exising->second); levels_[level].added_files.erase(exising); } } else { auto exising = invalid_levels_[level].find(number); if (exising != invalid_levels_[level].end()) { invalid_levels_[level].erase(exising); } else { // Deleting an non-existing file on invalid level. has_invalid_levels_ = true; } } } // Add new files for (const auto& new_file : edit->GetNewFiles()) { const int level = new_file.first; if (level < num_levels_) { FileMetaData* f = new FileMetaData(new_file.second); f->refs = 1; assert(levels_[level].added_files.find(f->fd.GetNumber()) == levels_[level].added_files.end()); levels_[level].deleted_files.erase(f->fd.GetNumber()); levels_[level].added_files[f->fd.GetNumber()] = f; } else { uint64_t number = new_file.second.fd.GetNumber(); if (invalid_levels_[level].count(number) == 0) { invalid_levels_[level].insert(number); } else { // Creating an already existing file on invalid level. has_invalid_levels_ = true; } } } return s; } // Save the current state in *v. Status SaveTo(VersionStorageInfo* vstorage) { Status s = CheckConsistency(base_vstorage_); if (!s.ok()) { return s; } s = CheckConsistency(vstorage); if (!s.ok()) { return s; } for (int level = 0; level < num_levels_; level++) { const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. const auto& base_files = base_vstorage_->LevelFiles(level); const auto& unordered_added_files = levels_[level].added_files; vstorage->Reserve(level, base_files.size() + unordered_added_files.size()); // Sort added files for the level. std::vector added_files; added_files.reserve(unordered_added_files.size()); for (const auto& pair : unordered_added_files) { added_files.push_back(pair.second); } std::sort(added_files.begin(), added_files.end(), cmp); #ifndef NDEBUG FileMetaData* prev_added_file = nullptr; for (const auto& added : added_files) { if (level > 0 && prev_added_file != nullptr) { assert(base_vstorage_->InternalComparator()->Compare( prev_added_file->smallest, added->smallest) <= 0); } prev_added_file = added; } #endif auto base_iter = base_files.begin(); auto base_end = base_files.end(); auto added_iter = added_files.begin(); auto added_end = added_files.end(); while (added_iter != added_end || base_iter != base_end) { if (base_iter == base_end || (added_iter != added_end && cmp(*added_iter, *base_iter))) { MaybeAddFile(vstorage, level, *added_iter++); } else { MaybeAddFile(vstorage, level, *base_iter++); } } } s = CheckConsistency(vstorage); return s; } Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, bool is_initial_load, const SliceTransform* prefix_extractor) { assert(table_cache_ != nullptr); size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity(); bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity); size_t max_load = port::kMaxSizet; if (!always_load) { // If it is initial loading and not set to always laoding all the // files, we only load up to kInitialLoadLimit files, to limit the // time reopening the DB. const size_t kInitialLoadLimit = 16; size_t load_limit; // If the table cache is not 1/4 full, we pin the table handle to // file metadata to avoid the cache read costs when reading the file. // The downside of pinning those files is that LRU won't be followed // for those files. This doesn't matter much because if number of files // of the DB excceeds table cache capacity, eventually no table reader // will be pinned and LRU will be followed. if (is_initial_load) { load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4); } else { load_limit = table_cache_capacity / 4; } size_t table_cache_usage = table_cache_->get_cache()->GetUsage(); if (table_cache_usage >= load_limit) { // TODO (yanqin) find a suitable status code. return Status::OK(); } else { max_load = load_limit - table_cache_usage; } } // std::vector> files_meta; std::vector statuses; for (int level = 0; level < num_levels_; level++) { for (auto& file_meta_pair : levels_[level].added_files) { auto* file_meta = file_meta_pair.second; // If the file has been opened before, just skip it. if (!file_meta->table_reader_handle) { files_meta.emplace_back(file_meta, level); statuses.emplace_back(Status::OK()); } if (files_meta.size() >= max_load) { break; } } if (files_meta.size() >= max_load) { break; } } std::atomic next_file_meta_idx(0); std::function load_handlers_func([&]() { while (true) { size_t file_idx = next_file_meta_idx.fetch_add(1); if (file_idx >= files_meta.size()) { break; } auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; statuses[file_idx] = table_cache_->FindTable( env_options_, *(base_vstorage_->InternalComparator()), file_meta->fd, &file_meta->table_reader_handle, prefix_extractor, false /*no_io */, true /* record_read_stats */, internal_stats->GetFileReadHist(level), false, level, prefetch_index_and_filter_in_cache); if (file_meta->table_reader_handle != nullptr) { // Load table_reader file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle( file_meta->table_reader_handle); } } }); std::vector threads; for (int i = 1; i < max_threads; i++) { threads.emplace_back(load_handlers_func); } load_handlers_func(); for (auto& t : threads) { t.join(); } for (const auto& s : statuses) { if (!s.ok()) { return s; } } return Status::OK(); } void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) { // f is to-be-deleted table file vstorage->RemoveCurrentStats(f); } else { vstorage->AddFile(level, f, info_log_); } } }; VersionBuilder::VersionBuilder(const EnvOptions& env_options, TableCache* table_cache, VersionStorageInfo* base_vstorage, Logger* info_log) : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {} VersionBuilder::~VersionBuilder() { delete rep_; } Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { return rep_->CheckConsistency(vstorage); } Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, int level) { return rep_->CheckConsistencyForDeletes(edit, number, level); } bool VersionBuilder::CheckConsistencyForNumLevels() { return rep_->CheckConsistencyForNumLevels(); } Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); } Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { return rep_->SaveTo(vstorage); } Status VersionBuilder::LoadTableHandlers( InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, bool is_initial_load, const SliceTransform* prefix_extractor) { return rep_->LoadTableHandlers(internal_stats, max_threads, prefetch_index_and_filter_in_cache, is_initial_load, prefix_extractor); } void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { rep_->MaybeAddFile(vstorage, level, f); } } // namespace rocksdb