Update stats for Read and ReadAsync in random_access_file_reader for async prefetching (#9810)

Summary:
Update stats in random_access_file_reader for Read and
ReadAsync API to take into account the read latency for async
prefetching.

It also fixes ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was
incorrect in portal.h

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9810

Test Plan: Update unit test

Reviewed By: anand1976

Differential Revision: D35433081

Pulled By: akankshamahajan15

fbshipit-source-id: aeec3901270e58a003ce6b5214bd25ddcb3a12a9
This commit is contained in:
Akanksha Mahajan 2022-04-06 14:26:53 -07:00 committed by akankshamahajan
parent 71158842aa
commit 80bea2ba8e
7 changed files with 128 additions and 10 deletions

View File

@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.statistics = CreateDBStatistics();
options.env = env.get();
if (std::get<0>(GetParam())) {
bool use_direct_io = std::get<0>(GetParam());
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (std::get<0>(GetParam()) &&
(s.IsNotSupported() || s.IsInvalidArgument())) {
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
@ -766,6 +767,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
}
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -773,15 +776,25 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0;
// For index and data blocks.
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_EQ(readahead_carry_over_count, 0);
}
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (ro.async_io && !use_direct_io) {
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB
table_options.block_cache = cache;
@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
{
/*
@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
iter->Seek(BuildKey(1019));
buff_prefetch_count = 0;
}
{
ASSERT_OK(options.statistics->Reset());
// After caching, blocks will be read from cache (Sequential blocks)
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0));
@ -1008,6 +1024,18 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
ASSERT_EQ(buff_prefetch_count, 2);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (GetParam()) {
ASSERT_EQ(async_read_bytes.count, 0);
} else {
ASSERT_GT(async_read_bytes.count, 0);
}
}
buff_prefetch_count = 0;
}
Close();
@ -1033,6 +1061,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
options.statistics = CreateDBStatistics();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
@ -1080,6 +1109,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
ro.adaptive_readahead = true;
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -1088,7 +1118,19 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
#if defined(ROCKSDB_IOURING_PRESENT)
ASSERT_GT(async_read_bytes.count, 0);
#else
ASSERT_EQ(async_read_bytes.count, 0);
#endif
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

View File

@ -425,19 +425,75 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
}
}
// TODO akanksha: Add perf_times etc.
// TODO akanksha:
// 1. Handle use_direct_io case which currently calls Read API.
IOStatus RandomAccessFileReader::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority) {
if (use_direct_io()) {
// For direct_io, it calls Read API.
req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch,
nullptr /*dbg*/, rate_limiter_priority);
cb(req, cb_arg);
return IOStatus::OK();
}
return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn,
nullptr /*dbg*/);
// Create a callback and populate info.
auto read_async_callback =
std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
ReadAsyncInfo* read_async_info = new ReadAsyncInfo;
read_async_info->cb_ = cb;
read_async_info->cb_arg_ = cb_arg;
read_async_info->start_time_ = clock_->NowMicros();
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
}
#endif
IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);
if (!s.ok()) {
delete read_async_info;
}
return s;
}
void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
void* cb_arg) {
ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
assert(read_async_info);
assert(read_async_info->cb_);
read_async_info->cb_(req, read_async_info->cb_arg_);
// Update stats and notify listeners.
if (stats_ != nullptr && file_read_hist_ != nullptr) {
// elapsed doesn't take into account delay and overwrite as StopWatch does
// in Read.
uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
file_read_hist_->Add(elapsed);
}
if (req.status.ok()) {
RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(req.offset, req.result.size(),
read_async_info->fs_start_ts_, finish_ts,
req.status);
}
if (!req.status.ok()) {
NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
req.result.size(), req.offset);
}
#endif
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
delete read_async_info;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -92,6 +92,15 @@ class RandomAccessFileReader {
const Temperature file_temperature_;
const bool is_last_level_;
struct ReadAsyncInfo {
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint fs_start_ts_;
#endif
uint64_t start_time_;
std::function<void(const FSReadRequest&, void*)> cb_;
void* cb_arg_;
};
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
@ -179,5 +188,7 @@ class RandomAccessFileReader {
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority);
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -534,6 +534,8 @@ enum Histograms : uint32_t {
// Error handler statistics
ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
ASYNC_READ_BYTES,
HISTOGRAM_ENUM_MAX,
};

View File

@ -5551,7 +5551,9 @@ class HistogramTypeJni {
case ROCKSDB_NAMESPACE::Histograms::NUM_SST_READ_PER_LEVEL:
return 0x31;
case ROCKSDB_NAMESPACE::Histograms::ERROR_HANDLER_AUTORESUME_RETRY_COUNT:
return 0x31;
return 0x32;
case ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES:
return 0x33;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x1F for backwards compatibility on current minor version.
return 0x1F;
@ -5669,6 +5671,8 @@ class HistogramTypeJni {
case 0x32:
return ROCKSDB_NAMESPACE::Histograms::
ERROR_HANDLER_AUTORESUME_RETRY_COUNT;
case 0x33:
return ROCKSDB_NAMESPACE::Histograms::ASYNC_READ_BYTES;
case 0x1F:
// 0x1F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;

View File

@ -180,6 +180,8 @@ public enum HistogramType {
*/
ERROR_HANDLER_AUTORESUME_RETRY_COUNT((byte) 0x32),
ASYNC_READ_BYTES((byte) 0x33),
// 0x1F for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x1F);

View File

@ -283,6 +283,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{NUM_SST_READ_PER_LEVEL, "rocksdb.num.sst.read.per.level"},
{ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
"rocksdb.error.handler.autoresume.retry.count"},
{ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
};
std::shared_ptr<Statistics> CreateDBStatistics() {