This commit is contained in:
bowang 2022-05-17 12:19:18 -07:00
parent 03abd4582d
commit 488fff9b64
7 changed files with 26 additions and 23 deletions

View File

@ -1988,14 +1988,12 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
tracing_get_id = vset_->block_cache_tracer_->NextGetId();
}
ReadOptions read_opts = read_options;
read_opts.rate_limiter_priority = Env::IO_USER;
// Note: the old StackableDB-based BlobDB passes in
// GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
// need to provide it here.
bool is_blob_index = false;
bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
BlobFetcher blob_fetcher(this, read_opts);
BlobFetcher blob_fetcher(this, read_options);
assert(pinned_iters_mgr);
GetContext get_context(
@ -2032,7 +2030,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
*status = table_cache_->Get(
read_opts, *internal_comparator(), *f->file_metadata, ikey,
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
@ -2083,7 +2081,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;
*status = GetBlob(read_opts, user_key, *value, prefetch_buffer,
*status = GetBlob(read_options, user_key, *value, prefetch_buffer,
value, bytes_read);
if (!status->ok()) {
if (status->IsIncomplete()) {
@ -2157,13 +2155,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
}
ReadOptions read_opts = read_options;
read_opts.rate_limiter_priority = Env::IO_USER;
// Even though we know the batch size won't be > MAX_BATCH_SIZE,
// use autovector in order to avoid unnecessary construction of GetContext
// objects, which is expensive
autovector<GetContext, 16> get_ctx;
BlobFetcher blob_fetcher(this, read_opts);
BlobFetcher blob_fetcher(this, read_options);
for (auto iter = range->begin(); iter != range->end(); ++iter) {
assert(iter->s->ok() || iter->s->IsMergeInProgress());
get_ctx.emplace_back(
@ -2227,7 +2223,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
s = table_cache_->MultiGet(
read_opts, *internal_comparator(), *f->file_metadata, &file_range,
read_options, *internal_comparator(), *f->file_metadata, &file_range,
mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
@ -2326,7 +2322,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
}
} else {
file_range.AddValueSize(iter->value->size());
if (file_range.GetValueSize() > read_opts.value_size_soft_limit) {
if (file_range.GetValueSize() >
read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
}
@ -2367,7 +2364,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
if (s.ok() && !blob_rqs.empty()) {
MultiGetBlob(read_opts, keys_with_blobs_range, blob_rqs);
MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
}
// Process any left over keys
@ -2398,7 +2395,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
range->MarkKeyDone(iter);
if (range->GetValueSize() > read_opts.value_size_soft_limit) {
if (range->GetValueSize() > read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
}

View File

@ -46,8 +46,9 @@ InternalIteratorBase<IndexValue>* BinarySearchIndexReader::NewIterator(
const BlockBasedTable::Rep* rep = table()->get_rep();
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s = GetOrReadIndexBlock(no_io, read_options, get_context,
lookup_context, &index_block);
const Status s =
GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);

View File

@ -19,6 +19,8 @@ class BlockPrefetcher {
readahead_size_(initial_auto_readahead_size),
initial_auto_readahead_size_(initial_auto_readahead_size) {}
// Only three fields of read_options are used: async_io, readahead_size,
// and rate_limiter_priority.
void PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle,
const ReadOptions& read_options,

View File

@ -116,8 +116,9 @@ InternalIteratorBase<IndexValue>* HashIndexReader::NewIterator(
const BlockBasedTable::Rep* rep = table()->get_rep();
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s = GetOrReadIndexBlock(no_io, read_options, get_context,
lookup_context, &index_block);
const Status s =
GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);

View File

@ -33,7 +33,7 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
}
Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
bool no_io, const ReadOptions& read_options, GetContext* get_context,
bool no_io, Env::IOPriority rate_limiter_priority, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const {
assert(index_block != nullptr);
@ -43,12 +43,13 @@ Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
return Status::OK();
}
ReadOptions read_opts = read_options;
ReadOptions read_options;
read_options.rate_limiter_priority = rate_limiter_priority;
if (no_io) {
read_opts.read_tier = kBlockCacheTier;
read_options.read_tier = kBlockCacheTier;
}
return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_opts,
return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options,
cache_index_blocks(), get_context, lookup_context,
index_block);
}

View File

@ -66,7 +66,7 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
}
Status GetOrReadIndexBlock(bool no_io, const ReadOptions& read_options,
Status GetOrReadIndexBlock(bool no_io, Env::IOPriority rate_limiter_priority,
GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const;

View File

@ -48,8 +48,9 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
BlockCacheLookupContext* lookup_context) {
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s = GetOrReadIndexBlock(no_io, read_options, get_context,
lookup_context, &index_block);
const Status s =
GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);