rocksdb/file/file_prefetch_buffer.cc
Andrew Kryczka babe56ddba Add rate limiter priority to ReadOptions (#9424)
Summary:
Users can set the priority for file reads associated with their operation by setting `ReadOptions::rate_limiter_priority` to something other than `Env::IO_TOTAL`. Rate limiting `VerifyChecksum()` and `VerifyFileChecksums()` is the motivation for this PR, so it also includes benchmarks and minor bug fixes to get that working.

`RandomAccessFileReader::Read()` already had support for rate limiting compaction reads. I changed that rate limiting to be non-specific to compaction, but rather performed according to the passed in `Env::IOPriority`. Now the compaction read rate limiting is supported by setting `rate_limiter_priority = Env::IO_LOW` on its `ReadOptions`.

There is no default value for the new `Env::IOPriority` parameter to `RandomAccessFileReader::Read()`. That means this PR goes through all callers (in some cases multiple layers up the call stack) to find a `ReadOptions` to provide the priority. There are TODOs for cases I believe it would be good to let user control the priority some day (e.g., file footer reads), and no TODO in cases I believe it doesn't matter (e.g., trace file reads).

The API doc only lists the missing cases where a file read associated with a provided `ReadOptions` cannot be rate limited. For cases like file ingestion checksum calculation, there is no API to provide `ReadOptions` or `Env::IOPriority`, so I didn't count that as missing.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9424

Test Plan:
- new unit tests
- new benchmarks on ~50MB database with 1MB/s read rate limit and 100ms refill interval; verified with strace reads are chunked (at 0.1MB per chunk) and spaced roughly 100ms apart.
  - setup command: `./db_bench -benchmarks=fillrandom,compact -db=/tmp/testdb -target_file_size_base=1048576 -disable_auto_compactions=true -file_checksum=true`
  - benchmarks command: `strace -ttfe pread64 ./db_bench -benchmarks=verifychecksum,verifyfilechecksums -use_existing_db=true -db=/tmp/testdb -rate_limiter_bytes_per_sec=1048576 -rate_limit_bg_reads=1 -rate_limit_user_ops=true -file_checksum=true`
- crash test using IO_USER priority on non-validation reads with https://github.com/facebook/rocksdb/issues/9567 reverted: `python3 tools/db_crashtest.py blackbox --max_key=1000000 --write_buffer_size=524288 --target_file_size_base=524288 --level_compaction_dynamic_level_bytes=true --duration=3600 --rate_limit_bg_reads=true --rate_limit_user_ops=true --rate_limiter_bytes_per_sec=10485760 --interval=10`

Reviewed By: hx235

Differential Revision: D33747386

Pulled By: ajkr

fbshipit-source-id: a2d985e97912fba8c54763798e04f006ccc56e0c
2022-02-16 23:18:14 -08:00

180 lines
6.8 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/file_prefetch_buffer.h"
#include <algorithm>
#include <mutex>
#include "file/random_access_file_reader.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Env::IOPriority rate_limiter_priority) {
if (!enable_ || reader == nullptr) {
return Status::OK();
}
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
size_t alignment = reader->file()->GetRequiredBufferAlignment();
size_t offset_ = static_cast<size_t>(offset);
uint64_t rounddown_offset = Rounddown(offset_, alignment);
uint64_t roundup_end = Roundup(offset_ + n, alignment);
uint64_t roundup_len = roundup_end - rounddown_offset;
assert(roundup_len >= alignment);
assert(roundup_len % alignment == 0);
// Check if requested bytes are in the existing buffer_.
// If all bytes exist -- return.
// If only a few bytes exist -- reuse them & read only what is really needed.
// This is typically the case of incremental reading of data.
// If no bytes exist in buffer -- full pread.
Status s;
uint64_t chunk_offset_in_buffer = 0;
uint64_t chunk_len = 0;
bool copy_data_to_new_buffer = false;
if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
offset <= buffer_offset_ + buffer_.CurrentSize()) {
if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
// All requested bytes are already in the buffer. So no need to Read
// again.
return s;
} else {
// Only a few requested bytes are in the buffer. memmove those chunk of
// bytes to the beginning, and memcpy them back into the new buffer if a
// new buffer is created.
chunk_offset_in_buffer =
Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
assert(chunk_offset_in_buffer % alignment == 0);
assert(chunk_len % alignment == 0);
assert(chunk_offset_in_buffer + chunk_len <=
buffer_offset_ + buffer_.CurrentSize());
if (chunk_len > 0) {
copy_data_to_new_buffer = true;
} else {
// this reset is not necessary, but just to be safe.
chunk_offset_in_buffer = 0;
}
}
}
// Create a new buffer only if current capacity is not sufficient, and memcopy
// bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
if (buffer_.Capacity() < roundup_len) {
buffer_.Alignment(alignment);
buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
copy_data_to_new_buffer, chunk_offset_in_buffer,
static_cast<size_t>(chunk_len));
} else if (chunk_len > 0) {
// New buffer not needed. But memmove bytes from tail to the beginning since
// chunk_len is greater than 0.
buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
static_cast<size_t>(chunk_len));
}
Slice result;
size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result,
buffer_.BufferStart() + chunk_len, nullptr,
rate_limiter_priority);
if (!s.ok()) {
return s;
}
#ifndef NDEBUG
if (result.size() < read_len) {
// Fake an IO error to force db_stress fault injection to ignore
// truncated read errors
IGNORE_STATUS_IF_ERROR(Status::IOError());
}
#endif
buffer_offset_ = rounddown_offset;
buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
return s;
}
bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status,
Env::IOPriority rate_limiter_priority,
bool for_compaction /* = false */) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
if (!enable_ || offset < buffer_offset_) {
return false;
}
// If the buffer contains only a few of the requested bytes:
// If readahead is enabled: prefetch the remaining bytes + readahead bytes
// and satisfy the request.
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
if (readahead_size_ > 0) {
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);
Status s;
if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
rate_limiter_priority);
} else {
if (implicit_auto_readahead_) {
// Prefetch only if this read is sequential otherwise reset
// readahead_size_ to initial value.
if (!IsBlockSequential(offset)) {
UpdateReadPattern(offset, n);
ResetValues();
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
num_file_reads_++;
if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
UpdateReadPattern(offset, n);
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
s = Prefetch(opts, reader, offset, n + readahead_size_,
rate_limiter_priority);
}
if (!s.ok()) {
if (status) {
*status = s;
}
#ifndef NDEBUG
IGNORE_STATUS_IF_ERROR(s);
#endif
return false;
}
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
} else {
return false;
}
}
UpdateReadPattern(offset, n);
uint64_t offset_in_buffer = offset - buffer_offset_;
*result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
return true;
}
} // namespace ROCKSDB_NAMESPACE