04b2c16f9b
Summary: After introducing adaptive_readahead, the original flow got broken. Readahead size was set to 0, because of which RocksDB wasn't able to do the automatic prefetching that it enables after seeing sequential reads. This PR fixes it. ---------------------------------------------------------------------------------------------------- Before this patch: ./db_bench -use_existing_db=true -db=/tmp/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 6.27 Date: Tue Nov 30 11:56:50 2021 CPU: 24 * Intel Core Processor (Broadwell) CPUCache: 16384 KB Keys: 32 bytes each (+ 0 bytes user-defined timestamp) Values: 512 bytes each (256 bytes after compression) Entries: 5000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 2594.0 MB (estimated) FileSize: 1373.3 MB (estimated) Write rate: 0 bytes/second Read rate: 0 ops/second Compression: Snappy Compression sampling rate: 0 Memtablerep: SkipListFactory Perf Level: 1 WARNING: Assertions are enabled; benchmarks unnecessarily slow ------------------------------------------------ DB path: [/tmp/prefix_scan] seekrandom : 5356367.174 micros/op 0 ops/sec; 29.4 MB/s (23 of 23 found) ---------------------------------------------------------------------------------------------------- After the patch: ./db_bench -use_existing_db=true -db=/tmp/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 6.27 Date: Tue Nov 30 14:38:33 2021 CPU: 24 * Intel Core Processor (Broadwell) CPUCache: 16384 KB Keys: 32 bytes each (+ 0 bytes user-defined timestamp) Values: 512 bytes each (256 bytes 
after compression) Entries: 5000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 2594.0 MB (estimated) FileSize: 1373.3 MB (estimated) Write rate: 0 bytes/second Read rate: 0 ops/second Compression: Snappy Compression sampling rate: 0 Memtablerep: SkipListFactory Perf Level: 1 WARNING: Assertions are enabled; benchmarks unnecessarily slow ------------------------------------------------ DB path: [/tmp/prefix_scan] seekrandom : 456504.277 micros/op 2 ops/sec; 359.8 MB/s (264 of 264 found) Pull Request resolved: https://github.com/facebook/rocksdb/pull/9234 Test Plan: Ran ./db_bench -db=/data/mysql/rocksdb/prefix_scan -benchmarks="fillseq" -key_size=32 -value_size=512 -num=5000000 -use_direct_io_for_flush_and_compaction=true -target_file_size_base=16777216 and then ./db_bench -use_existing_db=true -db=/data/mysql/rocksdb/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 and compared the results. Reviewed By: anand1976 Differential Revision: D32743965 Pulled By: akankshamahajan15 fbshipit-source-id: b950fba68c91963b7deb5c20acdf471bc60251f5
177 lines
6.7 KiB
C++
177 lines
6.7 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "file/file_prefetch_buffer.h"
|
|
|
|
#include <algorithm>
|
|
#include <mutex>
|
|
|
|
#include "file/random_access_file_reader.h"
|
|
#include "monitoring/histogram.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "port/port.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/random.h"
|
|
#include "util/rate_limiter.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Fills buffer_ with the alignment-rounded file range that covers
// [offset, offset + n).
//
// Returns OK immediately when prefetching is disabled, when `reader` is null,
// or when the whole requested range is already held in buffer_. Otherwise any
// aligned tail of buffer_ that overlaps the request is kept at the front of
// the (possibly re-allocated) buffer and only the missing bytes are read via
// `reader`. If that read fails, its status is returned before buffer_offset_
// is updated.
Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
                                    RandomAccessFileReader* reader,
                                    uint64_t offset, size_t n,
                                    bool for_compaction) {
  if (!enable_ || reader == nullptr) {
    return Status::OK();
  }
  TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");

  // Widen the requested range so both of its ends sit on an alignment
  // boundary required by the underlying file.
  const size_t alignment = reader->file()->GetRequiredBufferAlignment();
  const size_t request_begin = static_cast<size_t>(offset);
  const uint64_t aligned_begin = Rounddown(request_begin, alignment);
  const uint64_t aligned_end = Roundup(request_begin + n, alignment);
  const uint64_t aligned_len = aligned_end - aligned_begin;
  assert(aligned_len >= alignment);
  assert(aligned_len % alignment == 0);

  Status s;

  // Portion of the current buffer contents that can be reused:
  //   kept_offset -- where the reusable tail starts inside buffer_
  //   kept_len    -- how many bytes of it there are
  //   carry_over  -- whether a newly allocated buffer must copy that tail
  uint64_t kept_offset = 0;
  uint64_t kept_len = 0;
  bool carry_over = false;

  const bool request_starts_in_buffer =
      buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
      offset <= buffer_offset_ + buffer_.CurrentSize();
  if (request_starts_in_buffer) {
    if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
      // Everything that was asked for is already buffered; no Read needed.
      return s;
    }
    // Only the head of the request is buffered (the usual case for
    // incremental reads). Keep the aligned tail of the buffer that overlaps
    // the request so that only the remainder has to be read.
    kept_offset =
        Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
    kept_len = buffer_.CurrentSize() - kept_offset;
    assert(kept_offset % alignment == 0);
    assert(kept_len % alignment == 0);
    assert(kept_offset + kept_len <= buffer_offset_ + buffer_.CurrentSize());
    carry_over = kept_len > 0;
    if (!carry_over) {
      // Not strictly required; just keeps the pair consistent.
      kept_offset = 0;
    }
  }

  if (buffer_.Capacity() < aligned_len) {
    // Current capacity is too small: allocate a new buffer, copying the
    // reusable tail into it when there is one.
    buffer_.Alignment(alignment);
    buffer_.AllocateNewBuffer(static_cast<size_t>(aligned_len), carry_over,
                              kept_offset, static_cast<size_t>(kept_len));
  } else if (kept_len > 0) {
    // Capacity is sufficient: just move the reusable tail to the beginning.
    buffer_.RefitTail(static_cast<size_t>(kept_offset),
                      static_cast<size_t>(kept_len));
  }

  // Read only the bytes that are not already sitting at the front of buffer_.
  Slice result;
  const size_t read_len = static_cast<size_t>(aligned_len - kept_len);
  s = reader->Read(opts, aligned_begin + kept_len, read_len, &result,
                   buffer_.BufferStart() + kept_len, nullptr, for_compaction);
  if (!s.ok()) {
    return s;
  }

