From 4a206e228f31e9020a5e4aa6360fd7169a33002c Mon Sep 17 00:00:00 2001
From: anand76
Date: Fri, 6 May 2022 19:38:27 -0700
Subject: [PATCH] Refactor Version::MultiGet code into smaller reusable functions

---
 db/version_set.cc | 410 ++++++++++++++++++++++++----------------------
 db/version_set.h  |   7 +
 2 files changed, 224 insertions(+), 193 deletions(-)

diff --git a/db/version_set.cc b/db/version_set.cc
index b21761c5b..a75fba1d6 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -504,68 +504,63 @@ class FilePickerMultiGet {
     return file_hit;
   }
 
-  FdWithKeyRange* GetNextFile() {
-    while (!search_ended_) {
-      // Start searching next level.
-      if (batch_iter_ == current_level_range_.end()) {
-        search_ended_ = !PrepareNextLevel();
-        continue;
-      } else {
-        if (maybe_repeat_key_) {
-          maybe_repeat_key_ = false;
-          // Check if we found the final value for the last key in the
-          // previous lookup range. If we did, then there's no need to look
-          // any further for that key, so advance batch_iter_. Else, keep
-          // batch_iter_ positioned on that key so we look it up again in
-          // the next file
-          // For L0, always advance the key because we will look in the next
-          // file regardless for all keys not found yet
-          if (current_level_range_.CheckKeyDone(batch_iter_) ||
-              curr_level_ == 0) {
-            batch_iter_ = upper_key_;
-          }
-        }
-        // batch_iter_prev_ will become the start key for the next file
-        // lookup
-        batch_iter_prev_ = batch_iter_;
-      }
+  void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); }
 
-      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
-                                    current_level_range_.end());
-      size_t curr_file_index =
-          (batch_iter_ != current_level_range_.end())
-              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
-              : curr_file_level_->num_files;
-      FdWithKeyRange* f;
-      bool is_last_key_in_file;
-      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
-                                      &is_last_key_in_file)) {
-        search_ended_ = !PrepareNextLevel();
-      } else {
-        if (is_last_key_in_file) {
-          // Since cmp_largest is 0, batch_iter_ still points to the last key
-          // that falls in this file, instead of the next one. Increment
-          // the file index for all keys between batch_iter_ and upper_key_
-          auto tmp_iter = batch_iter_;
-          while (tmp_iter != upper_key_) {
-            ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
-            ++tmp_iter;
-          }
-          maybe_repeat_key_ = true;
+  FdWithKeyRange* GetNextFileInLevel() {
+    if (batch_iter_ == current_level_range_.end() || search_ended_) {
+      return nullptr;
+    } else {
+      if (maybe_repeat_key_) {
+        maybe_repeat_key_ = false;
+        // Check if we found the final value for the last key in the
+        // previous lookup range. If we did, then there's no need to look
+        // any further for that key, so advance batch_iter_. Else, keep
+        // batch_iter_ positioned on that key so we look it up again in
+        // the next file
+        // For L0, always advance the key because we will look in the next
+        // file regardless for all keys not found yet
+        if (current_level_range_.CheckKeyDone(batch_iter_) ||
+            curr_level_ == 0) {
+          batch_iter_ = upper_key_;
        }
-        // Set the range for this file
-        current_file_range_ =
-            MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
-        returned_file_level_ = curr_level_;
-        hit_file_level_ = curr_level_;
-        is_hit_file_last_in_level_ =
-            curr_file_index == curr_file_level_->num_files - 1;
-        return f;
      }
+      // batch_iter_prev_ will become the start key for the next file
+      // lookup
+      batch_iter_prev_ = batch_iter_;
    }
 
-    // Search ended
-    return nullptr;
+    MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
+                                  current_level_range_.end());
+    size_t curr_file_index =
+        (batch_iter_ != current_level_range_.end())
+            ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+            : curr_file_level_->num_files;
+    FdWithKeyRange* f;
+    bool is_last_key_in_file;
+    if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
+                                    &is_last_key_in_file)) {
+      return nullptr;
+    } else {
+      if (is_last_key_in_file) {
+        // Since cmp_largest is 0, batch_iter_ still points to the last key
+        // that falls in this file, instead of the next one. Increment
+        // the file index for all keys between batch_iter_ and upper_key_
+        auto tmp_iter = batch_iter_;
+        while (tmp_iter != upper_key_) {
+          ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
+          ++tmp_iter;
+        }
+        maybe_repeat_key_ = true;
+      }
+      // Set the range for this file
+      current_file_range_ =
+          MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
+      returned_file_level_ = curr_level_;
+      hit_file_level_ = curr_level_;
+      is_hit_file_last_in_level_ =
+          curr_file_index == curr_file_level_->num_files - 1;
+      return f;
+    }
  }
 
   // getter for current file level
