Refactor Version::MultiGet code into smaller reusable functions
This commit is contained in:
parent
c4cd8e1acc
commit
4a206e228f
@ -504,68 +504,63 @@ class FilePickerMultiGet {
|
|||||||
return file_hit;
|
return file_hit;
|
||||||
}
|
}
|
||||||
|
|
||||||
FdWithKeyRange* GetNextFile() {
|
void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); }
|
||||||
while (!search_ended_) {
|
|
||||||
// Start searching next level.
|
|
||||||
if (batch_iter_ == current_level_range_.end()) {
|
|
||||||
search_ended_ = !PrepareNextLevel();
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
if (maybe_repeat_key_) {
|
|
||||||
maybe_repeat_key_ = false;
|
|
||||||
// Check if we found the final value for the last key in the
|
|
||||||
// previous lookup range. If we did, then there's no need to look
|
|
||||||
// any further for that key, so advance batch_iter_. Else, keep
|
|
||||||
// batch_iter_ positioned on that key so we look it up again in
|
|
||||||
// the next file
|
|
||||||
// For L0, always advance the key because we will look in the next
|
|
||||||
// file regardless for all keys not found yet
|
|
||||||
if (current_level_range_.CheckKeyDone(batch_iter_) ||
|
|
||||||
curr_level_ == 0) {
|
|
||||||
batch_iter_ = upper_key_;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// batch_iter_prev_ will become the start key for the next file
|
|
||||||
// lookup
|
|
||||||
batch_iter_prev_ = batch_iter_;
|
|
||||||
}
|
|
||||||
|
|
||||||
MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
|
FdWithKeyRange* GetNextFileInLevel() {
|
||||||
current_level_range_.end());
|
if (batch_iter_ == current_level_range_.end() || search_ended_) {
|
||||||
size_t curr_file_index =
|
return nullptr;
|
||||||
(batch_iter_ != current_level_range_.end())
|
} else {
|
||||||
? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
|
if (maybe_repeat_key_) {
|
||||||
: curr_file_level_->num_files;
|
maybe_repeat_key_ = false;
|
||||||
FdWithKeyRange* f;
|
// Check if we found the final value for the last key in the
|
||||||
bool is_last_key_in_file;
|
// previous lookup range. If we did, then there's no need to look
|
||||||
if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
|
// any further for that key, so advance batch_iter_. Else, keep
|
||||||
&is_last_key_in_file)) {
|
// batch_iter_ positioned on that key so we look it up again in
|
||||||
search_ended_ = !PrepareNextLevel();
|
// the next file
|
||||||
} else {
|
// For L0, always advance the key because we will look in the next
|
||||||
if (is_last_key_in_file) {
|
// file regardless for all keys not found yet
|
||||||
// Since cmp_largest is 0, batch_iter_ still points to the last key
|
if (current_level_range_.CheckKeyDone(batch_iter_) ||
|
||||||
// that falls in this file, instead of the next one. Increment
|
curr_level_ == 0) {
|
||||||
// the file index for all keys between batch_iter_ and upper_key_
|
batch_iter_ = upper_key_;
|
||||||
auto tmp_iter = batch_iter_;
|
|
||||||
while (tmp_iter != upper_key_) {
|
|
||||||
++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
|
|
||||||
++tmp_iter;
|
|
||||||
}
|
|
||||||
maybe_repeat_key_ = true;
|
|
||||||
}
|
}
|
||||||
// Set the range for this file
|
|
||||||
current_file_range_ =
|
|
||||||
MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
|
|
||||||
returned_file_level_ = curr_level_;
|
|
||||||
hit_file_level_ = curr_level_;
|
|
||||||
is_hit_file_last_in_level_ =
|
|
||||||
curr_file_index == curr_file_level_->num_files - 1;
|
|
||||||
return f;
|
|
||||||
}
|
}
|
||||||
|
// batch_iter_prev_ will become the start key for the next file
|
||||||
|
// lookup
|
||||||
|
batch_iter_prev_ = batch_iter_;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Search ended
|
MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
|
||||||
return nullptr;
|
current_level_range_.end());
|
||||||
|
size_t curr_file_index =
|
||||||
|
(batch_iter_ != current_level_range_.end())
|
||||||
|
? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
|
||||||
|
: curr_file_level_->num_files;
|
||||||
|
FdWithKeyRange* f;
|
||||||
|
bool is_last_key_in_file;
|
||||||
|
if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
|
||||||
|
&is_last_key_in_file)) {
|
||||||
|
return nullptr;
|
||||||
|
} else {
|
||||||
|
if (is_last_key_in_file) {
|
||||||
|
// Since cmp_largest is 0, batch_iter_ still points to the last key
|
||||||
|
// that falls in this file, instead of the next one. Increment
|
||||||
|
// the file index for all keys between batch_iter_ and upper_key_
|
||||||
|
auto tmp_iter = batch_iter_;
|
||||||
|
while (tmp_iter != upper_key_) {
|
||||||
|
++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
|
||||||
|
++tmp_iter;
|
||||||
|
}
|
||||||
|
maybe_repeat_key_ = true;
|
||||||
|
}
|
||||||
|
// Set the range for this file
|
||||||
|
current_file_range_ =
|
||||||
|
MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
|
||||||
|
returned_file_level_ = curr_level_;
|
||||||
|
hit_file_level_ = curr_level_;
|
||||||
|
is_hit_file_last_in_level_ =
|
||||||
|
curr_file_index == curr_file_level_->num_files - 1;
|
||||||
|
return f;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getter for current file level
|
// getter for current file level
|
||||||
@ -576,6 +571,10 @@ class FilePickerMultiGet {
|
|||||||
// GetNextFile()) is at the last index in its level.
|
// GetNextFile()) is at the last index in its level.
|
||||||
bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
|
bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
|
||||||
|
|
||||||
|
bool KeyMaySpanNextFile() { return maybe_repeat_key_; }
|
||||||
|
|
||||||
|
bool IsSearchEnded() { return search_ended_; }
|
||||||
|
|
||||||
const MultiGetRange& CurrentFileRange() { return current_file_range_; }
|
const MultiGetRange& CurrentFileRange() { return current_file_range_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -2138,6 +2137,147 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Lookup a batch of keys in a single SST file
|
||||||
|
Status Version::MultiGetFromSST(
|
||||||
|
const ReadOptions& read_options, MultiGetRange file_range,
|
||||||
|
int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
|
||||||
|
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
|
||||||
|
uint64_t& num_filter_read, uint64_t& num_index_read,
|
||||||
|
uint64_t& num_data_read, uint64_t& num_sst_read) {
|
||||||
|
bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
|
||||||
|
get_perf_context()->per_level_perf_context_enabled;
|
||||||
|
|
||||||
|
Status s;
|
||||||
|
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
|
||||||
|
s = table_cache_->MultiGet(
|
||||||
|
read_options, *internal_comparator(), *f->file_metadata, &file_range,
|
||||||
|
mutable_cf_options_.prefix_extractor,
|
||||||
|
cfd_->internal_stats()->GetFileReadHist(hit_file_level),
|
||||||
|
IsFilterSkipped(static_cast<int>(hit_file_level),
|
||||||
|
is_hit_file_last_in_level),
|
||||||
|
hit_file_level);
|
||||||
|
// TODO: examine the behavior for corrupted key
|
||||||
|
if (timer_enabled) {
|
||||||
|
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
|
||||||
|
hit_file_level);
|
||||||
|
}
|
||||||
|
if (!s.ok()) {
|
||||||
|
// TODO: Set status for individual keys appropriately
|
||||||
|
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
|
||||||
|
*iter->s = s;
|
||||||
|
file_range.MarkKeyDone(iter);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
uint64_t batch_size = 0;
|
||||||
|
for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
|
||||||
|
++iter) {
|
||||||
|
GetContext& get_context = *iter->get_context;
|
||||||
|
Status* status = iter->s;
|
||||||
|
// The Status in the KeyContext takes precedence over GetContext state
|
||||||
|
// Status may be an error if there were any IO errors in the table
|
||||||
|
// reader. We never expect Status to be NotFound(), as that is
|
||||||
|
// determined by get_context
|
||||||
|
assert(!status->IsNotFound());
|
||||||
|
if (!status->ok()) {
|
||||||
|
file_range.MarkKeyDone(iter);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (get_context.sample()) {
|
||||||
|
sample_file_read_inc(f->file_metadata);
|
||||||
|
}
|
||||||
|
batch_size++;
|
||||||
|
num_index_read += get_context.get_context_stats_.num_index_read;
|
||||||
|
num_filter_read += get_context.get_context_stats_.num_filter_read;
|
||||||
|
num_data_read += get_context.get_context_stats_.num_data_read;
|
||||||
|
num_sst_read += get_context.get_context_stats_.num_sst_read;
|
||||||
|
// Reset these stats since they're specific to a level
|
||||||
|
get_context.get_context_stats_.num_index_read = 0;
|
||||||
|
get_context.get_context_stats_.num_filter_read = 0;
|
||||||
|
get_context.get_context_stats_.num_data_read = 0;
|
||||||
|
get_context.get_context_stats_.num_sst_read = 0;
|
||||||
|
|
||||||
|
// report the counters before returning
|
||||||
|
if (get_context.State() != GetContext::kNotFound &&
|
||||||
|
get_context.State() != GetContext::kMerge &&
|
||||||
|
db_statistics_ != nullptr) {
|
||||||
|
get_context.ReportCounters();
|
||||||
|
} else {
|
||||||
|
if (iter->max_covering_tombstone_seq > 0) {
|
||||||
|
// The remaining files we look at will only contain covered keys, so
|
||||||
|
// we stop here for this key
|
||||||
|
file_range.SkipKey(iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
switch (get_context.State()) {
|
||||||
|
case GetContext::kNotFound:
|
||||||
|
// Keep searching in other files
|
||||||
|
break;
|
||||||
|
case GetContext::kMerge:
|
||||||
|
// TODO: update per-level perfcontext user_key_return_count for kMerge
|
||||||
|
break;
|
||||||
|
case GetContext::kFound:
|
||||||
|
if (hit_file_level == 0) {
|
||||||
|
RecordTick(db_statistics_, GET_HIT_L0);
|
||||||
|
} else if (hit_file_level == 1) {
|
||||||
|
RecordTick(db_statistics_, GET_HIT_L1);
|
||||||
|
} else if (hit_file_level >= 2) {
|
||||||
|
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
|
||||||
|
}
|
||||||
|
|
||||||
|
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);
|
||||||
|
|
||||||
|
file_range.MarkKeyDone(iter);
|
||||||
|
|
||||||
|
if (iter->is_blob_index) {
|
||||||
|
if (iter->value) {
|
||||||
|
TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
|
||||||
|
&(*iter));
|
||||||
|
|
||||||
|
const Slice& blob_index_slice = *(iter->value);
|
||||||
|
BlobIndex blob_index;
|
||||||
|
Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
|
||||||
|
if (tmp_s.ok()) {
|
||||||
|
const uint64_t blob_file_num = blob_index.file_number();
|
||||||
|
blob_rqs[blob_file_num].emplace_back(
|
||||||
|
std::make_pair(blob_index, std::cref(*iter)));
|
||||||
|
} else {
|
||||||
|
*(iter->s) = tmp_s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
file_range.AddValueSize(iter->value->size());
|
||||||
|
if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
|
||||||
|
s = Status::Aborted();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
case GetContext::kDeleted:
|
||||||
|
// Use empty error message for speed
|
||||||
|
*status = Status::NotFound();
|
||||||
|
file_range.MarkKeyDone(iter);
|
||||||
|
continue;
|
||||||
|
case GetContext::kCorrupt:
|
||||||
|
*status =
|
||||||
|
Status::Corruption("corrupted key for ", iter->lkey->user_key());
|
||||||
|
file_range.MarkKeyDone(iter);
|
||||||
|
continue;
|
||||||
|
case GetContext::kUnexpectedBlobIndex:
|
||||||
|
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
|
||||||
|
*status = Status::NotSupported(
|
||||||
|
"Encounter unexpected blob index. Please open DB with "
|
||||||
|
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
|
||||||
|
file_range.MarkKeyDone(iter);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
||||||
ReadCallback* callback) {
|
ReadCallback* callback) {
|
||||||
PinnedIteratorsManager pinned_iters_mgr;
|
PinnedIteratorsManager pinned_iters_mgr;
|
||||||
@ -2183,7 +2323,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
|||||||
&file_picker_range,
|
&file_picker_range,
|
||||||
&storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
|
&storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
|
||||||
&storage_info_.file_indexer_, user_comparator(), internal_comparator());
|
&storage_info_.file_indexer_, user_comparator(), internal_comparator());
|
||||||
FdWithKeyRange* f = fp.GetNextFile();
|
FdWithKeyRange* f = fp.GetNextFileInLevel();
|
||||||
Status s;
|
Status s;
|
||||||
uint64_t num_index_read = 0;
|
uint64_t num_index_read = 0;
|
||||||
uint64_t num_filter_read = 0;
|
uint64_t num_filter_read = 0;
|
||||||
@ -2193,14 +2333,9 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
|||||||
MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
|
MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
|
||||||
// blob_file => [[blob_idx, it], ...]
|
// blob_file => [[blob_idx, it], ...]
|
||||||
std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
|
std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
|
||||||
int level = -1;
|
int level = fp.GetHitFileLevel();
|
||||||
|
|
||||||
while (f != nullptr) {
|
|
||||||
MultiGetRange file_range = fp.CurrentFileRange();
|
|
||||||
bool timer_enabled =
|
|
||||||
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
|
|
||||||
get_perf_context()->per_level_perf_context_enabled;
|
|
||||||
|
|
||||||
|
while (!fp.IsSearchEnded()) {
|
||||||
// Report MultiGet stats per level.
|
// Report MultiGet stats per level.
|
||||||
if (level >= 0 && level != (int)fp.GetHitFileLevel()) {
|
if (level >= 0 && level != (int)fp.GetHitFileLevel()) {
|
||||||
// Dump the stats if the search has moved to the next level and
|
// Dump the stats if the search has moved to the next level and
|
||||||
@ -2218,139 +2353,28 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
|
|||||||
level = fp.GetHitFileLevel();
|
level = fp.GetHitFileLevel();
|
||||||
}
|
}
|
||||||
|
|
||||||
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
|
if (f) {
|
||||||
s = table_cache_->MultiGet(
|
// Call MultiGetFromSST for looking up a single file
|
||||||
read_options, *internal_comparator(), *f->file_metadata, &file_range,
|
s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
|
||||||
mutable_cf_options_.prefix_extractor,
|
fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f,
|
||||||
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
|
blob_rqs, num_filter_read, num_index_read,
|
||||||
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
|
num_data_read, num_sst_read);
|
||||||
fp.IsHitFileLastInLevel()),
|
if (s.ok()) {
|
||||||
fp.GetHitFileLevel());
|
f = fp.GetNextFileInLevel();
|
||||||
// TODO: examine the behavior for corrupted key
|
|
||||||
if (timer_enabled) {
|
|
||||||
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
|
|
||||||
fp.GetHitFileLevel());
|
|
||||||
}
|
|
||||||
if (!s.ok()) {
|
|
||||||
// TODO: Set status for individual keys appropriately
|
|
||||||
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
|
|
||||||
*iter->s = s;
|
|
||||||
file_range.MarkKeyDone(iter);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
uint64_t batch_size = 0;
|
|
||||||
for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
|
|
||||||
++iter) {
|
|
||||||
GetContext& get_context = *iter->get_context;
|
|
||||||
Status* status = iter->s;
|
|
||||||
// The Status in the KeyContext takes precedence over GetContext state
|
|
||||||
// Status may be an error if there were any IO errors in the table
|
|
||||||
// reader. We never expect Status to be NotFound(), as that is
|
|
||||||
// determined by get_context
|
|
||||||
assert(!status->IsNotFound());
|
|
||||||
if (!status->ok()) {
|
|
||||||
file_range.MarkKeyDone(iter);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (get_context.sample()) {
|
|
||||||
sample_file_read_inc(f->file_metadata);
|
|
||||||
}
|
|
||||||
batch_size++;
|
|
||||||
num_index_read += get_context.get_context_stats_.num_index_read;
|
|
||||||
num_filter_read += get_context.get_context_stats_.num_filter_read;
|
|
||||||
num_data_read += get_context.get_context_stats_.num_data_read;
|
|
||||||
num_sst_read += get_context.get_context_stats_.num_sst_read;
|
|
||||||
// Reset these stats since they're specific to a level
|
|
||||||
get_context.get_context_stats_.num_index_read = 0;
|
|
||||||
get_context.get_context_stats_.num_filter_read = 0;
|
|
||||||
get_context.get_context_stats_.num_data_read = 0;
|
|
||||||
get_context.get_context_stats_.num_sst_read = 0;
|
|
||||||
|
|
||||||
// report the counters before returning
|
|
||||||
if (get_context.State() != GetContext::kNotFound &&
|
|
||||||
get_context.State() != GetContext::kMerge &&
|
|
||||||
db_statistics_ != nullptr) {
|
|
||||||
get_context.ReportCounters();
|
|
||||||
} else {
|
|
||||||
if (iter->max_covering_tombstone_seq > 0) {
|
|
||||||
// The remaining files we look at will only contain covered keys, so
|
|
||||||
// we stop here for this key
|
|
||||||
file_picker_range.SkipKey(iter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
switch (get_context.State()) {
|
|
||||||
case GetContext::kNotFound:
|
|
||||||
// Keep searching in other files
|
|
||||||
break;
|
|
||||||
case GetContext::kMerge:
|
|
||||||
// TODO: update per-level perfcontext user_key_return_count for kMerge
|
|
||||||
break;
|
|
||||||
case GetContext::kFound:
|
|
||||||
if (fp.GetHitFileLevel() == 0) {
|
|
||||||
RecordTick(db_statistics_, GET_HIT_L0);
|
|
||||||
} else if (fp.GetHitFileLevel() == 1) {
|
|
||||||
RecordTick(db_statistics_, GET_HIT_L1);
|
|
||||||
} else if (fp.GetHitFileLevel() >= 2) {
|
|
||||||
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
|
|
||||||
}
|
|
||||||
|
|
||||||
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
|
|
||||||
fp.GetHitFileLevel());
|
|
||||||
|
|
||||||
file_range.MarkKeyDone(iter);
|
|
||||||
|
|
||||||
if (iter->is_blob_index) {
|
|
||||||
if (iter->value) {
|
|
||||||
TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
|
|
||||||
&(*iter));
|
|
||||||
|
|
||||||
const Slice& blob_index_slice = *(iter->value);
|
|
||||||
BlobIndex blob_index;
|
|
||||||
Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
|
|
||||||
if (tmp_s.ok()) {
|
|
||||||
const uint64_t blob_file_num = blob_index.file_number();
|
|
||||||
blob_rqs[blob_file_num].emplace_back(
|
|
||||||
std::make_pair(blob_index, std::cref(*iter)));
|
|
||||||
} else {
|
|
||||||
*(iter->s) = tmp_s;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
file_range.AddValueSize(iter->value->size());
|
|
||||||
if (file_range.GetValueSize() >
|
|
||||||
read_options.value_size_soft_limit) {
|
|
||||||
s = Status::Aborted();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
case GetContext::kDeleted:
|
|
||||||
// Use empty error message for speed
|
|
||||||
*status = Status::NotFound();
|
|
||||||
file_range.MarkKeyDone(iter);
|
|
||||||
continue;
|
|
||||||
case GetContext::kCorrupt:
|
|
||||||
*status =
|
|
||||||
Status::Corruption("corrupted key for ", iter->lkey->user_key());
|
|
||||||
file_range.MarkKeyDone(iter);
|
|
||||||
continue;
|
|
||||||
case GetContext::kUnexpectedBlobIndex:
|
|
||||||
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
|
|
||||||
*status = Status::NotSupported(
|
|
||||||
"Encounter unexpected blob index. Please open DB with "
|
|
||||||
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
|
|
||||||
file_range.MarkKeyDone(iter);
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// If bad status or we found final result for all the keys
|
||||||
RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
|
|
||||||
if (!s.ok() || file_picker_range.empty()) {
|
if (!s.ok() || file_picker_range.empty()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
f = fp.GetNextFile();
|
if (!f) {
|
||||||
|
// Reached the end of this level. Prepare the next level
|
||||||
|
fp.PrepareNextLevelForSearch();
|
||||||
|
if (!fp.IsSearchEnded()) {
|
||||||
|
// Its possible there is no overlap on this level and f is nullptr
|
||||||
|
f = fp.GetNextFileInLevel();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dump stats for most recent level
|
// Dump stats for most recent level
|
||||||
|
@ -882,6 +882,13 @@ class Version {
|
|||||||
// This accumulated stats will be used in compaction.
|
// This accumulated stats will be used in compaction.
|
||||||
void UpdateAccumulatedStats();
|
void UpdateAccumulatedStats();
|
||||||
|
|
||||||
|
Status MultiGetFromSST(
|
||||||
|
const ReadOptions& read_options, MultiGetRange file_range,
|
||||||
|
int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
|
||||||
|
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
|
||||||
|
uint64_t& num_filter_read, uint64_t& num_index_read,
|
||||||
|
uint64_t& num_data_read, uint64_t& num_sst_read);
|
||||||
|
|
||||||
ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs
|
ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs
|
||||||
Logger* info_log_;
|
Logger* info_log_;
|
||||||
Statistics* db_statistics_;
|
Statistics* db_statistics_;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user