Provide support for IOTracing for ReadAsync API (#9833)
Summary: Same as title Pull Request resolved: https://github.com/facebook/rocksdb/pull/9833 Test Plan: Add unit test and manually check the output of tracing logs For fixed readahead_size it logs as: ``` Access Time : 193352113447923 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 15075 , IO Status: OK, Length: 12288, Offset: 659456 Access Time : 193352113465232 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 14425 , IO Status: OK, Length: 12288, Offset: 671744 Access Time : 193352113481539 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 13062 , IO Status: OK, Length: 12288, Offset: 684032 Access Time : 193352113497692 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 13649 , IO Status: OK, Length: 12288, Offset: 696320 Access Time : 193352113520043 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 19384 , IO Status: OK, Length: 12288, Offset: 708608 Access Time : 193352113538401 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 15406 , IO Status: OK, Length: 12288, Offset: 720896 Access Time : 193352113554855 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 13670 , IO Status: OK, Length: 12288, Offset: 733184 Access Time : 193352113571624 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 13855 , IO Status: OK, Length: 12288, Offset: 745472 Access Time : 193352113587924 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 13953 , IO Status: OK, Length: 12288, Offset: 757760 Access Time : 193352113603285 , File Name: 000026.sst , File Operation: Prefetch , Latency: 59 , IO Status: Not implemented: Prefetch not supported, Length: 8868, Offset: 898349 ``` For implicit readahead: ``` Access Time : 193351865156587 , File Name: 000026.sst , File Operation: Prefetch , Latency: 48 , IO Status: Not implemented: Prefetch not supported, Length: 12266, Offset: 391174 Access Time : 193351865160354 , File Name: 000026.sst , File Operation: Prefetch , Latency: 
51 , IO Status: Not implemented: Prefetch not supported, Length: 12266, Offset: 395248 Access Time : 193351865164253 , File Name: 000026.sst , File Operation: Prefetch , Latency: 49 , IO Status: Not implemented: Prefetch not supported, Length: 12266, Offset: 399322 Access Time : 193351865165461 , File Name: 000026.sst , File Operation: ReadAsync , Latency: 222871 , IO Status: OK, Length: 135168, Offset: 401408 ``` Reviewed By: anand1976 Differential Revision: D35601634 Pulled By: akankshamahajan15 fbshipit-source-id: 5a4f32a850af878efa0767bd5706380152a1f26e
This commit is contained in:
parent
07a00828af
commit
c8bae6e29c
@ -14,6 +14,7 @@
|
||||
* RemoteCompaction supports table_properties_collector_factories override on compaction worker.
|
||||
* Start tracking SST unique id in MANIFEST, which will be used to verify with SST properties during DB open to make sure the SST file is not overwritten or misplaced. A db option `verify_sst_unique_id_in_manifest` is introduced to enable/disable the verification, if enabled all SST files will be opened during DB-open to verify the unique id (default is false), so it's recommended to use it with `max_open_files = -1` to pre-open the files.
|
||||
* Added the ability to concurrently read data blocks from multiple files in a level in batched MultiGet. This can be enabled by setting the async_io option in ReadOptions. Using this feature requires a FileSystem that supports ReadAsync (PosixFileSystem is not supported yet for this), and for RocksDB to be compiled with folly and c++20.
|
||||
* Add IO tracing support for the FileSystem::ReadAsync API.
|
||||
|
||||
### Public API changes
|
||||
* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
|
||||
|
2
Makefile
2
Makefile
@ -1876,7 +1876,7 @@ testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY)
|
||||
io_tracer_test: $(OBJ_DIR)/trace_replay/io_tracer_test.o $(OBJ_DIR)/trace_replay/io_tracer.o $(TEST_LIBRARY) $(LIBRARY)
|
||||
$(AM_LINK)
|
||||
|
||||
prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(TEST_LIBRARY) $(LIBRARY)
|
||||
prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(OBJ_DIR)/tools/io_tracer_parser_tool.o $(TEST_LIBRARY) $(LIBRARY)
|
||||
$(AM_LINK)
|
||||
|
||||
io_tracer_parser_test: $(OBJ_DIR)/tools/io_tracer_parser_test.o $(OBJ_DIR)/tools/io_tracer_parser_tool.o $(TEST_LIBRARY) $(LIBRARY)
|
||||
|
45
env/file_system_tracer.cc
vendored
45
env/file_system_tracer.cc
vendored
@ -338,6 +338,51 @@ IOStatus FSRandomAccessFileTracingWrapper::InvalidateCache(size_t offset,
|
||||
return s;
|
||||
}
|
||||
|
||||
// Tracing wrapper around the target file's ReadAsync.
//
// Rather than passing the caller's `cb`/`cb_arg` straight through, this
// forwards ReadAsyncCallback (bound to this object) as the callback and a
// heap-allocated ReadAsyncCallbackInfo as its argument. The info object
// records the caller's callback, its argument, the submit time and the
// operation name so that ReadAsyncCallback can write a trace record and then
// invoke the caller's callback.
//
// Ownership of the info object: it is deleted here when the target returns a
// non-OK status; otherwise ReadAsyncCallback deletes it after forwarding the
// completion.
//
// Returns the status returned by target()->ReadAsync.
IOStatus FSRandomAccessFileTracingWrapper::ReadAsync(
|
||||
FSReadRequest& req, const IOOptions& opts,
|
||||
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
|
||||
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
|
||||
// Create a callback and populate info.
|
||||
auto read_async_callback =
|
||||
std::bind(&FSRandomAccessFileTracingWrapper::ReadAsyncCallback, this,
|
||||
std::placeholders::_1, std::placeholders::_2);
|
||||
ReadAsyncCallbackInfo* read_async_cb_info = new ReadAsyncCallbackInfo;
|
||||
read_async_cb_info->cb_ = cb;
|
||||
read_async_cb_info->cb_arg_ = cb_arg;
|
||||
// Start of the latency measurement; ReadAsyncCallback subtracts this from
// the clock value at completion.
read_async_cb_info->start_time_ = clock_->NowNanos();
|
||||
// __func__ is the unqualified name of this function, i.e. "ReadAsync"; it is
// the operation name ReadAsyncCallback puts into the trace record.
read_async_cb_info->file_op_ = __func__;
|
||||
|
||||
IOStatus s = target()->ReadAsync(req, opts, read_async_callback,
|
||||
read_async_cb_info, io_handle, del_fn, dbg);
|
||||
|
||||
// NOTE(review): freeing here presumes the target never invokes the callback
// once it has returned a non-OK status — confirm against the target
// implementations.
if (!s.ok()) {
|
||||
delete read_async_cb_info;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
// Completion callback that ReadAsync hands to the target file.
//
// `cb_arg` must be the ReadAsyncCallbackInfo allocated by ReadAsync. The
// function writes one IO trace record for the completed request (operation
// name, elapsed time, status string, file name, result size and offset), then
// calls the caller's original callback with its original argument, and
// finally deletes the info object.
void FSRandomAccessFileTracingWrapper::ReadAsyncCallback(
|
||||
const FSReadRequest& req, void* cb_arg) {
|
||||
ReadAsyncCallbackInfo* read_async_cb_info =
|
||||
static_cast<ReadAsyncCallbackInfo*>(cb_arg);
|
||||
assert(read_async_cb_info);
|
||||
assert(read_async_cb_info->cb_);
|
||||
|
||||
// Nanoseconds between submission in ReadAsync and this completion.
uint64_t elapsed = clock_->NowNanos() - read_async_cb_info->start_time_;
|
||||
// Set the kIOLen and kIOOffset bits; presumably these tell the trace consumer
// that the length and offset fields of the record are populated — confirm
// against IOTraceOp.
uint64_t io_op_data = 0;
|
||||
io_op_data |= (1 << IOTraceOp::kIOLen);
|
||||
io_op_data |= (1 << IOTraceOp::kIOOffset);
|
||||
// The clock is read a second time here, so the record's first argument is
// taken slightly after the value used to compute `elapsed`.
IOTraceRecord io_record(clock_->NowNanos(), TraceType::kIOTracer, io_op_data,
|
||||
read_async_cb_info->file_op_, elapsed,
|
||||
req.status.ToString(), file_name_, req.result.size(),
|
||||
req.offset);
|
||||
io_tracer_->WriteIOOp(io_record, nullptr /*dbg*/);
|
||||
|
||||
// Call the underlying (caller-supplied) callback with its original argument.
|
||||
read_async_cb_info->cb_(req, read_async_cb_info->cb_arg_);
|
||||
// The info object was allocated in ReadAsync for this single request.
delete read_async_cb_info;
|
||||
}
|
||||
|
||||
IOStatus FSWritableFileTracingWrapper::Append(const Slice& data,
|
||||
const IOOptions& options,
|
||||
IODebugContext* dbg) {
|
||||
|
14
env/file_system_tracer.h
vendored
14
env/file_system_tracer.h
vendored
@ -228,11 +228,25 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileOwnerWrapper {
|
||||
|
||||
IOStatus InvalidateCache(size_t offset, size_t length) override;
|
||||
|
||||
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
|
||||
std::function<void(const FSReadRequest&, void*)> cb,
|
||||
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
|
||||
IODebugContext* dbg) override;
|
||||
|
||||
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
|
||||
|
||||
private:
|
||||
std::shared_ptr<IOTracer> io_tracer_;
|
||||
SystemClock* clock_;
|
||||
// Stores file name instead of full path.
|
||||
std::string file_name_;
|
||||
|
||||
// Per-request state that ReadAsync allocates and passes to the target file as
// the callback argument; ReadAsyncCallback reads it back on completion.
struct ReadAsyncCallbackInfo {
|
||||
// clock_->NowNanos() taken when the request was submitted.
uint64_t start_time_;
|
||||
// The caller's original completion callback.
std::function<void(const FSReadRequest&, void*)> cb_;
|
||||
// The caller's original callback argument, passed back to cb_ unchanged.
void* cb_arg_;
|
||||
// Operation name written into the trace record (set from __func__ in
// ReadAsync).
std::string file_op_;
|
||||
};
|
||||
};
|
||||
|
||||
// The FSRandomAccessFilePtr is a wrapper class that takes pointer to storage
|
||||
|
@ -5,6 +5,9 @@
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#ifdef GFLAGS
|
||||
#include "tools/io_tracer_parser_tool.h"
|
||||
#endif
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
@ -1266,6 +1269,33 @@ class PrefetchTestWithPosix : public DBTestBase,
|
||||
public ::testing::WithParamInterface<bool> {
|
||||
public:
|
||||
PrefetchTestWithPosix() : DBTestBase("prefetch_test_with_posix", true) {}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
#ifdef GFLAGS
|
||||
const int kMaxArgCount = 100;
|
||||
const size_t kArgBufferSize = 100000;
|
||||
|
||||
// Calls io_tracer_parser() in-process with a command line equivalent to
//   ./io_tracer_parser -io_trace_file=<trace_file>
// and asserts that it returns 0.
//
// The argv array is built by copying each argument, NUL-terminated, back to
// back into one local buffer. Only the parser's return code is checked here.
void RunIOTracerParserTool(std::string trace_file) {
|
||||
std::vector<std::string> params = {"./io_tracer_parser",
|
||||
"-io_trace_file=" + trace_file};
|
||||
|
||||
char arg_buffer[kArgBufferSize];
|
||||
char* argv[kMaxArgCount];
|
||||
int argc = 0;
|
||||
// Offset into arg_buffer where the next argument will be written.
int cursor = 0;
|
||||
for (const auto& arg : params) {
|
||||
// Guard both the character buffer and the argv array before writing.
ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize);
|
||||
ASSERT_LE(argc + 1, kMaxArgCount);
|
||||
|
||||
// Size arg.size() + 1 leaves room for the whole argument plus its
// terminating NUL.
snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str());
|
||||
|
||||
argv[argc++] = arg_buffer + cursor;
|
||||
// Skip past the argument and its terminating NUL.
cursor += static_cast<int>(arg.size()) + 1;
|
||||
}
|
||||
ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv));
|
||||
}
|
||||
#endif // GFLAGS
|
||||
#endif // ROCKSDB_LITE
|
||||
};
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(PrefetchTestWithPosix, PrefetchTestWithPosix,
|
||||
@ -1433,6 +1463,133 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
#ifdef GFLAGS
|
||||
// Runs a full iterator scan with async_io enabled while IO tracing is active,
// then feeds the resulting trace file to io_tracer_parser.
//
// What is asserted: every written key is read back, the async prefetch sync
// point fired at least once, the ASYNC_READ_BYTES histogram is consistent
// with whether the io_uring sync point fired, the trace file exists, and the
// parser returns 0 on it. The test parameter selects a fixed readahead_size
// (16KB) versus leaving readahead_size unset.
TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) {
|
||||
if (mem_env_ || encrypted_env_) {
|
||||
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
|
||||
return;
|
||||
}
|
||||
|
||||
const int kNumKeys = 1000;
|
||||
// File system wrapper over the default FS, constructed with prefetch support
// turned off.
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
|
||||
FileSystem::Default(), /*support_prefetch=*/false);
|
||||
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
|
||||
|
||||
// Always false in this test, so the direct-IO branches below are not
// exercised.
bool use_direct_io = false;
|
||||
Options options = CurrentOptions();
|
||||
options.write_buffer_size = 1024;
|
||||
options.create_if_missing = true;
|
||||
options.compression = kNoCompression;
|
||||
options.env = env.get();
|
||||
options.statistics = CreateDBStatistics();
|
||||
if (use_direct_io) {
|
||||
options.use_direct_reads = true;
|
||||
options.use_direct_io_for_flush_and_compaction = true;
|
||||
}
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.no_block_cache = true;
|
||||
table_options.cache_index_and_filter_blocks = false;
|
||||
table_options.metadata_block_size = 1024;
|
||||
table_options.index_type =
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
Status s = TryReopen(options);
|
||||
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
|
||||
// If direct IO is not supported, skip the test
|
||||
return;
|
||||
} else {
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
|
||||
int total_keys = 0;
|
||||
// Write the keys: 5 rounds of kNumKeys puts, each round written and flushed,
// then move the resulting files to level 2.
|
||||
{
|
||||
WriteBatch batch;
|
||||
Random rnd(309);
|
||||
for (int j = 0; j < 5; j++) {
|
||||
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
|
||||
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
|
||||
total_keys++;
|
||||
}
|
||||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
MoveFilesToLevel(2);
|
||||
}
|
||||
|
||||
int buff_prefetch_count = 0;
|
||||
bool read_async_called = false;
|
||||
ReadOptions ro;
|
||||
ro.adaptive_readahead = true;
|
||||
ro.async_io = true;
|
||||
|
||||
// Parameterized: true uses a fixed 16KB readahead_size, false leaves
// readahead_size at its default.
if (GetParam()) {
|
||||
ro.readahead_size = 16 * 1024;
|
||||
}
|
||||
|
||||
// Count how many times the async prefetch path is entered.
SyncPoint::GetInstance()->SetCallBack(
|
||||
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
|
||||
[&](void*) { buff_prefetch_count++; });
|
||||
|
||||
// Record whether the io_uring result path was reached at all; used below to
// decide which ASYNC_READ_BYTES expectation applies.
SyncPoint::GetInstance()->SetCallBack(
|
||||
"UpdateResults::io_uring_result",
|
||||
[&](void* /*arg*/) { read_async_called = true; });
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Read the keys.
|
||||
{
|
||||
// Start io_tracing.
|
||||
// NOTE(review): write_opt is not referenced anywhere else in this test.
WriteOptions write_opt;
|
||||
TraceOptions trace_opt;
|
||||
std::unique_ptr<TraceWriter> trace_writer;
|
||||
std::string trace_file_path = dbname_ + "/io_trace_file";
|
||||
|
||||
ASSERT_OK(
|
||||
NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer));
|
||||
ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer)));
|
||||
// Reset statistics so the histogram check below only reflects this scan.
ASSERT_OK(options.statistics->Reset());
|
||||
|
||||
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
|
||||
int num_keys = 0;
|
||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||
ASSERT_OK(iter->status());
|
||||
num_keys++;
|
||||
}
|
||||
|
||||
// End the tracing.
|
||||
ASSERT_OK(db_->EndIOTrace());
|
||||
ASSERT_OK(env_->FileExists(trace_file_path));
|
||||
|
||||
ASSERT_EQ(num_keys, total_keys);
|
||||
ASSERT_GT(buff_prefetch_count, 0);
|
||||
|
||||
// Check stats to make sure async prefetch is done.
|
||||
{
|
||||
HistogramData async_read_bytes;
|
||||
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
|
||||
// Not all platforms support iouring. In that case, ReadAsync in posix
|
||||
// won't submit async requests.
|
||||
if (read_async_called) {
|
||||
ASSERT_GT(async_read_bytes.count, 0);
|
||||
} else {
|
||||
ASSERT_EQ(async_read_bytes.count, 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Run the parser tool over the trace file. This only asserts that parsing
// succeeds (return code 0); whether ReadAsync records are present is not
// checked programmatically and has to be verified from the parser output.
|
||||
RunIOTracerParserTool(trace_file_path);
|
||||
}
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
|
||||
Close();
|
||||
}
|
||||
#endif // GFLAGS
|
||||
#endif // ROCKSDB_LITE
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -457,9 +457,16 @@ IOStatus RandomAccessFileReader::ReadAsync(
|
||||
|
||||
IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
|
||||
io_handle, del_fn, nullptr /*dbg*/);
|
||||
// Suppress false positive clang analyzer warnings.
|
||||
// Memory is not released if file_->ReadAsync returns !s.ok(), because
|
||||
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
|
||||
// called then ReadAsync should always return IOStatus::OK().
|
||||
#ifndef __clang_analyzer__
|
||||
if (!s.ok()) {
|
||||
delete read_async_info;
|
||||
}
|
||||
#endif // __clang_analyzer__
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user