@@ -576,6 +571,10 @@ class FilePickerMultiGet {
   // GetNextFile()) is at the last index in its level.
   bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
 
+  bool KeyMaySpanNextFile() { return maybe_repeat_key_; }
+
+  bool IsSearchEnded() { return search_ended_; }
+
   const MultiGetRange& CurrentFileRange() { return current_file_range_; }
 
  private:
@@ -2138,6 +2137,147 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
   }
 }
 
+// Look up a batch of keys in a single SST file
+Status Version::MultiGetFromSST(
+    const ReadOptions& read_options, MultiGetRange file_range,
+    int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
+    std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
+    uint64_t& num_filter_read, uint64_t& num_index_read,
+    uint64_t& num_data_read, uint64_t& num_sst_read) {
+  bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
+                       get_perf_context()->per_level_perf_context_enabled;
+
+  Status s;
+  StopWatchNano timer(clock_, timer_enabled /* auto_start */);
+  s = table_cache_->MultiGet(
+      read_options, *internal_comparator(), *f->file_metadata, &file_range,
+      mutable_cf_options_.prefix_extractor,
+      cfd_->internal_stats()->GetFileReadHist(hit_file_level),
+      IsFilterSkipped(static_cast<int>(hit_file_level),
+                      is_hit_file_last_in_level),
+      hit_file_level);
+  // TODO: examine the behavior for corrupted key
+  if (timer_enabled) {
+    PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
+                              hit_file_level);
+  }
+  if (!s.ok()) {
+    // TODO: Set status for individual keys appropriately
+    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+      *iter->s = s;
+      file_range.MarkKeyDone(iter);
+    }
+    return s;
+  }
+  uint64_t batch_size = 0;
+  for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
+       ++iter) {
+    GetContext& get_context = *iter->get_context;
+    Status* status = iter->s;
+    // The Status in the KeyContext takes precedence over GetContext state
+    // Status may be an error if there were any IO errors in the table
+    // reader. We never expect Status to be NotFound(), as that is
+    // determined by get_context
+    assert(!status->IsNotFound());
+    if (!status->ok()) {
+      file_range.MarkKeyDone(iter);
+      continue;
+    }
+
+    if (get_context.sample()) {
+      sample_file_read_inc(f->file_metadata);
+    }
+    batch_size++;
+    num_index_read += get_context.get_context_stats_.num_index_read;
+    num_filter_read += get_context.get_context_stats_.num_filter_read;
+    num_data_read += get_context.get_context_stats_.num_data_read;
+    num_sst_read += get_context.get_context_stats_.num_sst_read;
+    // Reset these stats since they're specific to a level
+    get_context.get_context_stats_.num_index_read = 0;
+    get_context.get_context_stats_.num_filter_read = 0;
+    get_context.get_context_stats_.num_data_read = 0;
+    get_context.get_context_stats_.num_sst_read = 0;
+
+    // report the counters before returning
+    if (get_context.State() != GetContext::kNotFound &&
+        get_context.State() != GetContext::kMerge &&
+        db_statistics_ != nullptr) {
+      get_context.ReportCounters();
+    } else {
+      if (iter->max_covering_tombstone_seq > 0) {
+        // The remaining files we look at will only contain covered keys, so
+        // we stop here for this key
+        file_range.SkipKey(iter);
+      }
+    }
+    switch (get_context.State()) {
+      case GetContext::kNotFound:
+        // Keep searching in other files
+        break;
+      case GetContext::kMerge:
+        // TODO: update per-level perfcontext user_key_return_count for kMerge
+        break;
+      case GetContext::kFound:
+        if (hit_file_level == 0) {
+          RecordTick(db_statistics_, GET_HIT_L0);
+        } else if (hit_file_level == 1) {
+          RecordTick(db_statistics_, GET_HIT_L1);
+        } else if (hit_file_level >= 2) {
+          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
+        }
+
+        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);
+
+        file_range.MarkKeyDone(iter);
+
+        if (iter->is_blob_index) {
+          if (iter->value) {
+            TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
+                                     &(*iter));
+
+            const Slice& blob_index_slice = *(iter->value);
+            BlobIndex blob_index;
+            Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
+            if (tmp_s.ok()) {
+              const uint64_t blob_file_num = blob_index.file_number();
+              blob_rqs[blob_file_num].emplace_back(
+                  std::make_pair(blob_index, std::cref(*iter)));
+            } else {
+              *(iter->s) = tmp_s;
+            }
+          }
+        } else {
+          file_range.AddValueSize(iter->value->size());
+          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
+            s = Status::Aborted();
+            break;
+          }
+        }
+        continue;
+      case GetContext::kDeleted:
+        // Use empty error message for speed
+        *status = Status::NotFound();
+        file_range.MarkKeyDone(iter);
+        continue;
+      case GetContext::kCorrupt:
+        *status =
+            Status::Corruption("corrupted key for ", iter->lkey->user_key());
+        file_range.MarkKeyDone(iter);
+        continue;
+      case GetContext::kUnexpectedBlobIndex:
+        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
+        *status = Status::NotSupported(
+            "Encounter unexpected blob index. Please open DB with "
+            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
+        file_range.MarkKeyDone(iter);
+        continue;
+    }
+  }
+
+  RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
+  return s;
+}
+
 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                        ReadCallback* callback) {
   PinnedIteratorsManager pinned_iters_mgr;
@@ -2183,7 +2323,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       &file_picker_range,
       &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
       &storage_info_.file_indexer_, user_comparator(), internal_comparator());
