a0e0feca62
Summary: BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future and this is valid for forward sequential scans. But BlockPrefetcher tracks only num_file_reads_ and not if reads are sequential. This presents problem for MultiGet with large number of keys when it reseeks index iterator and data block. FilePrefetchBuffer can end up doing large readahead for reseeks as readahead size increases exponentially once readahead is enabled. Same issue is with BlockBasedTableIterator. Add previous length and offset read as well in BlockPrefetcher (creates FilePrefetchBuffer) and FilePrefetchBuffer (does prefetching of data) to determine if reads are sequential and then prefetch. Update the last block read after cache hit to take reads from cache also in account. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7394 Test Plan: Add new unit test case Reviewed By: anand1976 Differential Revision: D23737617 Pulled By: akankshamahajan15 fbshipit-source-id: 8e6917c25ed87b285ee495d1b68dc623d71205a3
175 lines
6.5 KiB
C++
175 lines
6.5 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "file/file_prefetch_buffer.h"
|
|
|
|
#include <algorithm>
|
|
#include <mutex>
|
|
|
|
#include "file/random_access_file_reader.h"
|
|
#include "monitoring/histogram.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "port/port.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/random.h"
|
|
#include "util/rate_limiter.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
|
|
RandomAccessFileReader* reader,
|
|
uint64_t offset, size_t n,
|
|
bool for_compaction) {
|
|
if (!enable_ || reader == nullptr) {
|
|
return Status::OK();
|
|
}
|
|
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
|
|
size_t alignment = reader->file()->GetRequiredBufferAlignment();
|
|
size_t offset_ = static_cast<size_t>(offset);
|
|
uint64_t rounddown_offset = Rounddown(offset_, alignment);
|
|
uint64_t roundup_end = Roundup(offset_ + n, alignment);
|
|
uint64_t roundup_len = roundup_end - rounddown_offset;
|
|
assert(roundup_len >= alignment);
|
|
assert(roundup_len % alignment == 0);
|
|
|
|
// Check if requested bytes are in the existing buffer_.
|
|
// If all bytes exist -- return.
|
|
// If only a few bytes exist -- reuse them & read only what is really needed.
|
|
// This is typically the case of incremental reading of data.
|
|
// If no bytes exist in buffer -- full pread.
|
|
|
|
Status s;
|
|
uint64_t chunk_offset_in_buffer = 0;
|
|
uint64_t chunk_len = 0;
|
|
bool copy_data_to_new_buffer = false;
|
|
if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
|
|
offset <= buffer_offset_ + buffer_.CurrentSize()) {
|
|
if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
|
|
// All requested bytes are already in the buffer. So no need to Read
|
|
// again.
|
|
return s;
|
|
} else {
|
|
// Only a few requested bytes are in the buffer. memmove those chunk of
|
|
// bytes to the beginning, and memcpy them back into the new buffer if a
|
|
// new buffer is created.
|
|
chunk_offset_in_buffer =
|
|
Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
|
|
chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
|
|
assert(chunk_offset_in_buffer % alignment == 0);
|
|
assert(chunk_len % alignment == 0);
|
|
assert(chunk_offset_in_buffer + chunk_len <=
|
|
buffer_offset_ + buffer_.CurrentSize());
|
|
if (chunk_len > 0) {
|
|
copy_data_to_new_buffer = true;
|
|
} else {
|
|
// this reset is not necessary, but just to be safe.
|
|
chunk_offset_in_buffer = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create a new buffer only if current capacity is not sufficient, and memcopy
|
|
// bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
|
|
if (buffer_.Capacity() < roundup_len) {
|
|
buffer_.Alignment(alignment);
|
|
buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
|
|
copy_data_to_new_buffer, chunk_offset_in_buffer,
|
|
static_cast<size_t>(chunk_len));
|
|
} else if (chunk_len > 0) {
|
|
// New buffer not needed. But memmove bytes from tail to the beginning since
|
|
// chunk_len is greater than 0.
|
|
buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
|
|
static_cast<size_t>(chunk_len));
|
|
}
|
|
|
|
Slice result;
|
|
size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
|
|
s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result,
|
|
buffer_.BufferStart() + chunk_len, nullptr, for_compaction);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
if (result.size() < read_len) {
|
|
// Fake an IO error to force db_stress fault injection to ignore
|
|
// truncated read errors
|
|
IGNORE_STATUS_IF_ERROR(Status::IOError());
|
|
}
|
|
#endif
|
|
buffer_offset_ = rounddown_offset;
|
|
buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
|
|
return s;
|
|
}
|
|
|
|
bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
|
|
uint64_t offset, size_t n,
|
|
Slice* result, Status* status,
|
|
bool for_compaction) {
|
|
if (track_min_offset_ && offset < min_offset_read_) {
|
|
min_offset_read_ = static_cast<size_t>(offset);
|
|
}
|
|
if (!enable_ || offset < buffer_offset_) {
|
|
return false;
|
|
}
|
|
|
|
// If the buffer contains only a few of the requested bytes:
|
|
// If readahead is enabled: prefetch the remaining bytes + readahead bytes
|
|
// and satisfy the request.
|
|
// If readahead is not enabled: return false.
|
|
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
|
|
if (readahead_size_ > 0) {
|
|
assert(file_reader_ != nullptr);
|
|
assert(max_readahead_size_ >= readahead_size_);
|
|
Status s;
|
|
if (for_compaction) {
|
|
s = Prefetch(opts, file_reader_, offset, std::max(n, readahead_size_),
|
|
for_compaction);
|
|
} else {
|
|
if (implicit_auto_readahead_) {
|
|
// Prefetch only if this read is sequential otherwise reset
|
|
// readahead_size_ to initial value.
|
|
if (!IsBlockSequential(offset)) {
|
|
UpdateReadPattern(offset, n);
|
|
ResetValues();
|
|
// Ignore status as Prefetch is not called.
|
|
s.PermitUncheckedError();
|
|
return false;
|
|
}
|
|
num_file_reads_++;
|
|
if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
|
|
UpdateReadPattern(offset, n);
|
|
// Ignore status as Prefetch is not called.
|
|
s.PermitUncheckedError();
|
|
return false;
|
|
}
|
|
}
|
|
s = Prefetch(opts, file_reader_, offset, n + readahead_size_,
|
|
for_compaction);
|
|
}
|
|
if (!s.ok()) {
|
|
if (status) {
|
|
*status = s;
|
|
}
|
|
#ifndef NDEBUG
|
|
IGNORE_STATUS_IF_ERROR(s);
|
|
#endif
|
|
return false;
|
|
}
|
|
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
UpdateReadPattern(offset, n);
|
|
uint64_t offset_in_buffer = offset - buffer_offset_;
|
|
*result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
|
|
return true;
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|