rocksdb/file/random_access_file_reader.h
wenh 226d1f9c73 extend listener callback functions to more file I/O operations (#7055)
Summary:
Currently, `EventListener` in listner.h only have callback functions for file read and write. One may favor extended callback functions for more file I/O operations like flush, sync and close. This PR tries to add those interface and have them called when appropriate throughout the code base.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7055

Test Plan:
Write an experimental listener with those new callback functions with log output in them; run experiments and check logs to see those functions are actually called.
Default test suits `make check` should also be included.

Reviewed By: riversand963

Differential Revision: D22380624

Pulled By: roghnin

fbshipit-source-id: 4121491d45c2c2aae8c255e7998090559a241c6a
2020-07-07 18:21:18 -07:00

144 lines
5.1 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <sstream>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"
#include "util/aligned_buffer.h"
namespace ROCKSDB_NAMESPACE {
class Statistics;
class HistogramImpl;
using AlignedBuf = std::unique_ptr<char[]>;
// RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is
// responsible for:
// - Handling Buffered and Direct reads appropriately.
// - Rate limiting compaction reads.
// - Notifying any interested listeners on the completion of a read.
// - Updating IO stats.
class RandomAccessFileReader {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileReadFinish(uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) const {
FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
finish_ts, status);
info.offset = offset;
info.length = length;
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);
}
}
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::unique_ptr<FSRandomAccessFile> file_;
std::string file_name_;
Env* env_;
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
RateLimiter* rate_limiter_;
std::vector<std::shared_ptr<EventListener>> listeners_;
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
Env* _env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_(std::move(raf)),
file_name_(std::move(_file_name)),
env_(_env),
stats_(stats),
hist_type_(hist_type),
file_read_hist_(file_read_hist),
rate_limiter_(rate_limiter),
listeners_() {
#ifndef ROCKSDB_LITE
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
#else // !ROCKSDB_LITE
(void)listeners;
#endif
}
RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
env_ = std::move(o.env_);
stats_ = std::move(o.stats_);
hist_type_ = std::move(o.hist_type_);
file_read_hist_ = std::move(o.file_read_hist_);
rate_limiter_ = std::move(o.rate_limiter_);
return *this;
}
RandomAccessFileReader(const RandomAccessFileReader&) = delete;
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
// In non-direct IO mode,
// 1. if using mmap, result is stored in a buffer other than scratch;
// 2. if not using mmap, result is stored in the buffer starting from scratch.
//
// In direct IO mode, an aligned buffer is allocated internally.
// 1. If aligned_buf is null, then results are copied to the buffer
// starting from scratch;
// 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
// the internally allocated buffer on return, and the result refers to a
// region in aligned_buf.
Status Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
bool for_compaction = false) const;
// REQUIRES:
// num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
// In non-direct IO mode, aligned_buf should be null;
// In direct IO mode, aligned_buf stores the aligned buffer allocated inside
// MultiRead, the result Slices in reqs refer to aligned_buf.
Status MultiRead(const IOOptions& opts, FSReadRequest* reqs, size_t num_reqs,
AlignedBuf* aligned_buf) const;
Status Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n, IOOptions(), nullptr);
}
FSRandomAccessFile* file() { return file_.get(); }
const std::string& file_name() const { return file_name_; }
bool use_direct_io() const { return file_->use_direct_io(); }
Env* env() const { return env_; }
};
} // namespace ROCKSDB_NAMESPACE