Move some MultiGet code to new source files
Summary: Move MultiGet functions to new source files that are included by the original files.
parent 4a206e228f
commit 1ee57c5f28
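The pattern, as the hunks below show, is to move the MultiGet-related function bodies into new *_coro.h headers (db/table_cache_coro.h, db/version_set_coro.h, table/block_based/block_based_table_reader_coro.h) and have the original .cc files #include them between clang-format off/on markers, so the definitions still live in the same translation units. A minimal sketch of that include-the-definitions pattern, using hypothetical file and class names (widget.h, widget_multiget.h, Widget::MultiLookup) that are illustrations only, not part of this commit:

// widget.h (hypothetical): ordinary declaration of the class.
#pragma once
#include <vector>
namespace example {
struct Widget {
  int MultiLookup(const std::vector<int>& keys);
};
}  // namespace example

// widget_multiget.h (hypothetical new header): holds the moved definition.
#pragma once
namespace example {
int Widget::MultiLookup(const std::vector<int>& keys) {
  // Body moved here verbatim; compiled once because only widget.cc includes it.
  return static_cast<int>(keys.size());
}
}  // namespace example

// widget.cc (hypothetical original source file): pulls the definition back in.
#include "widget.h"
// clang-format off
#include "widget_multiget.h"
// clang-format on

Because exactly one .cc file includes the new header, the moved functions are still compiled once, and the original files shrink without changing behavior.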
@@ -31,16 +31,14 @@
#include "util/coding.h"
#include "util/stop_watch.h"

// clang-format off
#include "db/table_cache_coro.h"
// clang-format on

namespace ROCKSDB_NAMESPACE {

namespace {

template <class T>
static void DeleteEntry(const Slice& /*key*/, void* value) {
  T* typed_value = reinterpret_cast<T*>(value);
  delete typed_value;
}

static void UnrefEntry(void* arg1, void* arg2) {
  Cache* cache = reinterpret_cast<Cache*>(arg1);
  Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
@@ -484,131 +482,6 @@ Status TableCache::Get(
  return s;
}

// Batched version of TableCache::MultiGet.
Status TableCache::MultiGet(
    const ReadOptions& options,
    const InternalKeyComparator& internal_comparator,
    const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
    const std::shared_ptr<const SliceTransform>& prefix_extractor,
    HistogramImpl* file_read_hist, bool skip_filters, int level) {
  auto& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  MultiGetRange table_range(*mget_range, mget_range->begin(),
                            mget_range->end());
#ifndef ROCKSDB_LITE
  autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
  IterKey row_cache_key;
  size_t row_cache_key_prefix_size = 0;
  KeyContext& first_key = *table_range.begin();
  bool lookup_row_cache =
      ioptions_.row_cache && !first_key.get_context->NeedToReadSequence();

  // Check row cache if enabled. Since row cache does not currently store
  // sequence numbers, we cannot use it if we need to fetch the sequence.
  if (lookup_row_cache) {
    GetContext* first_context = first_key.get_context;
    CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
                            row_cache_key);
    row_cache_key_prefix_size = row_cache_key.Size();

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      const Slice& user_key = miter->ukey_with_ts;

      GetContext* get_context = miter->get_context;

      if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
                          get_context)) {
        table_range.SkipKey(miter);
      } else {
        row_cache_entries.emplace_back();
        get_context->SetReplayLog(&(row_cache_entries.back()));
      }
    }
  }
#endif // ROCKSDB_LITE

  // Check that table_range is not empty. Its possible all keys may have been
  // found in the row cache and thus the range may now be empty
  if (s.ok() && !table_range.empty()) {
    if (t == nullptr) {
      s = FindTable(options, file_options_, internal_comparator, fd, &handle,
                    prefix_extractor,
                    options.read_tier == kBlockCacheTier /* no_io */,
                    true /* record_read_stats */, file_read_hist, skip_filters,
                    level, true /* prefetch_index_and_filter_in_cache */,
                    0 /*max_file_size_for_l0_meta_pin*/, file_meta.temperature);
      TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
        assert(t);
      }
    }
    if (s.ok() && !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        for (auto iter = table_range.begin(); iter != table_range.end();
             ++iter) {
          SequenceNumber* max_covering_tombstone_seq =
              iter->get_context->max_covering_tombstone_seq();
          *max_covering_tombstone_seq = std::max(
              *max_covering_tombstone_seq,
              range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
        }
      }
    }
    if (s.ok()) {
      t->MultiGet(options, &table_range, prefix_extractor.get(), skip_filters);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
        Status* status = iter->s;
        if (status->IsIncomplete()) {
          // Couldn't find Table in cache but treat as kFound if no_io set
          iter->get_context->MarkKeyMayExist();
          s = Status::OK();
        }
      }
    }
  }

#ifndef ROCKSDB_LITE
  if (lookup_row_cache) {
    size_t row_idx = 0;

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      std::string& row_cache_entry = row_cache_entries[row_idx++];
      const Slice& user_key = miter->ukey_with_ts;
      ;
      GetContext* get_context = miter->get_context;

      get_context->SetReplayLog(nullptr);
      // Compute row cache key.
      row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
                               user_key.size());
      // Put the replay log in row cache only if something was found.
      if (s.ok() && !row_cache_entry.empty()) {
        size_t charge = row_cache_entry.capacity() + sizeof(std::string);
        void* row_ptr = new std::string(std::move(row_cache_entry));
        // If row cache is full, it's OK.
        ioptions_.row_cache
            ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                     &DeleteEntry<std::string>)
            .PermitUncheckedError();
      }
    }
  }
#endif // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

Status TableCache::GetTableProperties(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
143
db/table_cache_coro.h
Normal file
@@ -0,0 +1,143 @@
// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

namespace ROCKSDB_NAMESPACE {

namespace {

template <class T>
static void DeleteEntry(const Slice& /*key*/, void* value) {
  T* typed_value = reinterpret_cast<T*>(value);
  delete typed_value;
}
}

// Batched version of TableCache::MultiGet.
Status TableCache::MultiGet(
    const ReadOptions& options,
    const InternalKeyComparator& internal_comparator,
    const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
    const std::shared_ptr<const SliceTransform>& prefix_extractor,
    HistogramImpl* file_read_hist, bool skip_filters, int level) {
  auto& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  MultiGetRange table_range(*mget_range, mget_range->begin(),
                            mget_range->end());
#ifndef ROCKSDB_LITE
  autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
  IterKey row_cache_key;
  size_t row_cache_key_prefix_size = 0;
  KeyContext& first_key = *table_range.begin();
  bool lookup_row_cache =
      ioptions_.row_cache && !first_key.get_context->NeedToReadSequence();

  // Check row cache if enabled. Since row cache does not currently store
  // sequence numbers, we cannot use it if we need to fetch the sequence.
  if (lookup_row_cache) {
    GetContext* first_context = first_key.get_context;
    CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
                            row_cache_key);
    row_cache_key_prefix_size = row_cache_key.Size();

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      const Slice& user_key = miter->ukey_with_ts;

      GetContext* get_context = miter->get_context;

      if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
                          get_context)) {
        table_range.SkipKey(miter);
      } else {
        row_cache_entries.emplace_back();
        get_context->SetReplayLog(&(row_cache_entries.back()));
      }
    }
  }
#endif // ROCKSDB_LITE

  // Check that table_range is not empty. Its possible all keys may have been
  // found in the row cache and thus the range may now be empty
  if (s.ok() && !table_range.empty()) {
    if (t == nullptr) {
      s = FindTable(options, file_options_, internal_comparator, fd, &handle,
                    prefix_extractor,
                    options.read_tier == kBlockCacheTier /* no_io */,
                    true /* record_read_stats */, file_read_hist, skip_filters,
                    level, true /* prefetch_index_and_filter_in_cache */,
                    0 /*max_file_size_for_l0_meta_pin*/, file_meta.temperature);
      TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
        assert(t);
      }
    }
    if (s.ok() && !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        for (auto iter = table_range.begin(); iter != table_range.end();
             ++iter) {
          SequenceNumber* max_covering_tombstone_seq =
              iter->get_context->max_covering_tombstone_seq();
          *max_covering_tombstone_seq = std::max(
              *max_covering_tombstone_seq,
              range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
        }
      }
    }
    if (s.ok()) {
      t->MultiGet(options, &table_range, prefix_extractor.get(), skip_filters);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
        Status* status = iter->s;
        if (status->IsIncomplete()) {
          // Couldn't find Table in cache but treat as kFound if no_io set
          iter->get_context->MarkKeyMayExist();
          s = Status::OK();
        }
      }
    }
  }

#ifndef ROCKSDB_LITE
  if (lookup_row_cache) {
    size_t row_idx = 0;

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      std::string& row_cache_entry = row_cache_entries[row_idx++];
      const Slice& user_key = miter->ukey_with_ts;
      ;
      GetContext* get_context = miter->get_context;

      get_context->SetReplayLog(nullptr);
      // Compute row cache key.
      row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
                               user_key.size());
      // Put the replay log in row cache only if something was found.
      if (s.ok() && !row_cache_entry.empty()) {
        size_t charge = row_cache_entry.capacity() + sizeof(std::string);
        void* row_ptr = new std::string(std::move(row_cache_entry));
        // If row cache is full, it's OK.
        ioptions_.row_cache
            ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                     &DeleteEntry<std::string>)
            .PermitUncheckedError();
      }
    }
  }
#endif // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}
} // ROCKSDB_NAMESPACE
@@ -66,6 +66,10 @@
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

// clang-format off
#include "db/version_set_coro.h"
// clang-format on

namespace ROCKSDB_NAMESPACE {

namespace {
@@ -2137,147 +2141,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
  }
}

// Lookup a batch of keys in a single SST file
Status Version::MultiGetFromSST(
    const ReadOptions& read_options, MultiGetRange file_range,
    int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
    std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
    uint64_t& num_filter_read, uint64_t& num_index_read,
    uint64_t& num_data_read, uint64_t& num_sst_read) {
  bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
                       get_perf_context()->per_level_perf_context_enabled;

  Status s;
  StopWatchNano timer(clock_, timer_enabled /* auto_start */);
  s = table_cache_->MultiGet(
      read_options, *internal_comparator(), *f->file_metadata, &file_range,
      mutable_cf_options_.prefix_extractor,
      cfd_->internal_stats()->GetFileReadHist(hit_file_level),
      IsFilterSkipped(static_cast<int>(hit_file_level),
                      is_hit_file_last_in_level),
      hit_file_level);
  // TODO: examine the behavior for corrupted key
  if (timer_enabled) {
    PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                              hit_file_level);
  }
  if (!s.ok()) {
    // TODO: Set status for individual keys appropriately
    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
      *iter->s = s;
      file_range.MarkKeyDone(iter);
    }
    return s;
  }
  uint64_t batch_size = 0;
  for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
       ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    // The Status in the KeyContext takes precedence over GetContext state
    // Status may be an error if there were any IO errors in the table
    // reader. We never expect Status to be NotFound(), as that is
    // determined by get_context
    assert(!status->IsNotFound());
    if (!status->ok()) {
      file_range.MarkKeyDone(iter);
      continue;
    }

    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }
    batch_size++;
    num_index_read += get_context.get_context_stats_.num_index_read;
    num_filter_read += get_context.get_context_stats_.num_filter_read;
    num_data_read += get_context.get_context_stats_.num_data_read;
    num_sst_read += get_context.get_context_stats_.num_sst_read;
    // Reset these stats since they're specific to a level
    get_context.get_context_stats_.num_index_read = 0;
    get_context.get_context_stats_.num_filter_read = 0;
    get_context.get_context_stats_.num_data_read = 0;
    get_context.get_context_stats_.num_sst_read = 0;

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    } else {
      if (iter->max_covering_tombstone_seq > 0) {
        // The remaining files we look at will only contain covered keys, so
        // we stop here for this key
        file_range.SkipKey(iter);
      }
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (hit_file_level == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (hit_file_level == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (hit_file_level >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }

        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);

        file_range.MarkKeyDone(iter);

        if (iter->is_blob_index) {
          if (iter->value) {
            TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
                                     &(*iter));

            const Slice& blob_index_slice = *(iter->value);
            BlobIndex blob_index;
            Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
            if (tmp_s.ok()) {
              const uint64_t blob_file_num = blob_index.file_number();
              blob_rqs[blob_file_num].emplace_back(
                  std::make_pair(blob_index, std::cref(*iter)));
            } else {
              *(iter->s) = tmp_s;
            }
          }
        } else {
          file_range.AddValueSize(iter->value->size());
          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
            s = Status::Aborted();
            break;
          }
        }
        continue;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        file_range.MarkKeyDone(iter);
        continue;
      case GetContext::kCorrupt:
        *status =
            Status::Corruption("corrupted key for ", iter->lkey->user_key());
        file_range.MarkKeyDone(iter);
        continue;
      case GetContext::kUnexpectedBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        file_range.MarkKeyDone(iter);
        continue;
    }
  }

  RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
  return s;
}

void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                       ReadCallback* callback) {
  PinnedIteratorsManager pinned_iters_mgr;
151
db/version_set_coro.h
Normal file
@@ -0,0 +1,151 @@
// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

namespace ROCKSDB_NAMESPACE {

// Lookup a batch of keys in a single SST file
Status Version::MultiGetFromSST(
    const ReadOptions& read_options, MultiGetRange file_range,
    int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
    std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
    uint64_t& num_filter_read, uint64_t& num_index_read,
    uint64_t& num_data_read, uint64_t& num_sst_read) {
  bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
                       get_perf_context()->per_level_perf_context_enabled;

  Status s;
  StopWatchNano timer(clock_, timer_enabled /* auto_start */);
  s = table_cache_->MultiGet(
      read_options, *internal_comparator(), *f->file_metadata, &file_range,
      mutable_cf_options_.prefix_extractor,
      cfd_->internal_stats()->GetFileReadHist(hit_file_level),
      IsFilterSkipped(static_cast<int>(hit_file_level),
                      is_hit_file_last_in_level),
      hit_file_level);
  // TODO: examine the behavior for corrupted key
  if (timer_enabled) {
    PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                              hit_file_level);
  }
  if (!s.ok()) {
    // TODO: Set status for individual keys appropriately
    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
      *iter->s = s;
      file_range.MarkKeyDone(iter);
    }
    return s;
  }
  uint64_t batch_size = 0;
  for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
       ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    // The Status in the KeyContext takes precedence over GetContext state
    // Status may be an error if there were any IO errors in the table
    // reader. We never expect Status to be NotFound(), as that is
    // determined by get_context
    assert(!status->IsNotFound());
    if (!status->ok()) {
      file_range.MarkKeyDone(iter);
      continue;
    }

    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }
    batch_size++;
    num_index_read += get_context.get_context_stats_.num_index_read;
    num_filter_read += get_context.get_context_stats_.num_filter_read;
    num_data_read += get_context.get_context_stats_.num_data_read;
    num_sst_read += get_context.get_context_stats_.num_sst_read;
    // Reset these stats since they're specific to a level
    get_context.get_context_stats_.num_index_read = 0;
    get_context.get_context_stats_.num_filter_read = 0;
    get_context.get_context_stats_.num_data_read = 0;
    get_context.get_context_stats_.num_sst_read = 0;

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    } else {
      if (iter->max_covering_tombstone_seq > 0) {
        // The remaining files we look at will only contain covered keys, so
        // we stop here for this key
        file_range.SkipKey(iter);
      }
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (hit_file_level == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (hit_file_level == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (hit_file_level >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }

        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);

        file_range.MarkKeyDone(iter);

        if (iter->is_blob_index) {
          if (iter->value) {
            TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
                                     &(*iter));

            const Slice& blob_index_slice = *(iter->value);
            BlobIndex blob_index;
            Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
            if (tmp_s.ok()) {
              const uint64_t blob_file_num = blob_index.file_number();
              blob_rqs[blob_file_num].emplace_back(
                  std::make_pair(blob_index, std::cref(*iter)));
            } else {
              *(iter->s) = tmp_s;
            }
          }
        } else {
          file_range.AddValueSize(iter->value->size());
          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
            s = Status::Aborted();
            break;
          }
        }
        continue;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        file_range.MarkKeyDone(iter);
        continue;
      case GetContext::kCorrupt:
        *status =
            Status::Corruption("corrupted key for ", iter->lkey->user_key());
        file_range.MarkKeyDone(iter);
        continue;
      case GetContext::kUnexpectedBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        file_range.MarkKeyDone(iter);
        continue;
    }
  }

  RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
  return s;
}
} // ROCKSDB_NAMESPACE
@@ -73,6 +73,10 @@
#include "util/stop_watch.h"
#include "util/string_util.h"

// clang-format off
#include "table/block_based/block_based_table_reader_coro.h"
// clang-format on

namespace ROCKSDB_NAMESPACE {

extern const uint64_t kBlockBasedTableMagicNumber;
@@ -138,12 +142,6 @@ inline bool PrefixExtractorChangedHelper(
  }
}

CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
  CacheAllocationPtr heap_buf;
  heap_buf = AllocateBlock(buf.size(), allocator);
  memcpy(heap_buf.get(), buf.data(), buf.size());
  return heap_buf;
}
} // namespace

void BlockBasedTable::UpdateCacheHitMetrics(BlockType block_type,
@@ -1646,298 +1644,6 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
  return s;
}

// This function reads multiple data blocks from disk using Env::MultiRead()
// and optionally inserts them into the block cache. It uses the scratch
// buffer provided by the caller, which is contiguous. If scratch is a nullptr
// it allocates a separate buffer for each block. Typically, if the blocks
// need to be uncompressed and there is no compressed block cache, callers
// can allocate a temporary scratch buffer in order to minimize memory
// allocations.
// If options.fill_cache is true, it inserts the blocks into cache. If its
// false and scratch is non-null and the blocks are uncompressed, it copies
// the buffers to heap. In any case, the CachableEntry<Block> returned will
// own the data bytes.
// If compression is enabled and also there is no compressed block cache,
// the adjacent blocks are read out in one IO (combined read)
// batch - A MultiGetRange with only those keys with unique data blocks not
// found in cache
// handles - A vector of block handles. Some of them me be NULL handles
// scratch - An optional contiguous buffer to read compressed blocks into
void BlockBasedTable::RetrieveMultipleBlocks(
    const ReadOptions& options, const MultiGetRange* batch,
    const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
    autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
    autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
    char* scratch, const UncompressionDict& uncompression_dict) const {
  RandomAccessFileReader* file = rep_->file.get();
  const Footer& footer = rep_->footer;
  const ImmutableOptions& ioptions = rep_->ioptions;
  size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
  MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);

  if (ioptions.allow_mmap_reads) {
    size_t idx_in_batch = 0;
    for (auto mget_iter = batch->begin(); mget_iter != batch->end();
         ++mget_iter, ++idx_in_batch) {
      BlockCacheLookupContext lookup_data_block_context(
          TableReaderCaller::kUserMultiGet);
      const BlockHandle& handle = (*handles)[idx_in_batch];
      if (handle.IsNull()) {
        continue;
      }

      (*statuses)[idx_in_batch] =
          RetrieveBlock(nullptr, options, handle, uncompression_dict,
                        &(*results)[idx_in_batch], BlockType::kData,
                        mget_iter->get_context, &lookup_data_block_context,
                        /* for_compaction */ false, /* use_cache */ true,
                        /* wait_for_cache */ true);
    }
    return;
  }

  // In direct IO mode, blocks share the direct io buffer.
  // Otherwise, blocks share the scratch buffer.
  const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;

  autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
  size_t buf_offset = 0;
  size_t idx_in_batch = 0;

  uint64_t prev_offset = 0;
  size_t prev_len = 0;
  autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
  autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
  for (auto mget_iter = batch->begin(); mget_iter != batch->end();
       ++mget_iter, ++idx_in_batch) {
    const BlockHandle& handle = (*handles)[idx_in_batch];
    if (handle.IsNull()) {
      continue;
    }

    size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;

    // If current block is adjacent to the previous one, at the same time,
    // compression is enabled and there is no compressed cache, we combine
    // the two block read as one.
    // We don't combine block reads here in direct IO mode, because when doing
    // direct IO read, the block requests will be realigned and merged when
    // necessary.
    if (use_shared_buffer && !file->use_direct_io() &&
        prev_end == handle.offset()) {
      req_offset_for_block.emplace_back(prev_len);
      prev_len += BlockSizeWithTrailer(handle);
    } else {
      // No compression or current block and previous one is not adjacent:
      // Step 1, create a new request for previous blocks
      if (prev_len != 0) {
        FSReadRequest req;
        req.offset = prev_offset;
        req.len = prev_len;
        if (file->use_direct_io()) {
          req.scratch = nullptr;
        } else if (use_shared_buffer) {
          req.scratch = scratch + buf_offset;
          buf_offset += req.len;
        } else {
          req.scratch = new char[req.len];
        }
        read_reqs.emplace_back(req);
      }

      // Step 2, remeber the previous block info
      prev_offset = handle.offset();
      prev_len = BlockSizeWithTrailer(handle);
      req_offset_for_block.emplace_back(0);
    }
    req_idx_for_block.emplace_back(read_reqs.size());

    PERF_COUNTER_ADD(block_read_count, 1);
    PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle));
  }
  // Handle the last block and process the pending last request
  if (prev_len != 0) {
    FSReadRequest req;
    req.offset = prev_offset;
    req.len = prev_len;
    if (file->use_direct_io()) {
      req.scratch = nullptr;
    } else if (use_shared_buffer) {
      req.scratch = scratch + buf_offset;
    } else {
      req.scratch = new char[req.len];
    }
    read_reqs.emplace_back(req);
  }

  AlignedBuf direct_io_buf;
  {
    IOOptions opts;
    IOStatus s = file->PrepareIOOptions(options, opts);
    if (s.ok()) {
      s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf,
                          options.rate_limiter_priority);
    }
    if (!s.ok()) {
      // Discard all the results in this batch if there is any time out
      // or overall MultiRead error
      for (FSReadRequest& req : read_reqs) {
        req.status = s;
      }
    }
  }

  idx_in_batch = 0;
  size_t valid_batch_idx = 0;
  for (auto mget_iter = batch->begin(); mget_iter != batch->end();
       ++mget_iter, ++idx_in_batch) {
    const BlockHandle& handle = (*handles)[idx_in_batch];

    if (handle.IsNull()) {
      continue;
    }

    assert(valid_batch_idx < req_idx_for_block.size());
    assert(valid_batch_idx < req_offset_for_block.size());
    assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
    size_t& req_idx = req_idx_for_block[valid_batch_idx];
    size_t& req_offset = req_offset_for_block[valid_batch_idx];
    valid_batch_idx++;
    if (mget_iter->get_context) {
      ++(mget_iter->get_context->get_context_stats_.num_data_read);
    }
    FSReadRequest& req = read_reqs[req_idx];
    Status s = req.status;
    if (s.ok()) {
      if ((req.result.size() != req.len) ||
          (req_offset + BlockSizeWithTrailer(handle) > req.result.size())) {
        s = Status::Corruption("truncated block read from " +
                               rep_->file->file_name() + " offset " +
                               std::to_string(handle.offset()) + ", expected " +
                               std::to_string(req.len) + " bytes, got " +
                               std::to_string(req.result.size()));
      }
    }

    BlockContents raw_block_contents;
    if (s.ok()) {
      if (!use_shared_buffer) {
        // We allocated a buffer for this block. Give ownership of it to
        // BlockContents so it can free the memory
        assert(req.result.data() == req.scratch);
        assert(req.result.size() == BlockSizeWithTrailer(handle));
        assert(req_offset == 0);
        std::unique_ptr<char[]> raw_block(req.scratch);
        raw_block_contents = BlockContents(std::move(raw_block), handle.size());
      } else {
        // We used the scratch buffer or direct io buffer
        // which are shared by the blocks.
        // raw_block_contents does not have the ownership.
        raw_block_contents =
            BlockContents(Slice(req.result.data() + req_offset, handle.size()));
      }
#ifndef NDEBUG
      raw_block_contents.is_raw_block = true;
#endif

      if (options.verify_checksums) {
        PERF_TIMER_GUARD(block_checksum_time);
        const char* data = req.result.data();
        // Since the scratch might be shared, the offset of the data block in
        // the buffer might not be 0. req.result.data() only point to the
        // begin address of each read request, we need to add the offset
        // in each read request. Checksum is stored in the block trailer,
        // beyond the payload size.
        s = VerifyBlockChecksum(footer.checksum_type(), data + req_offset,
                                handle.size(), rep_->file->file_name(),
                                handle.offset());
        TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
      }
    } else if (!use_shared_buffer) {
      // Free the allocated scratch buffer.
      delete[] req.scratch;
    }

    if (s.ok()) {
      // When the blocks share the same underlying buffer (scratch or direct io
      // buffer), we may need to manually copy the block into heap if the raw
      // block has to be inserted into a cache. That falls into th following
      // cases -
      // 1. Raw block is not compressed, it needs to be inserted into the
      // uncompressed block cache if there is one
      // 2. If the raw block is compressed, it needs to be inserted into the
      // compressed block cache if there is one
      //
      // In all other cases, the raw block is either uncompressed into a heap
      // buffer or there is no cache at all.
      CompressionType compression_type =
          GetBlockCompressionType(raw_block_contents);
      if (use_shared_buffer && (compression_type == kNoCompression ||
                                (compression_type != kNoCompression &&
                                 rep_->table_options.block_cache_compressed))) {
        Slice raw =
            Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle));
        raw_block_contents = BlockContents(
            CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
            handle.size());
#ifndef NDEBUG
        raw_block_contents.is_raw_block = true;
#endif
      }
    }

    if (s.ok()) {
      if (options.fill_cache) {
        BlockCacheLookupContext lookup_data_block_context(
            TableReaderCaller::kUserMultiGet);
        CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
        // MaybeReadBlockAndLoadToCache will insert into the block caches if
        // necessary. Since we're passing the raw block contents, it will
        // avoid looking up the block cache
        s = MaybeReadBlockAndLoadToCache(
            nullptr, options, handle, uncompression_dict, /*wait=*/true,
            /*for_compaction=*/false, block_entry, BlockType::kData,
            mget_iter->get_context, &lookup_data_block_context,
            &raw_block_contents);

        // block_entry value could be null if no block cache is present, i.e
        // BlockBasedTableOptions::no_block_cache is true and no compressed
        // block cache is configured. In that case, fall
        // through and set up the block explicitly
        if (block_entry->GetValue() != nullptr) {
          s.PermitUncheckedError();
          continue;
        }
      }

      CompressionType compression_type =
          GetBlockCompressionType(raw_block_contents);
      BlockContents contents;
      if (compression_type != kNoCompression) {
        UncompressionContext context(compression_type);
        UncompressionInfo info(context, uncompression_dict, compression_type);
        s = UncompressBlockContents(
            info, req.result.data() + req_offset, handle.size(), &contents,
            footer.format_version(), rep_->ioptions, memory_allocator);
      } else {
        // There are two cases here:
        // 1) caller uses the shared buffer (scratch or direct io buffer);
        // 2) we use the requst buffer.
        // If scratch buffer or direct io buffer is used, we ensure that
        // all raw blocks are copyed to the heap as single blocks. If scratch
        // buffer is not used, we also have no combined read, so the raw
        // block can be used directly.
        contents = std::move(raw_block_contents);
      }
      if (s.ok()) {
        (*results)[idx_in_batch].SetOwnedValue(new Block(
            std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
      }
    }
    (*statuses)[idx_in_batch] = s;
  }
}

template <typename TBlocklike>
Status BlockBasedTable::RetrieveBlock(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
@@ -2518,443 +2224,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
  return s;
}

using MultiGetRange = MultiGetContext::Range;
void BlockBasedTable::MultiGet(const ReadOptions& read_options,
                               const MultiGetRange* mget_range,
                               const SliceTransform* prefix_extractor,
                               bool skip_filters) {
  if (mget_range->empty()) {
    // Caller should ensure non-empty (performance bug)
    assert(false);
    return; // Nothing to do
  }

  FilterBlockReader* const filter =
      !skip_filters ? rep_->filter.get() : nullptr;
  MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
                               mget_range->end());

  // First check the full filter
  // If full filter not useful, Then go into each block
  const bool no_io = read_options.read_tier == kBlockCacheTier;
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
  if (sst_file_range.begin()->get_context) {
    tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id();
  }
  BlockCacheLookupContext lookup_context{
      TableReaderCaller::kUserMultiGet, tracing_mget_id,
      /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
  FullFilterKeysMayMatch(filter, &sst_file_range, no_io, prefix_extractor,
                         &lookup_context);

  if (!sst_file_range.empty()) {
    IndexBlockIter iiter_on_stack;
    // if prefix_extractor found in block differs from options, disable
    // BlockPrefixIndex. Only do this check when index_type is kHashSearch.
    bool need_upper_bound_check = false;
    if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
      need_upper_bound_check = PrefixExtractorChanged(prefix_extractor);
    }
    auto iiter =
        NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
                         sst_file_range.begin()->get_context, &lookup_context);
    std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
    if (iiter != &iiter_on_stack) {
      iiter_unique_ptr.reset(iiter);
    }

    uint64_t prev_offset = std::numeric_limits<uint64_t>::max();
    autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
    autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
    autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
    MultiGetContext::Mask reused_mask = 0;
    char stack_buf[kMultiGetReadStackBufSize];
    std::unique_ptr<char[]> block_buf;
    {
      MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
                                     sst_file_range.end());
      std::vector<Cache::Handle*> cache_handles;
      bool wait_for_cache_results = false;

      CachableEntry<UncompressionDict> uncompression_dict;
      Status uncompression_dict_status;
      uncompression_dict_status.PermitUncheckedError();
      bool uncompression_dict_inited = false;
      size_t total_len = 0;
      ReadOptions ro = read_options;
      ro.read_tier = kBlockCacheTier;

      for (auto miter = data_block_range.begin();
           miter != data_block_range.end(); ++miter) {
        const Slice& key = miter->ikey;
        iiter->Seek(miter->ikey);

        IndexValue v;
        if (iiter->Valid()) {
          v = iiter->value();
        }
        if (!iiter->Valid() ||
            (!v.first_internal_key.empty() && !skip_filters &&
             UserComparatorWrapper(rep_->internal_comparator.user_comparator())
                     .CompareWithoutTimestamp(
                         ExtractUserKey(key),
                         ExtractUserKey(v.first_internal_key)) < 0)) {
          // The requested key falls between highest key in previous block and
          // lowest key in current block.
          if (!iiter->status().IsNotFound()) {
            *(miter->s) = iiter->status();
          }
          data_block_range.SkipKey(miter);
          sst_file_range.SkipKey(miter);
          continue;
        }

        if (!uncompression_dict_inited && rep_->uncompression_dict_reader) {
          uncompression_dict_status =
              rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
                  nullptr /* prefetch_buffer */, no_io,
                  read_options.verify_checksums,
                  sst_file_range.begin()->get_context, &lookup_context,
                  &uncompression_dict);
          uncompression_dict_inited = true;
        }

        if (!uncompression_dict_status.ok()) {
          assert(!uncompression_dict_status.IsNotFound());
          *(miter->s) = uncompression_dict_status;
          data_block_range.SkipKey(miter);
          sst_file_range.SkipKey(miter);
          continue;
        }

        statuses.emplace_back();
        results.emplace_back();
        if (v.handle.offset() == prev_offset) {
          // This key can reuse the previous block (later on).
          // Mark previous as "reused"
          reused_mask |= MultiGetContext::Mask{1} << (block_handles.size() - 1);
          // Use null handle to indicate this one reuses same block as
          // previous.
          block_handles.emplace_back(BlockHandle::NullBlockHandle());
          continue;
        }
        // Lookup the cache for the given data block referenced by an index
        // iterator value (i.e BlockHandle). If it exists in the cache,
        // initialize block to the contents of the data block.
        prev_offset = v.handle.offset();
        BlockHandle handle = v.handle;
        BlockCacheLookupContext lookup_data_block_context(
            TableReaderCaller::kUserMultiGet);
        const UncompressionDict& dict = uncompression_dict.GetValue()
                                            ? *uncompression_dict.GetValue()
                                            : UncompressionDict::GetEmptyDict();
        Status s = RetrieveBlock(
            nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
            miter->get_context, &lookup_data_block_context,
            /* for_compaction */ false, /* use_cache */ true,
            /* wait_for_cache */ false);
        if (s.IsIncomplete()) {
          s = Status::OK();
        }
        if (s.ok() && !results.back().IsEmpty()) {
          // Since we have a valid handle, check the value. If its nullptr,
          // it means the cache is waiting for the final result and we're
          // supposed to call WaitAll() to wait for the result.
          if (results.back().GetValue() != nullptr) {
            // Found it in the cache. Add NULL handle to indicate there is
            // nothing to read from disk.
            if (results.back().GetCacheHandle()) {
              results.back().UpdateCachedValue();
            }
            block_handles.emplace_back(BlockHandle::NullBlockHandle());
          } else {
            // We have to wait for the cache lookup to finish in the
            // background, and then we may have to read the block from disk
            // anyway
            assert(results.back().GetCacheHandle());
            wait_for_cache_results = true;
            block_handles.emplace_back(handle);
            cache_handles.emplace_back(results.back().GetCacheHandle());
          }
        } else {
          block_handles.emplace_back(handle);
          total_len += BlockSizeWithTrailer(handle);
        }
      }

      if (wait_for_cache_results) {
        Cache* block_cache = rep_->table_options.block_cache.get();
        block_cache->WaitAll(cache_handles);
        for (size_t i = 0; i < block_handles.size(); ++i) {
          // If this block was a success or failure or not needed because
          // the corresponding key is in the same block as a prior key, skip
          if (block_handles[i] == BlockHandle::NullBlockHandle() ||
              results[i].IsEmpty()) {
            continue;
          }
          results[i].UpdateCachedValue();
          void* val = results[i].GetValue();
          if (!val) {
            // The async cache lookup failed - could be due to an error
            // or a false positive. We need to read the data block from
            // the SST file
            results[i].Reset();
            total_len += BlockSizeWithTrailer(block_handles[i]);
          } else {
            block_handles[i] = BlockHandle::NullBlockHandle();
          }
        }
      }

      if (total_len) {
        char* scratch = nullptr;
        const UncompressionDict& dict = uncompression_dict.GetValue()
                                            ? *uncompression_dict.GetValue()
                                            : UncompressionDict::GetEmptyDict();
        assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
        assert(uncompression_dict_status.ok());
        // If using direct IO, then scratch is not used, so keep it nullptr.
        // If the blocks need to be uncompressed and we don't need the
        // compressed blocks, then we can use a contiguous block of
        // memory to read in all the blocks as it will be temporary
        // storage
        // 1. If blocks are compressed and compressed block cache is there,
        // alloc heap bufs
        // 2. If blocks are uncompressed, alloc heap bufs
        // 3. If blocks are compressed and no compressed block cache, use
        // stack buf
        if (!rep_->file->use_direct_io() &&
            rep_->table_options.block_cache_compressed == nullptr &&
            rep_->blocks_maybe_compressed) {
          if (total_len <= kMultiGetReadStackBufSize) {
            scratch = stack_buf;
          } else {
            scratch = new char[total_len];
            block_buf.reset(scratch);
          }
        }
        RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
                               &statuses, &results, scratch, dict);
        if (sst_file_range.begin()->get_context) {
          ++(sst_file_range.begin()
                 ->get_context->get_context_stats_.num_sst_read);
        }
      }
    }

    DataBlockIter first_biter;
    DataBlockIter next_biter;
    size_t idx_in_batch = 0;
    SharedCleanablePtr shared_cleanable;
    for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
         ++miter) {
      Status s;
      GetContext* get_context = miter->get_context;
      const Slice& key = miter->ikey;
      bool matched = false; // if such user key matched a key in SST
      bool done = false;
      bool first_block = true;
      do {
        DataBlockIter* biter = nullptr;
        bool reusing_prev_block;
        bool later_reused;
        uint64_t referenced_data_size = 0;
        bool does_referenced_key_exist = false;
        BlockCacheLookupContext lookup_data_block_context(
            TableReaderCaller::kUserMultiGet, tracing_mget_id,
            /*get_from_user_specified_snapshot=*/read_options.snapshot !=
                nullptr);
        if (first_block) {
          if (!block_handles[idx_in_batch].IsNull() ||
              !results[idx_in_batch].IsEmpty()) {
            first_biter.Invalidate(Status::OK());
            NewDataBlockIterator<DataBlockIter>(
                read_options, results[idx_in_batch], &first_biter,
                statuses[idx_in_batch]);
            reusing_prev_block = false;
          } else {
            // If handler is null and result is empty, then the status is never
            // set, which should be the initial value: ok().
            assert(statuses[idx_in_batch].ok());
            reusing_prev_block = true;
          }
          biter = &first_biter;
          later_reused =
              (reused_mask & (MultiGetContext::Mask{1} << idx_in_batch)) != 0;
          idx_in_batch++;
        } else {
          IndexValue v = iiter->value();
          if (!v.first_internal_key.empty() && !skip_filters &&
              UserComparatorWrapper(rep_->internal_comparator.user_comparator())
                      .CompareWithoutTimestamp(
                          ExtractUserKey(key),
                          ExtractUserKey(v.first_internal_key)) < 0) {
            // The requested key falls between highest key in previous block and
            // lowest key in current block.
            break;
          }

          next_biter.Invalidate(Status::OK());
          NewDataBlockIterator<DataBlockIter>(
              read_options, iiter->value().handle, &next_biter,
              BlockType::kData, get_context, &lookup_data_block_context,
              Status(), nullptr);
          biter = &next_biter;
          reusing_prev_block = false;
          later_reused = false;
        }

        if (read_options.read_tier == kBlockCacheTier &&
            biter->status().IsIncomplete()) {
          // couldn't get block from block_cache
          // Update Saver.state to Found because we are only looking for
          // whether we can guarantee the key is not there when "no_io" is set
          get_context->MarkKeyMayExist();
          break;
        }
        if (!biter->status().ok()) {
          s = biter->status();
          break;
        }

        bool may_exist = biter->SeekForGet(key);
        if (!may_exist) {
          // HashSeek cannot find the key this block and the the iter is not
          // the end of the block, i.e. cannot be in the following blocks
          // either. In this case, the seek_key cannot be found, so we break
          // from the top level for-loop.
          break;
        }

        // Reusing blocks complicates pinning/Cleanable, because the cache
        // entry referenced by biter can only be released once all returned
        // pinned values are released. This code previously did an extra
        // block_cache Ref for each reuse, but that unnecessarily increases
        // block cache contention. Instead we can use a variant of shared_ptr
        // to release in block cache only once.
        //
        // Although the biter loop below might SaveValue multiple times for
        // merges, just one value_pinner suffices, as MultiGet will merge
        // the operands before returning to the API user.
        Cleanable* value_pinner;
        if (biter->IsValuePinned()) {
          if (reusing_prev_block) {
            // Note that we don't yet know if the MultiGet results will need
            // to pin this block, so we might wrap a block for sharing and
            // still end up with 1 (or 0) pinning ref. Not ideal but OK.
            //
            // Here we avoid adding redundant cleanups if we didn't end up
            // delegating the cleanup from last time around.
            if (!biter->HasCleanups()) {
              assert(shared_cleanable.get());
              if (later_reused) {
                shared_cleanable.RegisterCopyWith(biter);
              } else {
                shared_cleanable.MoveAsCleanupTo(biter);
              }
            }
          } else if (later_reused) {
            assert(biter->HasCleanups());
            // Make the existing cleanups on `biter` sharable:
            shared_cleanable.Allocate();
            // Move existing `biter` cleanup(s) to `shared_cleanable`
            biter->DelegateCleanupsTo(&*shared_cleanable);
            // Reference `shared_cleanable` as new cleanup for `biter`
            shared_cleanable.RegisterCopyWith(biter);
          }
          assert(biter->HasCleanups());
          value_pinner = biter;
        } else {
          value_pinner = nullptr;
        }

        // Call the *saver function on each entry/block until it returns false
        for (; biter->Valid(); biter->Next()) {
          ParsedInternalKey parsed_key;
          Status pik_status = ParseInternalKey(
              biter->key(), &parsed_key, false /* log_err_key */); // TODO
          if (!pik_status.ok()) {
            s = pik_status;
          }
          if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
                                      value_pinner)) {
            if (get_context->State() == GetContext::GetState::kFound) {
              does_referenced_key_exist = true;
              referenced_data_size =
                  biter->key().size() + biter->value().size();
            }
            done = true;
            break;
          }
          s = biter->status();
        }
        // Write the block cache access.
        // XXX: There appear to be 'break' statements above that bypass this
        // writing of the block cache trace record
        if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
            !reusing_prev_block) {
          // Avoid making copy of block_key, cf_name, and referenced_key when
          // constructing the access record.
          Slice referenced_key;
          if (does_referenced_key_exist) {
            referenced_key = biter->key();
          } else {
            referenced_key = key;
          }
          BlockCacheTraceRecord access_record(
              rep_->ioptions.clock->NowMicros(),
              /*block_key=*/"", lookup_data_block_context.block_type,
              lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
              /*cf_name=*/"", rep_->level_for_tracing(),
              rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
              lookup_data_block_context.is_cache_hit,
              lookup_data_block_context.no_insert,
              lookup_data_block_context.get_id,
              lookup_data_block_context.get_from_user_specified_snapshot,
              /*referenced_key=*/"", referenced_data_size,
              lookup_data_block_context.num_keys_in_block,
              does_referenced_key_exist);
          // TODO: Should handle status here?
          block_cache_tracer_
              ->WriteBlockAccess(access_record,
                                 lookup_data_block_context.block_key,
                                 rep_->cf_name_for_tracing(), referenced_key)
              .PermitUncheckedError();
        }
        s = biter->status();
        if (done) {
          // Avoid the extra Next which is expensive in two-level indexes
          break;
        }
        if (first_block) {
          iiter->Seek(key);
          if (!iiter->Valid()) {
            break;
          }
        }
        first_block = false;
        iiter->Next();
      } while (iiter->Valid());

      if (matched && filter != nullptr && !filter->IsBlockBased()) {
        RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
        PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
                                  rep_->level);
      }
      if (s.ok() && !iiter->status().IsNotFound()) {
        s = iiter->status();
      }
      *(miter->s) = s;
    }
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
    // Not sure why we need to do it. Should investigate more.
    for (auto& st : statuses) {
      st.PermitUncheckedError();
    }
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
  }
}

Status BlockBasedTable::Prefetch(const Slice* const begin,
                                 const Slice* const end) {
  auto& comparator = rep_->internal_comparator;
748
table/block_based/block_based_table_reader_coro.h
Normal file
748
table/block_based/block_based_table_reader_coro.h
Normal file
@ -0,0 +1,748 @@
// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

namespace ROCKSDB_NAMESPACE {

namespace {

CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
CacheAllocationPtr heap_buf;
heap_buf = AllocateBlock(buf.size(), allocator);
memcpy(heap_buf.get(), buf.data(), buf.size());
return heap_buf;
}
}

// This function reads multiple data blocks from disk using Env::MultiRead()
// and optionally inserts them into the block cache. It uses the scratch
// buffer provided by the caller, which is contiguous. If scratch is a nullptr
// it allocates a separate buffer for each block. Typically, if the blocks
// need to be uncompressed and there is no compressed block cache, callers
// can allocate a temporary scratch buffer in order to minimize memory
// allocations.
// If options.fill_cache is true, it inserts the blocks into cache. If it's
// false and scratch is non-null and the blocks are uncompressed, it copies
// the buffers to heap. In any case, the CachableEntry<Block> returned will
// own the data bytes.
// If compression is enabled and also there is no compressed block cache,
// the adjacent blocks are read out in one IO (combined read)
// batch - A MultiGetRange with only those keys with unique data blocks not
// found in cache
// handles - A vector of block handles. Some of them may be NULL handles
// scratch - An optional contiguous buffer to read compressed blocks into
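// Illustration (hypothetical sizes): with a non-null scratch buffer and
// buffered I/O, two handles at offsets 0 and 4096 whose sizes-with-trailer
// are 4096 each are coalesced into a single FSReadRequest of length 8192;
// req_idx_for_block/req_offset_for_block below record, for each block, which
// request covers it and at which byte offset within that request it starts.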
void BlockBasedTable::RetrieveMultipleBlocks(
const ReadOptions& options, const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
char* scratch, const UncompressionDict& uncompression_dict) const {
RandomAccessFileReader* file = rep_->file.get();
const Footer& footer = rep_->footer;
const ImmutableOptions& ioptions = rep_->ioptions;
size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);

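// When mmap reads are enabled, no MultiRead batching is attempted: each
// block is retrieved individually (through the block cache) and the
// function returns early.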
if (ioptions.allow_mmap_reads) {
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}

(*statuses)[idx_in_batch] =
RetrieveBlock(nullptr, options, handle, uncompression_dict,
&(*results)[idx_in_batch], BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
/* for_compaction */ false, /* use_cache */ true,
/* wait_for_cache */ true);
}
return;
}

// In direct IO mode, blocks share the direct io buffer.
// Otherwise, blocks share the scratch buffer.
const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;

autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
size_t buf_offset = 0;
size_t idx_in_batch = 0;

uint64_t prev_offset = 0;
size_t prev_len = 0;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
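// For every non-null handle in the batch, req_idx_for_block records which
// FSReadRequest ends up covering that block and req_offset_for_block records
// the block's byte offset within that request, so the per-block contents can
// be sliced back out of the read results below.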
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}

size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;

// If the current block is adjacent to the previous one and, at the same
// time, compression is enabled and there is no compressed cache, we
// combine the two block reads into one.
// We don't combine block reads here in direct IO mode, because when doing
// direct IO read, the block requests will be realigned and merged when
// necessary.
if (use_shared_buffer && !file->use_direct_io() &&
prev_end == handle.offset()) {
req_offset_for_block.emplace_back(prev_len);
prev_len += BlockSizeWithTrailer(handle);
} else {
// No compression or current block and previous one is not adjacent:
// Step 1, create a new request for previous blocks
if (prev_len != 0) {
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (file->use_direct_io()) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
} else {
req.scratch = new char[req.len];
}
read_reqs.emplace_back(req);
}

// Step 2, remember the previous block info
prev_offset = handle.offset();
prev_len = BlockSizeWithTrailer(handle);
req_offset_for_block.emplace_back(0);
}
req_idx_for_block.emplace_back(read_reqs.size());

PERF_COUNTER_ADD(block_read_count, 1);
PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle));
}
// Handle the last block and process the pending last request
if (prev_len != 0) {
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (file->use_direct_io()) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
} else {
req.scratch = new char[req.len];
}
read_reqs.emplace_back(req);
}

AlignedBuf direct_io_buf;
{
IOOptions opts;
IOStatus s = file->PrepareIOOptions(options, opts);
if (s.ok()) {
s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf,
options.rate_limiter_priority);
}
if (!s.ok()) {
// Discard all the results in this batch if there is any time out
// or overall MultiRead error
for (FSReadRequest& req : read_reqs) {
req.status = s;
}
}
}

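// Second pass over the batch: carve each block's contents out of the
// completed read requests (using req_idx_for_block / req_offset_for_block),
// optionally verify checksums, uncompress if needed, and either insert the
// block into the block cache or hand ownership to the caller via `results`.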
idx_in_batch = 0;
size_t valid_batch_idx = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];

if (handle.IsNull()) {
continue;
}

assert(valid_batch_idx < req_idx_for_block.size());
assert(valid_batch_idx < req_offset_for_block.size());
assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
size_t& req_idx = req_idx_for_block[valid_batch_idx];
size_t& req_offset = req_offset_for_block[valid_batch_idx];
valid_batch_idx++;
if (mget_iter->get_context) {
++(mget_iter->get_context->get_context_stats_.num_data_read);
}
FSReadRequest& req = read_reqs[req_idx];
Status s = req.status;
if (s.ok()) {
if ((req.result.size() != req.len) ||
(req_offset + BlockSizeWithTrailer(handle) > req.result.size())) {
s = Status::Corruption("truncated block read from " +
rep_->file->file_name() + " offset " +
std::to_string(handle.offset()) + ", expected " +
std::to_string(req.len) + " bytes, got " +
std::to_string(req.result.size()));
}
}

BlockContents raw_block_contents;
if (s.ok()) {
if (!use_shared_buffer) {
// We allocated a buffer for this block. Give ownership of it to
// BlockContents so it can free the memory
assert(req.result.data() == req.scratch);
assert(req.result.size() == BlockSizeWithTrailer(handle));
assert(req_offset == 0);
std::unique_ptr<char[]> raw_block(req.scratch);
raw_block_contents = BlockContents(std::move(raw_block), handle.size());
} else {
// We used the scratch buffer or direct io buffer
// which are shared by the blocks.
// raw_block_contents does not have the ownership.
raw_block_contents =
BlockContents(Slice(req.result.data() + req_offset, handle.size()));
}
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif

if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
const char* data = req.result.data();
// Since the scratch might be shared, the offset of the data block in
// the buffer might not be 0. req.result.data() only points to the
// beginning address of each read request, so we need to add the offset
// within each read request. Checksum is stored in the block trailer,
// beyond the payload size.
s = VerifyBlockChecksum(footer.checksum_type(), data + req_offset,
handle.size(), rep_->file->file_name(),
handle.offset());
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
}
} else if (!use_shared_buffer) {
// Free the allocated scratch buffer.
delete[] req.scratch;
}

if (s.ok()) {
// When the blocks share the same underlying buffer (scratch or direct io
// buffer), we may need to manually copy the block into heap if the raw
// block has to be inserted into a cache. That falls into the following
// cases -
// 1. Raw block is not compressed, it needs to be inserted into the
// uncompressed block cache if there is one
// 2. If the raw block is compressed, it needs to be inserted into the
// compressed block cache if there is one
//
// In all other cases, the raw block is either uncompressed into a heap
// buffer or there is no cache at all.
CompressionType compression_type =
GetBlockCompressionType(raw_block_contents);
if (use_shared_buffer && (compression_type == kNoCompression ||
(compression_type != kNoCompression &&
rep_->table_options.block_cache_compressed))) {
Slice raw =
Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle));
raw_block_contents = BlockContents(
CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
handle.size());
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif
}
}

if (s.ok()) {
if (options.fill_cache) {
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
// MaybeReadBlockAndLoadToCache will insert into the block caches if
// necessary. Since we're passing the raw block contents, it will
// avoid looking up the block cache
s = MaybeReadBlockAndLoadToCache(
nullptr, options, handle, uncompression_dict, /*wait=*/true,
/*for_compaction=*/false, block_entry, BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
&raw_block_contents);

// block_entry value could be null if no block cache is present, i.e.
// BlockBasedTableOptions::no_block_cache is true and no compressed
// block cache is configured. In that case, fall
// through and set up the block explicitly
if (block_entry->GetValue() != nullptr) {
s.PermitUncheckedError();
continue;
}
}

CompressionType compression_type =
GetBlockCompressionType(raw_block_contents);
BlockContents contents;
if (compression_type != kNoCompression) {
UncompressionContext context(compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressBlockContents(
info, req.result.data() + req_offset, handle.size(), &contents,
footer.format_version(), rep_->ioptions, memory_allocator);
} else {
// There are two cases here:
// 1) caller uses the shared buffer (scratch or direct io buffer);
// 2) we use the request buffer.
// If scratch buffer or direct io buffer is used, we ensure that
// all raw blocks are copied to the heap as single blocks. If scratch
// buffer is not used, we also have no combined read, so the raw
// block can be used directly.
contents = std::move(raw_block_contents);
}
if (s.ok()) {
(*results)[idx_in_batch].SetOwnedValue(new Block(
std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
}
}
(*statuses)[idx_in_batch] = s;
}
}

using MultiGetRange = MultiGetContext::Range;
void BlockBasedTable::MultiGet(const ReadOptions& read_options,
const MultiGetRange* mget_range,
const SliceTransform* prefix_extractor,
bool skip_filters) {
if (mget_range->empty()) {
// Caller should ensure non-empty (performance bug)
assert(false);
return; // Nothing to do
}

FilterBlockReader* const filter =
!skip_filters ? rep_->filter.get() : nullptr;
MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
mget_range->end());

// First check the full filter
// If full filter not useful, then go into each block
const bool no_io = read_options.read_tier == kBlockCacheTier;
uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
if (sst_file_range.begin()->get_context) {
tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id();
}
BlockCacheLookupContext lookup_context{
TableReaderCaller::kUserMultiGet, tracing_mget_id,
/*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
FullFilterKeysMayMatch(filter, &sst_file_range, no_io, prefix_extractor,
&lookup_context);

if (!sst_file_range.empty()) {
IndexBlockIter iiter_on_stack;
// if prefix_extractor found in block differs from options, disable
// BlockPrefixIndex. Only do this check when index_type is kHashSearch.
bool need_upper_bound_check = false;
if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
need_upper_bound_check = PrefixExtractorChanged(prefix_extractor);
}
auto iiter =
NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
sst_file_range.begin()->get_context, &lookup_context);
std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
if (iiter != &iiter_on_stack) {
iiter_unique_ptr.reset(iiter);
}

uint64_t prev_offset = std::numeric_limits<uint64_t>::max();
autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
MultiGetContext::Mask reused_mask = 0;
char stack_buf[kMultiGetReadStackBufSize];
std::unique_ptr<char[]> block_buf;
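// block_handles, results and statuses are parallel arrays with one entry per
// key that reaches the data-block stage; a null handle means no file read is
// needed for that key (block cache hit, or same block as the previous key).
// reused_mask marks blocks that a later key in the batch will reuse, and
// stack_buf/block_buf back the optional contiguous scratch buffer handed to
// RetrieveMultipleBlocks.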
{
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
sst_file_range.end());
std::vector<Cache::Handle*> cache_handles;
bool wait_for_cache_results = false;

CachableEntry<UncompressionDict> uncompression_dict;
Status uncompression_dict_status;
uncompression_dict_status.PermitUncheckedError();
bool uncompression_dict_inited = false;
size_t total_len = 0;
ReadOptions ro = read_options;
ro.read_tier = kBlockCacheTier;

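// The first pass below is cache-only (hence kBlockCacheTier): an Incomplete
// status from RetrieveBlock just means the block was not in the cache, and
// such blocks are accumulated in block_handles/total_len to be read from the
// file in a single batch afterwards.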
for (auto miter = data_block_range.begin();
miter != data_block_range.end(); ++miter) {
const Slice& key = miter->ikey;
iiter->Seek(miter->ikey);

IndexValue v;
if (iiter->Valid()) {
v = iiter->value();
}
if (!iiter->Valid() ||
(!v.first_internal_key.empty() && !skip_filters &&
UserComparatorWrapper(rep_->internal_comparator.user_comparator())
.CompareWithoutTimestamp(
ExtractUserKey(key),
ExtractUserKey(v.first_internal_key)) < 0)) {
// The requested key falls between highest key in previous block and
// lowest key in current block.
if (!iiter->status().IsNotFound()) {
*(miter->s) = iiter->status();
}
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
continue;
}

if (!uncompression_dict_inited && rep_->uncompression_dict_reader) {
uncompression_dict_status =
rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
nullptr /* prefetch_buffer */, no_io,
read_options.verify_checksums,
sst_file_range.begin()->get_context, &lookup_context,
&uncompression_dict);
uncompression_dict_inited = true;
}

if (!uncompression_dict_status.ok()) {
assert(!uncompression_dict_status.IsNotFound());
*(miter->s) = uncompression_dict_status;
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
continue;
}

statuses.emplace_back();
results.emplace_back();
if (v.handle.offset() == prev_offset) {
// This key can reuse the previous block (later on).
// Mark previous as "reused"
reused_mask |= MultiGetContext::Mask{1} << (block_handles.size() - 1);
// Use null handle to indicate this one reuses same block as
// previous.
block_handles.emplace_back(BlockHandle::NullBlockHandle());
continue;
}
// Lookup the cache for the given data block referenced by an index
// iterator value (i.e. BlockHandle). If it exists in the cache,
// initialize block to the contents of the data block.
prev_offset = v.handle.offset();
BlockHandle handle = v.handle;
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
const UncompressionDict& dict = uncompression_dict.GetValue()
? *uncompression_dict.GetValue()
: UncompressionDict::GetEmptyDict();
Status s = RetrieveBlock(
nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
miter->get_context, &lookup_data_block_context,
/* for_compaction */ false, /* use_cache */ true,
/* wait_for_cache */ false);
if (s.IsIncomplete()) {
s = Status::OK();
}
if (s.ok() && !results.back().IsEmpty()) {
// Since we have a valid handle, check the value. If it's nullptr,
// it means the cache is waiting for the final result and we're
// supposed to call WaitAll() to wait for the result.
if (results.back().GetValue() != nullptr) {
// Found it in the cache. Add NULL handle to indicate there is
// nothing to read from disk.
if (results.back().GetCacheHandle()) {
results.back().UpdateCachedValue();
}
block_handles.emplace_back(BlockHandle::NullBlockHandle());
} else {
// We have to wait for the cache lookup to finish in the
// background, and then we may have to read the block from disk
// anyway
assert(results.back().GetCacheHandle());
wait_for_cache_results = true;
block_handles.emplace_back(handle);
cache_handles.emplace_back(results.back().GetCacheHandle());
}
} else {
block_handles.emplace_back(handle);
total_len += BlockSizeWithTrailer(handle);
}
}

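// If any cache lookups are still pending in the background, wait for all of
// them; entries that come back empty are reset and added to the file-read
// batch, while the ones that resolved successfully get a null handle so the
// read loop skips them.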
if (wait_for_cache_results) {
Cache* block_cache = rep_->table_options.block_cache.get();
block_cache->WaitAll(cache_handles);
for (size_t i = 0; i < block_handles.size(); ++i) {
// If this block was a success or failure or not needed because
// the corresponding key is in the same block as a prior key, skip
if (block_handles[i] == BlockHandle::NullBlockHandle() ||
results[i].IsEmpty()) {
continue;
}
results[i].UpdateCachedValue();
void* val = results[i].GetValue();
if (!val) {
// The async cache lookup failed - could be due to an error
// or a false positive. We need to read the data block from
// the SST file
results[i].Reset();
total_len += BlockSizeWithTrailer(block_handles[i]);
} else {
block_handles[i] = BlockHandle::NullBlockHandle();
}
}
}

if (total_len) {
char* scratch = nullptr;
const UncompressionDict& dict = uncompression_dict.GetValue()
? *uncompression_dict.GetValue()
: UncompressionDict::GetEmptyDict();
assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
assert(uncompression_dict_status.ok());
// If using direct IO, then scratch is not used, so keep it nullptr.
// If the blocks need to be uncompressed and we don't need the
// compressed blocks, then we can use a contiguous block of
// memory to read in all the blocks as it will be temporary
// storage
// 1. If blocks are compressed and compressed block cache is there,
// alloc heap bufs
// 2. If blocks are uncompressed, alloc heap bufs
// 3. If blocks are compressed and no compressed block cache, use
// stack buf
if (!rep_->file->use_direct_io() &&
rep_->table_options.block_cache_compressed == nullptr &&
rep_->blocks_maybe_compressed) {
if (total_len <= kMultiGetReadStackBufSize) {
scratch = stack_buf;
} else {
scratch = new char[total_len];
block_buf.reset(scratch);
}
}
RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
&statuses, &results, scratch, dict);
if (sst_file_range.begin()->get_context) {
++(sst_file_range.begin()
->get_context->get_context_stats_.num_sst_read);
}
}
}

DataBlockIter first_biter;
DataBlockIter next_biter;
size_t idx_in_batch = 0;
SharedCleanablePtr shared_cleanable;
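// Second phase: for each remaining key, walk its data block(s) and feed the
// entries to GetContext. first_biter is positioned on the block fetched (or
// reused) above; next_biter handles entries that continue into subsequent
// blocks, located through the index iterator. shared_cleanable lets several
// pinned results share a single release of a reused cache entry.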
for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
++miter) {
Status s;
GetContext* get_context = miter->get_context;
const Slice& key = miter->ikey;
bool matched = false; // if such user key matched a key in SST
bool done = false;
bool first_block = true;
do {
DataBlockIter* biter = nullptr;
bool reusing_prev_block;
bool later_reused;
uint64_t referenced_data_size = 0;
bool does_referenced_key_exist = false;
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet, tracing_mget_id,
/*get_from_user_specified_snapshot=*/read_options.snapshot !=
nullptr);
if (first_block) {
if (!block_handles[idx_in_batch].IsNull() ||
!results[idx_in_batch].IsEmpty()) {
first_biter.Invalidate(Status::OK());
NewDataBlockIterator<DataBlockIter>(
read_options, results[idx_in_batch], &first_biter,
statuses[idx_in_batch]);
reusing_prev_block = false;
} else {
// If the handle is null and the result is empty, then the status is
// never set, which should be the initial value: ok().
assert(statuses[idx_in_batch].ok());
reusing_prev_block = true;
}
biter = &first_biter;
later_reused =
(reused_mask & (MultiGetContext::Mask{1} << idx_in_batch)) != 0;
idx_in_batch++;
} else {
IndexValue v = iiter->value();
if (!v.first_internal_key.empty() && !skip_filters &&
UserComparatorWrapper(rep_->internal_comparator.user_comparator())
.CompareWithoutTimestamp(
ExtractUserKey(key),
ExtractUserKey(v.first_internal_key)) < 0) {
// The requested key falls between highest key in previous block and
// lowest key in current block.
break;
}

next_biter.Invalidate(Status::OK());
NewDataBlockIterator<DataBlockIter>(
read_options, iiter->value().handle, &next_biter,
BlockType::kData, get_context, &lookup_data_block_context,
Status(), nullptr);
biter = &next_biter;
reusing_prev_block = false;
later_reused = false;
}

if (read_options.read_tier == kBlockCacheTier &&
biter->status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for
// whether we can guarantee the key is not there when "no_io" is set
get_context->MarkKeyMayExist();
break;
}
if (!biter->status().ok()) {
s = biter->status();
break;
}

bool may_exist = biter->SeekForGet(key);
if (!may_exist) {
// HashSeek cannot find the key in this block and the iter is not at
// the end of the block, i.e. the key cannot be in the following blocks
// either. In this case, the seek_key cannot be found, so we break
// from the top level for-loop.
break;
}

// Reusing blocks complicates pinning/Cleanable, because the cache
// entry referenced by biter can only be released once all returned
// pinned values are released. This code previously did an extra
// block_cache Ref for each reuse, but that unnecessarily increases
// block cache contention. Instead we can use a variant of shared_ptr
// to release in block cache only once.
//
// Although the biter loop below might SaveValue multiple times for
// merges, just one value_pinner suffices, as MultiGet will merge
// the operands before returning to the API user.
Cleanable* value_pinner;
if (biter->IsValuePinned()) {
if (reusing_prev_block) {
// Note that we don't yet know if the MultiGet results will need
// to pin this block, so we might wrap a block for sharing and
// still end up with 1 (or 0) pinning ref. Not ideal but OK.
//
// Here we avoid adding redundant cleanups if we didn't end up
// delegating the cleanup from last time around.
if (!biter->HasCleanups()) {
assert(shared_cleanable.get());
if (later_reused) {
shared_cleanable.RegisterCopyWith(biter);
} else {
shared_cleanable.MoveAsCleanupTo(biter);
}
}
} else if (later_reused) {
assert(biter->HasCleanups());
// Make the existing cleanups on `biter` sharable:
shared_cleanable.Allocate();
// Move existing `biter` cleanup(s) to `shared_cleanable`
biter->DelegateCleanupsTo(&*shared_cleanable);
// Reference `shared_cleanable` as new cleanup for `biter`
shared_cleanable.RegisterCopyWith(biter);
}
assert(biter->HasCleanups());
value_pinner = biter;
} else {
value_pinner = nullptr;
}

// Call the *saver function on each entry/block until it returns false
for (; biter->Valid(); biter->Next()) {
ParsedInternalKey parsed_key;
Status pik_status = ParseInternalKey(
biter->key(), &parsed_key, false /* log_err_key */); // TODO
if (!pik_status.ok()) {
s = pik_status;
}
if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
value_pinner)) {
if (get_context->State() == GetContext::GetState::kFound) {
does_referenced_key_exist = true;
referenced_data_size =
biter->key().size() + biter->value().size();
}
done = true;
break;
}
s = biter->status();
}
// Write the block cache access.
// XXX: There appear to be 'break' statements above that bypass this
// writing of the block cache trace record
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
!reusing_prev_block) {
// Avoid making copy of block_key, cf_name, and referenced_key when
// constructing the access record.
Slice referenced_key;
if (does_referenced_key_exist) {
referenced_key = biter->key();
} else {
referenced_key = key;
}
BlockCacheTraceRecord access_record(
rep_->ioptions.clock->NowMicros(),
/*block_key=*/"", lookup_data_block_context.block_type,
lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),
rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
lookup_data_block_context.is_cache_hit,
lookup_data_block_context.no_insert,
lookup_data_block_context.get_id,
lookup_data_block_context.get_from_user_specified_snapshot,
/*referenced_key=*/"", referenced_data_size,
lookup_data_block_context.num_keys_in_block,
does_referenced_key_exist);
// TODO: Should handle status here?
block_cache_tracer_
->WriteBlockAccess(access_record,
lookup_data_block_context.block_key,
rep_->cf_name_for_tracing(), referenced_key)
.PermitUncheckedError();
}
s = biter->status();
if (done) {
// Avoid the extra Next which is expensive in two-level indexes
break;
}
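// iiter is shared across the whole batch, so before moving past the first
// block re-seek it to this key; subsequent iterations only need Next().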
if (first_block) {
iiter->Seek(key);
if (!iiter->Valid()) {
break;
}
}
first_block = false;
iiter->Next();
} while (iiter->Valid());

if (matched && filter != nullptr && !filter->IsBlockBased()) {
RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
rep_->level);
}
if (s.ok() && !iiter->status().IsNotFound()) {
s = iiter->status();
}
*(miter->s) = s;
}
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
// Not sure why we need to do it. Should investigate more.
for (auto& st : statuses) {
st.PermitUncheckedError();
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
}
}
} // ROCKSDB_NAMESPACE