Summary: This reverts commit 7d503e66a9a9d9bdf17e2d35038bfe8f3e12f5dc.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7505

Reviewed By: ajkr

Differential Revision: D24100875

Pulled By: ltamasi

fbshipit-source-id: 8705e3e6e8be4b4fd175ffdb031baa6530b61151
Parent: 758ead5df7
Commit: 5d16325ce3
@@ -91,19 +91,17 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
   size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
   s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result,
                    buffer_.BufferStart() + chunk_len, nullptr, for_compaction);
-  if (!s.ok()) {
-    return s;
-  }
-
 #ifndef NDEBUG
-  if (result.size() < read_len) {
+  if (!s.ok() || result.size() < read_len) {
     // Fake an IO error to force db_stress fault injection to ignore
     // truncated read errors
     IGNORE_STATUS_IF_ERROR(Status::IOError());
   }
 #endif
-  buffer_offset_ = rounddown_offset;
-  buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
+  if (s.ok()) {
+    buffer_offset_ = rounddown_offset;
+    buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
+  }
   return s;
 }
 
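For orientation, the hunk above restores a Prefetch shape in which a failed read no longer returns early before the debug-only check: the read status is folded into the fault-injection condition, and the buffer bookkeeping is committed only when the read succeeded. Below is a minimal, hedged sketch of that control flow; the Status, reader, and member names are simplified stand-ins for this sketch, not the real RocksDB classes.

#include <cstddef>

// Stand-in types for this sketch only; the real Status, file reader and
// FilePrefetchBuffer live in the RocksDB codebase.
struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
  static Status OK() { Status s; return s; }
  static Status IOError() { Status s; s.ok_ = false; return s; }
};

struct MockReader {
  // Pretend to read `len` bytes; a truncated read would set *bytes_read < len.
  Status Read(size_t len, size_t* bytes_read) {
    *bytes_read = len;
    return Status::OK();
  }
};

struct PrefetchSketch {
  size_t buffer_offset_ = 0;
  size_t buffer_size_ = 0;
  size_t debug_short_reads_ = 0;

  Status Prefetch(MockReader& reader, size_t offset, size_t read_len) {
    size_t bytes_read = 0;
    Status s = reader.Read(read_len, &bytes_read);
#ifndef NDEBUG
    if (!s.ok() || bytes_read < read_len) {
      // The real code calls IGNORE_STATUS_IF_ERROR(Status::IOError()) here so
      // that db_stress fault injection ignores truncated-read errors.
      ++debug_short_reads_;
    }
#endif
    if (s.ok()) {
      // Buffer bookkeeping is committed only when the read succeeded.
      buffer_offset_ = offset;
      buffer_size_ = bytes_read;
    }
    return s;
  }
};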
@@ -1038,16 +1038,13 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
     auto filter = new_table->CreateFilterBlockReader(
         ro, prefetch_buffer, use_cache, prefetch_filter, pin_filter,
         lookup_context);
 
     if (filter) {
-      rep_->filter = std::move(filter);
       // Refer to the comment above about paritioned indexes always being cached
       if (prefetch_all) {
-        s = rep_->filter->CacheDependencies(ro, pin_all);
-        if (!s.ok()) {
-          return s;
-        }
+        filter->CacheDependencies(ro, pin_all);
       }
 
+      rep_->filter = std::move(filter);
     }
   }
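On the caller side, the hunk above leaves PrefetchIndexAndFilterBlocks with no status to propagate from the warm-up step, and the filter reader is installed into rep_ only after CacheDependencies has run. A compact sketch of that shape follows; FilterReader, Rep and InstallFilterSketch are hypothetical stand-in names for illustration, not RocksDB types.

#include <memory>
#include <utility>

struct ReadOptions {};

// Stand-in mimicking the void CacheDependencies hook restored by this revert.
struct FilterReader {
  void CacheDependencies(const ReadOptions& /*ro*/, bool /*pin*/) {}
};

struct Rep {
  std::unique_ptr<FilterReader> filter;
};

void InstallFilterSketch(Rep* rep, std::unique_ptr<FilterReader> filter,
                         const ReadOptions& ro, bool prefetch_all,
                         bool pin_all) {
  if (filter) {
    if (prefetch_all) {
      // No status to check: failures are handled inside CacheDependencies.
      filter->CacheDependencies(ro, pin_all);
    }
    // Install the reader only after the warm-up attempt.
    rep->filter = std::move(filter);
  }
}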
@@ -153,9 +153,7 @@ class FilterBlockReader {
     return error_msg;
   }
 
-  virtual Status CacheDependencies(const ReadOptions& /*ro*/, bool /*pin*/) {
-    return Status::OK();
-  }
+  virtual void CacheDependencies(const ReadOptions& /*ro*/, bool /*pin*/) {}
 
   virtual bool RangeMayExist(const Slice* /*iterate_upper_bound*/,
                              const Slice& user_key,
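The hunk above reverts the base-class hook to a void no-op, so derived readers that warm dependent blocks override a void method and must cope with failures internally instead of returning a Status. A hedged sketch of the override pattern, with placeholder class names rather than the real RocksDB readers:

// Stand-in for rocksdb::ReadOptions.
struct ReadOptions {};

class FilterBlockReaderSketch {
 public:
  virtual ~FilterBlockReaderSketch() = default;
  // After the revert: a no-op hook returning nothing, matching
  // "virtual void CacheDependencies(const ReadOptions&, bool) {}" above.
  virtual void CacheDependencies(const ReadOptions& /*ro*/, bool /*pin*/) {}
};

class PartitionedFilterSketch : public FilterBlockReaderSketch {
 public:
  // Mirrors the header change in the last hunk of this diff: the override is
  // also void, so read failures have to be logged and swallowed inside it.
  void CacheDependencies(const ReadOptions& /*ro*/, bool /*pin*/) override {
    // ... read the top-level filter block, prefetch and pin partitions ...
  }
};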
@@ -412,8 +412,8 @@ size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
 }
 
 // TODO(myabandeh): merge this with the same function in IndexReader
-Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
-                                                       bool pin) {
+void PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
+                                                     bool pin) {
   assert(table());
 
   const BlockBasedTable::Rep* const rep = table()->get_rep();
@@ -426,11 +426,12 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
   Status s = GetOrReadFilterBlock(false /* no_io */, nullptr /* get_context */,
                                   &lookup_context, &filter_block);
   if (!s.ok()) {
-    ROCKS_LOG_ERROR(rep->ioptions.info_log,
+    ROCKS_LOG_WARN(rep->ioptions.info_log,
                     "Error retrieving top-level filter block while trying to "
                     "cache filter partitions: %s",
                     s.ToString().c_str());
-    return s;
+    IGNORE_STATUS_IF_ERROR(s);
+    return;
   }
 
   // Before read partitions, prefetch them to avoid lots of IOs
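Because the function now returns void, a failure to read the top-level filter block can only be logged and then explicitly swallowed before bailing out, and the log call drops from ROCKS_LOG_ERROR to ROCKS_LOG_WARN, presumably because the failure is no longer surfaced to the caller. A hedged sketch of that early-return path; the Status struct and the IGNORE_STATUS_IF_ERROR definition below are simplified stand-ins, not the real RocksDB facilities:

#include <cstdio>

struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
  const char* ToString() const { return ok_ ? "OK" : "IO error"; }
};

#define IGNORE_STATUS_IF_ERROR(s) (void)(s)  // simplified stand-in macro

void CacheDependenciesEntrySketch(const Status& s) {
  if (!s.ok()) {
    // Only a warning now, since the failure is swallowed instead of returned.
    std::fprintf(stderr,
                 "Error retrieving top-level filter block while trying to "
                 "cache filter partitions: %s\n",
                 s.ToString());
    IGNORE_STATUS_IF_ERROR(s);
    return;  // void return: give up on caching the partitions
  }
  // ... prefetch the partitions, then read them one by one ...
}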
@@ -464,9 +465,6 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
     s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
                                   static_cast<size_t>(prefetch_len));
   }
-  if (!s.ok()) {
-    return s;
-  }
 
   // After prefetch, read the partitions one by one
   for (biter.SeekToFirst(); biter.Valid(); biter.Next()) {
@@ -479,20 +477,17 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
         prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
         &block, BlockType::kFilter, nullptr /* get_context */, &lookup_context,
         nullptr /* contents */);
-    if (!s.ok()) {
-      return s;
-    }
-    assert(s.ok() || block.GetValue() == nullptr);
 
-    if (block.GetValue() != nullptr) {
+    assert(s.ok() || block.GetValue() == nullptr);
+    if (s.ok() && block.GetValue() != nullptr) {
       if (block.IsCached()) {
         if (pin) {
          filter_map_[handle.offset()] = std::move(block);
        }
       }
     }
+    IGNORE_STATUS_IF_ERROR(s);
   }
-  return biter.status();
 }
 
 const InternalKeyComparator* PartitionedFilterBlockReader::internal_comparator()
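In the per-partition loop, the revert makes the warm-up best effort: a partition whose retrieval failed is simply skipped, its status is swallowed with IGNORE_STATUS_IF_ERROR, and the function no longer returns biter.status(). A rough sketch of that loop body follows, under simplified stand-in types; BlockEntry, CachePartitionsSketch and the macro definition are illustrative placeholders, not the real CachableEntry or RocksDB macro.

#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
};

// Stand-in for the cachable block entry used by the real reader.
struct BlockEntry {
  const void* value = nullptr;
  bool cached = false;
  const void* GetValue() const { return value; }
  bool IsCached() const { return cached; }
};

#define IGNORE_STATUS_IF_ERROR(s) (void)(s)  // simplified stand-in macro

void CachePartitionsSketch(const std::vector<uint64_t>& handle_offsets,
                           bool pin,
                           std::map<uint64_t, BlockEntry>* filter_map) {
  for (uint64_t offset : handle_offsets) {
    BlockEntry block;
    Status s;  // in the real code: the result of retrieving this partition
    // A failed retrieval must not have produced a value.
    assert(s.ok() || block.GetValue() == nullptr);
    if (s.ok() && block.GetValue() != nullptr) {
      if (block.IsCached() && pin) {
        // Keep a pinned reference to the partition, keyed by its handle offset.
        (*filter_map)[offset] = std::move(block);
      }
    }
    // Per-partition errors are swallowed; the warm-up is best effort.
    IGNORE_STATUS_IF_ERROR(s);
  }
}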
@@ -130,7 +130,7 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon<Block> {
                  uint64_t block_offset, BlockHandle filter_handle,
                  bool no_io, BlockCacheLookupContext* lookup_context,
                  FilterManyFunction filter_function) const;
-  Status CacheDependencies(const ReadOptions& ro, bool pin) override;
+  void CacheDependencies(const ReadOptions& ro, bool pin) override;
 
   const InternalKeyComparator* internal_comparator() const;
   bool index_key_includes_seq() const;
|
Loading…
x
Reference in New Issue
Block a user