From 60e5af83c14f48e302a29591deab6d06bf59d30f Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 May 2021 16:08:21 -0700 Subject: [PATCH] Handle return code by io_uring_submit_and_wait() and io_uring_wait_cqe() (#8311) Summary: Right now return codes by io_uring_submit_and_wait() and io_uring_wait_cqe() are not handled. It is not the good practice. Although these two functions are not supposed to return non-0 values in normal exeuction, people suspect that they might return non-0 value when an interruption happens, and the code might cause hanging. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8311 Test Plan: Make sure at least normal test cases still pass. Reviewed By: anand1976 Differential Revision: D28500828 fbshipit-source-id: 8a76cea9cafbd041102e0b6a8eef9d0bfed7c211 --- HISTORY.md | 1 + env/env_test.cc | 116 ++++++++++++++++++++++++++++++++++++++++++++++++ env/io_posix.cc | 38 ++++++++++++++-- 3 files changed, 151 insertions(+), 4 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 1f23fac2f..356ddacc5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`. * Fixed the false-positive alert when recovering from the WAL file. Avoid reporting "SST file is ahead of WAL" on a newly created empty column family, if the previous WAL file is corrupted. * Fixed a bug where `GetLiveFiles()` output included a non-existent file called "OPTIONS-000000". Backups and checkpoints, which use `GetLiveFiles()`, failed on DBs impacted by this bug. Read-write DBs were impacted when the latest OPTIONS file failed to write and `fail_if_options_file_error == false`. Read-only DBs were impacted when no OPTIONS files existed. +* Handle return code by io_uring_submit_and_wait() and io_uring_wait_cqe(). ### Behavior Changes * Due to the fix of false-postive alert of "SST file is ahead of WAL", all the CFs with no SST file (CF empty) will bypass the consistency check. We fixed a false-positive, but introduced a very rare true-negative which will be triggered in the following conditions: A CF with some delete operations in the last a few queries which will result in an empty CF (those are flushed to SST file and a compaction triggered which combines this file and all other SST files and generates an empty CF, or there is another reason to write a manifest entry for this CF after a flush that generates no SST file from an empty CF). The deletion entries are logged in a WAL and this WAL was corrupted, while the CF's log number points to the next WAL (due to the flush). Therefore, the DB can only recover to the point without these trailing deletions and cause the inconsistent DB status. diff --git a/env/env_test.cc b/env/env_test.cc index 1c4340b43..c1d7173ff 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -11,6 +11,11 @@ #include #endif +#if defined(ROCKSDB_IOURING_PRESENT) +#include +#include +#endif + #include #include @@ -1359,6 +1364,117 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) { } } +#if defined(ROCKSDB_IOURING_PRESENT) +void GenerateFilesAndRequest(Env* env, const std::string& fname, + std::vector* ret_reqs, + std::vector* scratches) { + const size_t kTotalSize = 81920; + Random rnd(301); + std::string expected_data = rnd.RandomString(kTotalSize); + + // Create file. + { + std::unique_ptr wfile; + ASSERT_OK(env->NewWritableFile(fname, &wfile, EnvOptions())); + ASSERT_OK(wfile->Append(expected_data)); + ASSERT_OK(wfile->Close()); + } + + // Right now kIoUringDepth is hard coded as 256, so we need very large + // number of keys to cover the case of multiple rounds of submissions. + // Right now the test latency is still acceptable. If it ends up with + // too long, we can modify the io uring depth with SyncPoint here. + const int num_reads = 3; + std::vector offsets = {10000, 20000, 30000}; + std::vector lens = {3000, 200, 100}; + + // Create requests + scratches->reserve(num_reads); + std::vector& reqs = *ret_reqs; + reqs.resize(num_reads); + for (int i = 0; i < num_reads; ++i) { + reqs[i].offset = offsets[i]; + reqs[i].len = lens[i]; + scratches->emplace_back(reqs[i].len, ' '); + reqs[i].scratch = const_cast(scratches->back().data()); + } +} + +TEST_F(EnvPosixTest, MultiReadIOUringError) { + // In this test we don't do aligned read, wo it doesn't work for + // direct I/O case. + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = false; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + std::vector scratches; + std::vector reqs; + GenerateFilesAndRequest(env_, fname, &reqs, &scratches); + // Query the data + std::unique_ptr file; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + + bool io_uring_wait_cqe_called = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", + [&](void* arg) { + if (!io_uring_wait_cqe_called) { + io_uring_wait_cqe_called = true; + ssize_t& ret = *(static_cast(arg)); + ret = 1; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Status s = file->MultiRead(reqs.data(), reqs.size()); + if (io_uring_wait_cqe_called) { + ASSERT_NOK(s); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(EnvPosixTest, MultiReadIOUringError2) { + // In this test we don't do aligned read, wo it doesn't work for + // direct I/O case. + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = false; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + std::vector scratches; + std::vector reqs; + GenerateFilesAndRequest(env_, fname, &reqs, &scratches); + // Query the data + std::unique_ptr file; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + + bool io_uring_submit_and_wait_called = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1", + [&](void* arg) { + io_uring_submit_and_wait_called = true; + ssize_t* ret = static_cast(arg); + (*ret)--; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2", + [&](void* arg) { + struct io_uring* iu = static_cast(arg); + struct io_uring_cqe* cqe; + assert(io_uring_wait_cqe(iu, &cqe) == 0); + io_uring_cqe_seen(iu, cqe); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Status s = file->MultiRead(reqs.data(), reqs.size()); + if (io_uring_submit_and_wait_called) { + ASSERT_NOK(s); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} +#endif // ROCKSDB_IOURING_PRESENT + // Only works in linux platforms #ifdef OS_WIN TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) { diff --git a/env/io_posix.cc b/env/io_posix.cc index de0a74539..a041b32aa 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -633,6 +633,8 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg); } + IOStatus ios = IOStatus::OK(); + struct WrappedReadRequest { FSReadRequest* req; struct iovec iov; @@ -679,19 +681,47 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, ssize_t ret = io_uring_submit_and_wait(iu, static_cast(this_reqs)); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1", + &ret); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2", + iu); + if (static_cast(ret) != this_reqs) { fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs); + // If error happens and we submitted fewer than expected, it is an + // exception case and we don't retry here. We should still consume + // what is is submitted in the ring. + for (ssize_t i = 0; i < ret; i++) { + struct io_uring_cqe* cqe = nullptr; + io_uring_wait_cqe(iu, &cqe); + if (cqe != nullptr) { + io_uring_cqe_seen(iu, cqe); + } + } + return IOStatus::IOError("io_uring_submit_and_wait() requested " + + ToString(this_reqs) + " but returned " + + ToString(ret)); } - assert(static_cast(ret) == this_reqs); for (size_t i = 0; i < this_reqs; i++) { - struct io_uring_cqe* cqe; + struct io_uring_cqe* cqe = nullptr; WrappedReadRequest* req_wrap; // We could use the peek variant here, but this seems safer in terms // of our initial wait not reaping all completions ret = io_uring_wait_cqe(iu, &cqe); - assert(!ret); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret); + if (ret) { + ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret)); + + if (cqe != nullptr) { + io_uring_cqe_seen(iu, cqe); + } + continue; + } req_wrap = static_cast(io_uring_cqe_get_data(cqe)); FSReadRequest* req = req_wrap->req; @@ -740,7 +770,7 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, io_uring_cqe_seen(iu, cqe); } } - return IOStatus::OK(); + return ios; #else return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg); #endif