-  FdWithKeyRange* f = fp.GetNextFile();
+  FdWithKeyRange* f = fp.GetNextFileInLevel();
   Status s;
   uint64_t num_index_read = 0;
   uint64_t num_filter_read = 0;
@@ -2193,14 +2333,9 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
   MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
   // blob_file => [[blob_idx, it], ...]
   std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
-  int level = -1;
-
-  while (f != nullptr) {
-    MultiGetRange file_range = fp.CurrentFileRange();
-    bool timer_enabled =
-        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
-        get_perf_context()->per_level_perf_context_enabled;
+  int level = fp.GetHitFileLevel();
 
+  while (!fp.IsSearchEnded()) {
     // Report MultiGet stats per level.
     if (level >= 0 && level != (int)fp.GetHitFileLevel()) {
       // Dump the stats if the search has moved to the next level and
@@ -2218,139 +2353,28 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       level = fp.GetHitFileLevel();
     }
 
-    StopWatchNano timer(clock_, timer_enabled /* auto_start */);
-    s = table_cache_->MultiGet(
-        read_options, *internal_comparator(), *f->file_metadata, &file_range,
-        mutable_cf_options_.prefix_extractor,
-        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
-        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
-                        fp.IsHitFileLastInLevel()),
-        fp.GetHitFileLevel());
-    // TODO: examine the behavior for corrupted key
-    if (timer_enabled) {
-      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
-                                fp.GetHitFileLevel());
-    }
-    if (!s.ok()) {
-      // TODO: Set status for individual keys appropriately
-      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
-        *iter->s = s;
-        file_range.MarkKeyDone(iter);
-      }
-      return;
-    }
-    uint64_t batch_size = 0;
-    for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
-         ++iter) {
-      GetContext& get_context = *iter->get_context;
-      Status* status = iter->s;
-      // The Status in the KeyContext takes precedence over GetContext state
-      // Status may be an error if there were any IO errors in the table
-      // reader. We never expect Status to be NotFound(), as that is
-      // determined by get_context
-      assert(!status->IsNotFound());
-      if (!status->ok()) {
-        file_range.MarkKeyDone(iter);
-        continue;
-      }
-
-      if (get_context.sample()) {
-        sample_file_read_inc(f->file_metadata);
-      }
-      batch_size++;
-      num_index_read += get_context.get_context_stats_.num_index_read;
-      num_filter_read += get_context.get_context_stats_.num_filter_read;
-      num_data_read += get_context.get_context_stats_.num_data_read;
-      num_sst_read += get_context.get_context_stats_.num_sst_read;
-      // Reset these stats since they're specific to a level
-      get_context.get_context_stats_.num_index_read = 0;
-      get_context.get_context_stats_.num_filter_read = 0;
-      get_context.get_context_stats_.num_data_read = 0;
-      get_context.get_context_stats_.num_sst_read = 0;
-
-      // report the counters before returning
-      if (get_context.State() != GetContext::kNotFound &&
-          get_context.State() != GetContext::kMerge &&
-          db_statistics_ != nullptr) {
-        get_context.ReportCounters();
-      } else {
-        if (iter->max_covering_tombstone_seq > 0) {
-          // The remaining files we look at will only contain covered keys, so
-          // we stop here for this key
-          file_picker_range.SkipKey(iter);
-        }
-      }
-      switch (get_context.State()) {
-        case GetContext::kNotFound:
-          // Keep searching in other files
-          break;
-        case GetContext::kMerge:
-          // TODO: update per-level perfcontext user_key_return_count for kMerge
-          break;
-        case GetContext::kFound:
-          if (fp.GetHitFileLevel() == 0) {
-            RecordTick(db_statistics_, GET_HIT_L0);
-          } else if (fp.GetHitFileLevel() == 1) {
-            RecordTick(db_statistics_, GET_HIT_L1);
-          } else if (fp.GetHitFileLevel() >= 2) {
-            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
-          }
-
-          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
-                                    fp.GetHitFileLevel());
-
-          file_range.MarkKeyDone(iter);
-
-          if (iter->is_blob_index) {
-            if (iter->value) {
-              TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
-                                       &(*iter));
-
-              const Slice& blob_index_slice = *(iter->value);
-              BlobIndex blob_index;
-              Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
-              if (tmp_s.ok()) {
-                const uint64_t blob_file_num = blob_index.file_number();
-                blob_rqs[blob_file_num].emplace_back(
-                    std::make_pair(blob_index, std::cref(*iter)));
-              } else {
-                *(iter->s) = tmp_s;
-              }
-            }
-          } else {
-            file_range.AddValueSize(iter->value->size());
-            if (file_range.GetValueSize() >
-                read_options.value_size_soft_limit) {
-              s = Status::Aborted();
-              break;
-            }
-          }
-          continue;
-        case GetContext::kDeleted:
-          // Use empty error message for speed
-          *status = Status::NotFound();
-          file_range.MarkKeyDone(iter);
-          continue;
-        case GetContext::kCorrupt:
-          *status =
-              Status::Corruption("corrupted key for ", iter->lkey->user_key());
-          file_range.MarkKeyDone(iter);
-          continue;
-        case GetContext::kUnexpectedBlobIndex:
-          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
-          *status = Status::NotSupported(
-              "Encounter unexpected blob index. Please open DB with "
-              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-          file_range.MarkKeyDone(iter);
-          continue;
+    if (f) {
+      // Call MultiGetFromSST to look up a batch of keys in a single file
+      s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
+                          fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f,
+                          blob_rqs, num_filter_read, num_index_read,
+                          num_data_read, num_sst_read);
+      if (s.ok()) {
+        f = fp.GetNextFileInLevel();
      }
    }
-
-    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
+    // If bad status or we found final result for all the keys
     if (!s.ok() || file_picker_range.empty()) {
       break;
     }
-    f = fp.GetNextFile();
+
+    if (!f) {
+      // Reached the end of this level. Prepare the next level
+      fp.PrepareNextLevelForSearch();
+      if (!fp.IsSearchEnded()) {
+        // It's possible there is no overlap on this level and f is nullptr
+        f = fp.GetNextFileInLevel();
+      }
+    }
   }
 
   // Dump stats for most recent level
diff --git a/db/version_set.h b/db/version_set.h
index 5afd1202f..8a646dfea 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -882,6 +882,13 @@ class Version {
   // This accumulated stats will be used in compaction.
   void UpdateAccumulatedStats();
 
+  Status MultiGetFromSST(
+      const ReadOptions& read_options, MultiGetRange file_range,
+      int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
+      std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
+      uint64_t& num_filter_read, uint64_t& num_index_read,
+      uint64_t& num_data_read, uint64_t& num_sst_read);
+
   ColumnFamilyData* cfd_;  // ColumnFamilyData to which this Version belongs
   Logger* info_log_;
   Statistics* db_statistics_;