b931f84e56
Summary: file_reader_writer.h and .cc contain several file classes and helper functions, which makes them hard to navigate. Separate them into multiple files and put them under file/. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5803 Test Plan: Build the whole project using make and cmake. Differential Revision: D17374550 fbshipit-source-id: 10efca907721e7a78ed25bbf74dc5410dea05987
189 lines
6.5 KiB
C++
189 lines
6.5 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "file/random_access_file_reader.h"
|
|
|
|
#include <algorithm>
|
|
#include <mutex>
|
|
|
|
#include "monitoring/histogram.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "port/port.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/random.h"
|
|
#include "util/rate_limiter.h"
|
|
|
|
namespace rocksdb {
|
|
// Reads up to `n` bytes beginning at `offset` and makes `*result` refer to
// them. Returns the status of the last underlying file read.
//
// Two paths exist:
//  * Buffered (no direct I/O): the file is read chunk by chunk with
//    `scratch + <bytes read so far>` as the destination. `*result` begins at
//    the pointer returned by the first chunk.
//  * Direct I/O (compiled out under ROCKSDB_LITE): the requested range is
//    widened to the alignment reported by the file, read into a temporary
//    AlignedBuffer, and the requested part is then handed to `scratch` via
//    AlignedBuffer::Read().
//
// When `for_compaction` is true and a rate limiter is set, the size of every
// chunk is whatever RateLimiter::RequestToken() grants. Whenever `*result` is
// assigned after a non-OK status, its length is 0. After the timed section,
// the elapsed value handed to the StopWatch is added to `file_read_hist_` if
// both `stats_` and that histogram are non-null.
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
                                    char* scratch, bool for_compaction) const {
  Status status;
  uint64_t read_elapsed = 0;
  // Only compaction reads are throttled, and only when a limiter is present.
  const bool throttled = for_compaction && rate_limiter_ != nullptr;
  {
    uint64_t* const elapsed_out =
        (stats_ == nullptr) ? nullptr : &read_elapsed;
    StopWatch stopwatch(env_, stats_, hist_type_, elapsed_out,
                        true /*overwrite*/, true /*delay_enabled*/);
    auto saved_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (!use_direct_io()) {
      size_t bytes_done = 0;
      const char* data_start = nullptr;
      while (bytes_done < n) {
        size_t chunk_limit = n;
        if (throttled) {
          // Bracket the token request with DelayStart()/DelayStop() on the
          // stopwatch while the limiter reports that reads are rate limited.
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            stopwatch.DelayStart();
          }
          chunk_limit = rate_limiter_->RequestToken(
              n - bytes_done, 0 /* alignment */, Env::IOPriority::IO_LOW,
              stats_, RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            stopwatch.DelayStop();
          }
        }
        Slice chunk;

#ifndef ROCKSDB_LITE
        FileOperationInfo::TimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
        }
#endif
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          status = file_->Read(offset + bytes_done, chunk_limit, &chunk,
                               scratch + bytes_done);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(offset + bytes_done, chunk.size(), start_ts,
                                 finish_ts, status);
        }
#endif

        if (data_start != nullptr) {
          // Every later chunk has to continue exactly where the previous one
          // ended.
          assert(chunk.data() == data_start + bytes_done);
        } else {
          // Remember where the first chunk actually landed: `scratch` cannot
          // be used blindly because reads of mmap'd files return data in a
          // different buffer.
          data_start = chunk.data();
        }
        bytes_done += chunk.size();
        // Continue only after a successful read that filled the whole chunk.
        const bool chunk_complete =
            status.ok() && chunk.size() >= chunk_limit;
        if (!chunk_complete) {
          break;
        }
      }
      const size_t result_len = status.ok() ? bytes_done : 0;
      *result = Slice(data_start, result_len);
    } else {
#ifndef ROCKSDB_LITE
      // Widen [offset, offset + n) to the file's required alignment.
      const size_t alignment = file_->GetRequiredBufferAlignment();
      const size_t start = static_cast<size_t>(offset);
      const size_t aligned_offset = TruncateToPageBoundary(alignment, start);
      const size_t offset_advance = start - aligned_offset;
      const size_t aligned_end =
          Roundup(static_cast<size_t>(offset + n), alignment);
      const size_t read_size = aligned_end - aligned_offset;

      AlignedBuffer aligned_buf;
      aligned_buf.Alignment(alignment);
      aligned_buf.AllocateNewBuffer(read_size);
      while (aligned_buf.CurrentSize() < read_size) {
        size_t chunk_limit;
        if (throttled) {
          chunk_limit = rate_limiter_->RequestToken(
              aligned_buf.Capacity() - aligned_buf.CurrentSize(),
              aligned_buf.Alignment(), Env::IOPriority::IO_LOW, stats_,
              RateLimiter::OpType::kRead);
        } else {
          // Unthrottled reads ask for the whole range at once, so this point
          // is only expected to be reached with an empty buffer.
          assert(aligned_buf.CurrentSize() == 0);
          chunk_limit = read_size;
        }
        Slice chunk;

        // File position of this chunk; used for the read and for the
        // listener notification.
        const uint64_t chunk_offset =
            aligned_offset + aligned_buf.CurrentSize();
        FileOperationInfo::TimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
        }
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          status = file_->Read(chunk_offset, chunk_limit, &chunk,
                               aligned_buf.Destination());
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(chunk_offset, chunk.size(), start_ts,
                                 finish_ts, status);
        }

        aligned_buf.Size(aligned_buf.CurrentSize() + chunk.size());
        // Continue only after a successful read that filled the whole chunk.
        const bool chunk_complete =
            status.ok() && chunk.size() >= chunk_limit;
        if (!chunk_complete) {
          break;
        }
      }

      // Skip the alignment padding in front of `offset` and hand over at most
      // `n` of the bytes that were actually read.
      size_t copied = 0;
      if (status.ok() && offset_advance < aligned_buf.CurrentSize()) {
        const size_t available = aligned_buf.CurrentSize() - offset_advance;
        copied =
            aligned_buf.Read(scratch, offset_advance, std::min(available, n));
      }
      *result = Slice(scratch, copied);