#ifndef NDEBUG
  if (result.size() < read_len) {
    // Fake an IO error to force db_stress fault injection to ignore
    // truncated read errors
    IGNORE_STATUS_IF_ERROR(Status::IOError());
  }
#endif
  buffer_offset_ = aligned_begin;
  buffer_.Size(static_cast<size_t>(kept_len) + result.size());
  return s;
}
|
|
|
|
bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
|
|
RandomAccessFileReader* reader,
|
|
uint64_t offset, size_t n,
|
|
Slice* result, Status* status,
|
|
bool for_compaction) {
|
|
if (track_min_offset_ && offset < min_offset_read_) {
|
|
min_offset_read_ = static_cast<size_t>(offset);
|
|
}
|
|
if (!enable_ || offset < buffer_offset_) {
|
|
return false;
|
|
}
|
|
|
|
// If the buffer contains only a few of the requested bytes:
|
|
// If readahead is enabled: prefetch the remaining bytes + readahead bytes
|
|
// and satisfy the request.
|
|
// If readahead is not enabled: return false.
|
|
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
|
|
&readahead_size_);
|
|
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
|
|
if (readahead_size_ > 0) {
|
|
assert(reader != nullptr);
|
|
assert(max_readahead_size_ >= readahead_size_);
|
|
Status s;
|
|
if (for_compaction) {
|
|
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
|
|
for_compaction);
|
|
} else {
|
|
if (implicit_auto_readahead_) {
|
|
// Prefetch only if this read is sequential otherwise reset
|
|
// readahead_size_ to initial value.
|
|
if (!IsBlockSequential(offset)) {
|
|
UpdateReadPattern(offset, n);
|
|
ResetValues();
|
|
// Ignore status as Prefetch is not called.
|
|
s.PermitUncheckedError();
|
|
return false;
|
|
}
|
|
num_file_reads_++;
|
|
if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
|
|
UpdateReadPattern(offset, n);
|
|
// Ignore status as Prefetch is not called.
|
|
s.PermitUncheckedError();
|
|
return false;
|
|
}
|
|
}
|
|
s = Prefetch(opts, reader, offset, n + readahead_size_, for_compaction);
|
|
}
|
|
if (!s.ok()) {
|
|
if (status) {
|
|
*status = s;
|
|
}
|
|
#ifndef NDEBUG
|
|
IGNORE_STATUS_IF_ERROR(s);
|
|
#endif
|
|
return false;
|
|
}
|
|
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
UpdateReadPattern(offset, n);
|
|
uint64_t offset_in_buffer = offset - buffer_offset_;
|
|
*result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
|
|
return true;
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|