#endif  // !ROCKSDB_LITE
    }
    IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
    SetPerfLevel(saved_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(read_elapsed);
  }

  return status;
}
|
|
|
|
// Forwards the `num_reqs` requests in `read_reqs` to the underlying file in a
// single MultiRead() call and returns that call's status.
//
// Asserts that direct I/O is not in use. Afterwards, for every request:
//  * listeners (if any, and unless built with ROCKSDB_LITE) are told about
//    the request's offset, result size and own `status` field, using one
//    start timestamp taken before the MultiRead() call;
//  * the request's result size is added to the `bytes_read` IO stat when it
//    is positive.
// After the timed section, the elapsed value handed to the StopWatch is added
// to `file_read_hist_` if both `stats_` and that histogram are non-null.
Status RandomAccessFileReader::MultiRead(ReadRequest* read_reqs,
                                         size_t num_reqs) const {
  Status status;
  uint64_t read_elapsed = 0;
  assert(!use_direct_io());
  {
    uint64_t* const elapsed_out =
        (stats_ == nullptr) ? nullptr : &read_elapsed;
    StopWatch stopwatch(env_, stats_, hist_type_, elapsed_out,
                        true /*overwrite*/, true /*delay_enabled*/);
    auto saved_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

#ifndef ROCKSDB_LITE
    FileOperationInfo::TimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = std::chrono::system_clock::now();
    }
#endif  // ROCKSDB_LITE
    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
      status = file_->MultiRead(read_reqs, num_reqs);
    }
    for (size_t idx = 0; idx != num_reqs; ++idx) {
      ReadRequest& req = read_reqs[idx];
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileReadFinish(req.offset, req.result.size(), start_ts,
                               finish_ts, req.status);
      }
#endif  // ROCKSDB_LITE
      IOSTATS_ADD_IF_POSITIVE(bytes_read, req.result.size());
    }
    SetPerfLevel(saved_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(read_elapsed);
  }

  return status;
}
|
|
} // namespace rocksdb
|